[TASK-240] Fire-and-forget behaviour for efficient batching#258
[TASK-240] Fire-and-forget behaviour for efficient batching#258luoyuxia merged 1 commit intoapache:mainfrom
Conversation
bb0468a to
43d79bb
Compare
43d79bb to
f06e7aa
Compare
|
@luoyuxia @leekeiabstraction PTAL 🙏 |
There was a problem hiding this comment.
Pull request overview
This PR implements fire-and-forget write semantics for efficient batching in Fluss, addressing issue #240. Write operations (upsert, delete, append) now return a WriteResultFuture immediately after queueing records instead of blocking until server acknowledgment. This enables users to queue multiple writes that accumulate in the RecordAccumulator and are sent in batches by the background Sender, similar to rdkafka's DeliveryFuture pattern.
Changes:
- Introduced
WriteResultFuturewrapper type that can be awaited for per-record acknowledgment or dropped for fire-and-forget semantics - Updated Rust API: write methods return
Result<WriteResultFuture>instead of blocking on acknowledgment - Updated Python bindings to support nested await pattern for both fire-and-forget and per-record acknowledgment
- Updated C++ bindings with
WriteResultclass and dual overloads for both usage patterns
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| crates/fluss/src/client/write/mod.rs | Added WriteResultFuture type that wraps ResultHandle in a pollable future |
| crates/fluss/src/client/table/upsert.rs | Updated upsert() and delete() to return WriteResultFuture; marked unused result types with #[allow(dead_code)] |
| crates/fluss/src/client/table/append.rs | Updated append() and append_arrow_batch() to return WriteResultFuture |
| crates/fluss/tests/integration/kv_table.rs | Updated tests to demonstrate both fire-and-forget (with flush) and per-record acknowledgment patterns |
| crates/fluss/tests/integration/log_table.rs | Added explicit flush calls after fire-and-forget append operations |
| crates/examples/src/example_table.rs | Simplified example to use fire-and-forget pattern with flush; removed unnecessary try_join! |
| crates/examples/src/example_kv_table.rs | Demonstrates both patterns: fire-and-forget bulk inserts and per-record ack for updates/deletes |
| crates/examples/src/example_partitioned_kv_table.rs | Shows flush after bulk inserts and double-await for critical operations |
| bindings/python/src/upsert.rs | Implemented nested coroutine pattern using Python::attach and future_into_py for dual await support |
| bindings/python/src/table.rs | Updated AppendWriter methods to return nested coroutines; modified write_arrow to drop ack handles |
| bindings/python/example/example.py | Comprehensive example showing both fire-and-forget batch writes and per-record acknowledgment |
| bindings/cpp/src/lib.rs | Modified FFI layer: append() returns Result<Box<WriteResult>> instead of FfiResult |
| bindings/cpp/src/table.cpp | Implemented WriteResult class with move semantics and Wait() method; added dual Append() overloads |
| bindings/cpp/include/fluss.hpp | Added WriteResult class declaration with RAII semantics |
| bindings/cpp/examples/example.cpp | Demonstrates both fire-and-forget and per-record acknowledgment patterns |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
We may wish to refactor queueing to use sync to avoid |
Agree on it. |
|
Created an issue for this: #263 |
luoyuxia
left a comment
There was a problem hiding this comment.
@fresh-borzoni Thanks for the pr. LGTM!
Summary
closes #240
Fire-and-forget write semantics for efficient batching
Write operations (
upsert,delete,append) now return aWriteResultFutureimmediately after queueing the record, instead of blocking until server acknowledgment. This enables efficient batching and records accumulate in theRecordAccumulatorand are sent in batches by the backgroundSender, similar to rdkafka'sDeliveryFuture.Usage patterns
Fire-and-forget — queue writes, flush at end:
Per-record acknowledgment — double-await for read-after-write consistency:
Notes: