-
Notifications
You must be signed in to change notification settings - Fork 26
[TASK-263] Remove async from queueing while writing #271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@luoyuxia @leekeiabstraction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR removes unnecessary async from the write queueing path by switching internal queue synchronization to parking_lot::Mutex, making upsert(), delete(), and append() queue synchronously while still allowing async acknowledgement via a returned handle/future; async behavior is retained for flush() since it performs network I/O.
Changes:
- Replace async queueing (
tokio::sync::Mutex) with synchronous locking (parking_lot::Mutex) in the core write accumulator and writer client. - Update Rust API call sites (tests/examples/docs) to remove the first
awaiton queueing methods while keeping per-record ack via awaiting the returned handle. - Apply the same synchronous-queue + async-ack pattern to Python and C++ bindings (including a new Python
WriteResultHandle).
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/fluss/src/client/write/accumulator.rs | Switch queue locks to parking_lot::Mutex and make queue operations synchronous (append/ready/drain/re_enqueue). |
| crates/fluss/src/client/write/writer_client.rs | Make send() synchronous and remove awaits on accumulation. |
| crates/fluss/src/client/write/sender.rs | Adjust sender to use synchronous accumulator APIs; keep async for network/metadata updates. |
| crates/fluss/src/client/write/mod.rs | Update docs for new call pattern around WriteResultFuture. |
| crates/fluss/src/client/table/append.rs | Make append/append_arrow_batch synchronous queueing returning WriteResultFuture. |
| crates/fluss/src/client/table/upsert.rs | Make upsert/delete synchronous queueing returning WriteResultFuture. |
| crates/fluss/tests/integration/log_table.rs | Remove await from queueing calls to match the new synchronous API. |
| crates/fluss/tests/integration/kv_table.rs | Remove await from queueing calls and keep per-record ack via awaiting the returned handle. |
| crates/examples/src/example_table.rs | Update example to use synchronous queueing + async flush. |
| crates/examples/src/example_kv_table.rs | Update KV example for synchronous queueing + optional per-record ack. |
| crates/examples/src/example_partitioned_kv_table.rs | Update partitioned KV example for synchronous queueing + optional per-record ack. |
| bindings/python/src/write_handle.rs | Add Python WriteResultHandle with wait() for per-record ack. |
| bindings/python/src/upsert.rs | Make Python upsert/delete synchronous and return WriteResultHandle; keep flush() async. |
| bindings/python/src/table.rs | Make Python append APIs return WriteResultHandle; convert flush() to async/awaitable. |
| bindings/python/src/lib.rs | Export/register WriteResultHandle. |
| bindings/python/example/example.py | Update Python example to reflect synchronous queueing and awaiting flush()/handle.wait(). |
| bindings/cpp/src/lib.rs | Remove runtime blocking for append queueing; keep blocking for waiting/flush. |
| README.md | Update Rust snippet for synchronous queueing and add flush. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
fixed failing tests, should be fine now |
cefd4b7 to
0802788
Compare
luoyuxia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fresh-borzoni Thanks for the pr. LGTM!
Summary
closes #263
tokio::sync::Mutexwithparking_lot::Mutexin the core accumulator and writer clientupsert(),delete(),append()now queue writes synchronously (noawaitneeded) and return aWriteResultHandleWriteResultHandlesupports both fire-and-forget (ignore the handle) and per-record ack (await handle.wait())flush()remains async (real network I/O)