diff --git a/README.md b/README.md
index ee9478c6..5e771d9d 100644
--- a/README.md
+++ b/README.md
@@ -101,7 +101,8 @@ pub async fn main() -> Result<()> {
     let table = conn.get_table(&table_path).await;
     let append_writer = table.new_append().create_writer();
     let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8, ["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap();
-    append_writer.append(batch).await?;
+    append_writer.append(batch)?;
+    append_writer.flush().await?;
     println!("Start to scan log records......");
     // 4: scan the records
     let log_scanner = table.new_scan().create_log_scanner();
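The README hunk above is the whole user-facing story: `append` now queues synchronously and returns a `WriteResultFuture`, while delivery is awaited separately. A minimal sketch of the two resulting patterns, assuming a connected `table` and the `GenericRow` API used in `crates/examples` below:

```rust
// Sketch only: assumes `table` is a connected fluss table handle.
let append_writer = table.new_append()?.create_writer()?;

let mut row = GenericRow::new(3);
row.set_field(0, 42);
row.set_field(1, "hello");
row.set_field(2, 987i64);

// Pattern 1: fire-and-forget. Queuing is synchronous; flush awaits delivery.
append_writer.append(&row)?;
append_writer.flush().await?;

// Pattern 2: per-record acknowledgment. The returned WriteResultFuture is
// awaited directly and resolves once the server acks this specific write.
append_writer.append(&row)?.await?;
```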
print("\n--- Testing Upsert (per-record acknowledgment) ---") - ack = await upsert_writer.upsert( + handle = upsert_writer.upsert( { "user_id": 1, "name": "Alice Updated", @@ -494,7 +494,7 @@ async def main(): "balance": Decimal("2345.67"), } ) - await ack # wait for server acknowledgment before proceeding + await handle.wait() # wait for server acknowledgment print("Updated user_id=1 (Alice -> Alice Updated) — server acknowledged") except Exception as e: @@ -554,9 +554,8 @@ async def main(): try: upsert_writer = pk_table.new_upsert() - # Per-record ack for delete — await the handle to confirm deletion - ack = await upsert_writer.delete({"user_id": 3}) - await ack + handle = upsert_writer.delete({"user_id": 3}) + await handle.wait() print("Deleted user_id=3 — server acknowledged") lookuper = pk_table.new_lookup() @@ -670,12 +669,12 @@ async def main(): partitioned_writer = await partitioned_table.new_append_writer() # Append data to US partition - await partitioned_writer.append({"id": 1, "region": "US", "value": 100}) - await partitioned_writer.append({"id": 2, "region": "US", "value": 200}) + partitioned_writer.append({"id": 1, "region": "US", "value": 100}) + partitioned_writer.append({"id": 2, "region": "US", "value": 200}) # Append data to EU partition - await partitioned_writer.append({"id": 3, "region": "EU", "value": 300}) - await partitioned_writer.append({"id": 4, "region": "EU", "value": 400}) - partitioned_writer.flush() + partitioned_writer.append({"id": 3, "region": "EU", "value": 300}) + partitioned_writer.append({"id": 4, "region": "EU", "value": 400}) + await partitioned_writer.flush() print("\nWrote 4 records (2 to US, 2 to EU)") # Demo: list_partition_offsets diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index ae7f6c50..f1f4ee6b 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -30,6 +30,7 @@ mod metadata; mod table; mod upsert; mod utils; +mod write_handle; pub use admin::*; pub use config::*; @@ -40,6 +41,7 @@ pub use metadata::*; pub use table::*; pub use upsert::*; pub use utils::*; +pub use write_handle::*; static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() @@ -88,6 +90,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; // Register constants m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index e987d43d..8af6b13e 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -533,76 +533,43 @@ impl AppendWriter { let batch_list: Vec> = batches.extract(py)?; for batch in batch_list { - // Drop the ack coroutine — fire-and-forget - let _ = self.write_arrow_batch(py, batch)?; + // Drop the handle — fire-and-forget for bulk writes + drop(self.write_arrow_batch(py, batch)?); } Ok(()) } - /// Write Arrow batch data + /// Write Arrow batch data. /// /// Returns: - /// A coroutine that can be awaited for server acknowledgment, - /// or ignored for fire-and-forget behavior. - pub fn write_arrow_batch<'py>( - &self, - py: Python<'py>, - batch: Py, - ) -> PyResult> { + /// WriteResultHandle that can be ignored (fire-and-forget) or + /// awaited via `handle.wait()` for server acknowledgment. 
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index ae7f6c50..f1f4ee6b 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -30,6 +30,7 @@ mod metadata;
 mod table;
 mod upsert;
 mod utils;
+mod write_handle;

 pub use admin::*;
 pub use config::*;
@@ -40,6 +41,7 @@ pub use metadata::*;
 pub use table::*;
 pub use upsert::*;
 pub use utils::*;
+pub use write_handle::*;

 static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
     tokio::runtime::Builder::new_multi_thread()
@@ -88,6 +90,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<WriteResultHandle>()?;

     // Register constants
     m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index e987d43d..8af6b13e 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -533,76 +533,43 @@ impl AppendWriter {
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;

         for batch in batch_list {
-            // Drop the ack coroutine — fire-and-forget
-            let _ = self.write_arrow_batch(py, batch)?;
+            // Drop the handle — fire-and-forget for bulk writes
+            drop(self.write_arrow_batch(py, batch)?);
         }

         Ok(())
     }

-    /// Write Arrow batch data
+    /// Write Arrow batch data.
     ///
     /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn write_arrow_batch<'py>(
-        &self,
-        py: Python<'py>,
-        batch: Py<PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ///     WriteResultHandle that can be ignored (fire-and-forget) or
+    ///     awaited via `handle.wait()` for server acknowledgment.
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<WriteResultHandle> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: ArrowRecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;

-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let result_future = inner
-                .append_arrow_batch(rust_batch)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let result_future = self
+            .inner
+            .append_arrow_batch(rust_batch)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }

-    /// Append a single row to the table
+    /// Append a single row to the table.
     ///
     /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn append<'py>(
-        &self,
-        py: Python<'py>,
-        row: &Bound<'py, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ///     WriteResultHandle that can be ignored (fire-and-forget) or
+    ///     awaited via `handle.wait()` for server acknowledgment.
+    pub fn append(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_to_generic_row(row, &self.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let result_future = inner
-                .append(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let result_future = self
+            .inner
+            .append(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }

     /// Write Pandas DataFrame data
@@ -636,16 +603,13 @@ impl AppendWriter {
     }

     /// Flush any pending data
-    pub fn flush(&self, py: Python) -> PyResult<()> {
+    pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
         let inner = self.inner.clone();
-        // Release the GIL before blocking on I/O
-        py.detach(|| {
-            TOKIO_RUNTIME.block_on(async {
-                inner
-                    .flush()
-                    .await
-                    .map_err(|e| FlussError::new_err(e.to_string()))
-            })
+        future_into_py(py, async move {
+            inner
+                .flush()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
         })
     }
diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index 5c10dedc..0aa69d74 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -18,26 +18,24 @@ use crate::table::{python_pk_to_generic_row, python_to_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
-use std::sync::Arc;
-use tokio::sync::Mutex;
+use std::sync::{Arc, Mutex};

 /// Writer for upserting and deleting data in a Fluss primary key table.
 ///
-/// Each upsert/delete operation queues the write and returns a coroutine
-/// that can be awaited for per-record acknowledgment, or ignored for
-/// fire-and-forget semantics (call `flush()` to ensure delivery).
+/// Each upsert/delete operation synchronously queues the write. Call `flush()`
+/// to ensure all queued writes are delivered to the server.
 ///
 /// # Example:
 ///     writer = table.new_upsert()
 ///
-///     # Fire-and-forget with flush
-///     await writer.upsert(row1)
-///     await writer.upsert(row2)
+///     # Fire-and-forget — ignore the returned handle
+///     writer.upsert(row1)
+///     writer.upsert(row2)
 ///     await writer.flush()
 ///
-///     # Or await individual acknowledgment
-///     ack = await writer.upsert(row3)
-///     await ack
+///     # Per-record ack — call wait() on the handle
+///     handle = writer.upsert(critical_row)
+///     await handle.wait()
 #[pyclass]
 pub struct UpsertWriter {
     inner: Arc<UpsertWriterInner>,
 }

@@ -46,7 +44,7 @@ pub struct UpsertWriter {
 struct UpsertWriterInner {
     table_upsert: fcore::client::TableUpsert,
     /// Lazily initialized writer - created on first write operation
-    writer: Mutex<Option<fcore::client::UpsertWriter>>,
+    writer: Mutex<Option<Arc<fcore::client::UpsertWriter>>>,
     table_info: fcore::metadata::TableInfo,
 }

@@ -57,100 +55,65 @@ impl UpsertWriter {
     /// If a row with the same primary key exists, it will be updated.
     /// Otherwise, a new row will be inserted.
     ///
+    /// The write is queued synchronously. Call `flush()` to ensure delivery.
+    ///
     /// Args:
     ///     row: A dict, list, or tuple containing the row data.
     ///         For dict: keys are column names, values are column values.
     ///         For list/tuple: values must be in schema order.
-    ///
-    /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn upsert<'py>(
-        &self,
-        py: Python<'py>,
-        row: &Bound<'_, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    pub fn upsert(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let mut guard = inner.get_or_create_writer().await?;
-            let writer = guard.as_mut().unwrap();
-            let result_future = writer
-                .upsert(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let writer = self.inner.get_or_create_writer()?;
+        let result_future = writer
+            .upsert(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }
     /// Delete a row from the table by primary key.
     ///
+    /// The delete is queued synchronously. Call `flush()` to ensure delivery.
+    ///
     /// Args:
     ///     pk: A dict, list, or tuple containing only the primary key values.
     ///         For dict: keys are PK column names.
     ///         For list/tuple: values in PK column order.
-    ///
-    /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn delete<'py>(
-        &self,
-        py: Python<'py>,
-        pk: &Bound<'_, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    pub fn delete(&self, pk: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_pk_to_generic_row(pk, &self.inner.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let mut guard = inner.get_or_create_writer().await?;
-            let writer = guard.as_mut().unwrap();
-            let result_future = writer
-                .delete(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let writer = self.inner.get_or_create_writer()?;
+        let result_future = writer
+            .delete(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }

     /// Flush all pending upsert/delete operations to the server.
     ///
-    /// This method sends all buffered operations and blocks until they are
+    /// This method sends all buffered operations and waits until they are
     /// acknowledged according to the writer's ack configuration.
     ///
     /// Returns:
     ///     None on success
     pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
-        let inner = self.inner.clone();
+        // Clone the Arc out of the lock so we don't hold the guard across await
+        let writer = {
+            let guard = self
+                .inner
+                .writer
+                .lock()
+                .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?;
+            guard.as_ref().cloned()
+        };

         future_into_py(py, async move {
-            let writer_guard = inner.writer.lock().await;
-
-            if let Some(writer) = writer_guard.as_ref() {
+            if let Some(writer) = writer {
                 writer
                     .flush()
                     .await
                     .map_err(|e| FlussError::new_err(e.to_string()))
             } else {
-                // Nothing to flush - no writer was created yet
                 Ok(())
             }
         })
     }
@@ -197,17 +160,18 @@ impl UpsertWriter {
 impl UpsertWriterInner {
     /// Get the cached writer or create one on first use.
-    async fn get_or_create_writer(
-        &self,
-    ) -> PyResult<tokio::sync::MutexGuard<'_, Option<fcore::client::UpsertWriter>>> {
-        let mut guard = self.writer.lock().await;
+    fn get_or_create_writer(&self) -> PyResult<Arc<fcore::client::UpsertWriter>> {
+        let mut guard = self
+            .writer
+            .lock()
+            .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?;

         if guard.is_none() {
             let writer = self
                 .table_upsert
                 .create_writer()
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
-            *guard = Some(writer);
+            *guard = Some(Arc::new(writer));
         }
-        Ok(guard)
+        Ok(guard.as_ref().unwrap().clone())
     }
 }
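A note on the locking change above: storing the lazily created writer as `Mutex<Option<Arc<...>>>` lets every caller clone the `Arc` out and drop the `std::sync::Mutex` guard before any `.await`, which is what makes replacing `tokio::sync::Mutex` sound here. The pattern in isolation (all names hypothetical):

```rust
use std::sync::{Arc, Mutex};

// Minimal sketch of the lazy-init shape used by UpsertWriterInner.
struct Lazy<T> {
    slot: Mutex<Option<Arc<T>>>,
}

impl<T> Lazy<T> {
    fn get_or_init(&self, init: impl FnOnce() -> T) -> Arc<T> {
        let mut guard = self.slot.lock().expect("lock poisoned");
        if guard.is_none() {
            *guard = Some(Arc::new(init()));
        }
        // Clone the Arc so the guard drops before the caller ever awaits.
        guard.as_ref().unwrap().clone()
    }
}

fn main() {
    let lazy = Lazy { slot: Mutex::new(None) };
    let first = lazy.get_or_init(|| 42);
    let second = lazy.get_or_init(|| unreachable!("already initialized"));
    assert!(Arc::ptr_eq(&first, &second));
}
```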
diff --git a/bindings/python/src/write_handle.rs b/bindings/python/src/write_handle.rs
new file mode 100644
index 00000000..4f3ce997
--- /dev/null
+++ b/bindings/python/src/write_handle.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use std::sync::Mutex;
+
+/// Handle for a pending write operation.
+///
+/// Returned by `upsert()`, `delete()`, `append()`, etc.
+/// Can be safely ignored for fire-and-forget semantics,
+/// or awaited via `wait()` for per-record acknowledgment.
+///
+/// # Example:
+///     # Fire-and-forget — just ignore the handle
+///     writer.upsert(row1)
+///     writer.upsert(row2)
+///     await writer.flush()
+///
+///     # Per-record ack — call wait()
+///     handle = writer.upsert(critical_row)
+///     await handle.wait()
+#[pyclass]
+pub struct WriteResultHandle {
+    inner: Mutex<Option<fcore::client::WriteResultFuture>>,
+}
+
+impl WriteResultHandle {
+    pub fn new(future: fcore::client::WriteResultFuture) -> Self {
+        Self {
+            inner: Mutex::new(Some(future)),
+        }
+    }
+}
+
+#[pymethods]
+impl WriteResultHandle {
+    /// Wait for server acknowledgment of this specific write.
+    ///
+    /// Returns:
+    ///     None on success, raises FlussError on failure.
+    pub fn wait<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let future = self
+            .inner
+            .lock()
+            .map_err(|e| FlussError::new_err(format!("Lock poisoned: {e}")))?
+            .take()
+            .ok_or_else(|| FlussError::new_err("WriteResultHandle already consumed"))?;
+
+        future_into_py(py, async move {
+            future
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+            Ok(())
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        let consumed = self.inner.lock().map(|g| g.is_none()).unwrap_or(false);
+        if consumed {
+            "WriteResultHandle(consumed)".to_string()
+        } else {
+            "WriteResultHandle(pending)".to_string()
+        }
+    }
+}
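`wait()` takes the future out of the handle, so a `WriteResultHandle` can be awaited at most once; a second `wait()` raises instead of silently succeeding, matching the C++ `WriteResult` change above that now returns an error rather than `ok_result()`. The consume-once shape with pyo3 stripped away (names hypothetical):

```rust
use std::sync::Mutex;

struct OnceHandle<T> {
    slot: Mutex<Option<T>>,
}

impl<T> OnceHandle<T> {
    fn new(value: T) -> Self {
        Self { slot: Mutex::new(Some(value)) }
    }

    // First call yields the value; later calls report consumption.
    fn take(&self) -> Result<T, &'static str> {
        self.slot
            .lock()
            .map_err(|_| "lock poisoned")?
            .take()
            .ok_or("already consumed")
    }
}

fn main() {
    let handle = OnceHandle::new("ack");
    assert_eq!(handle.take(), Ok("ack"));
    assert_eq!(handle.take(), Err("already consumed"));
}
```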
diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs
index 2fcb1342..3acf73f2 100644
--- a/crates/examples/src/example_kv_table.rs
+++ b/crates/examples/src/example_kv_table.rs
@@ -62,7 +62,7 @@ pub async fn main() -> Result<()> {
         row.set_field(0, id);
         row.set_field(1, name);
         row.set_field(2, age);
-        upsert_writer.upsert(&row).await?;
+        upsert_writer.upsert(&row)?;
         println!("Upserted: {row:?}");
     }
     upsert_writer.flush().await?;
@@ -85,7 +85,7 @@ pub async fn main() -> Result<()> {
     row.set_field(0, 1);
     row.set_field(1, "Verso");
     row.set_field(2, 33i64);
-    upsert_writer.upsert(&row).await?.await?;
+    upsert_writer.upsert(&row)?.await?;
     println!("Updated: {row:?}");

     let result = lookuper.lookup(&make_key(1)).await?;
@@ -100,7 +100,7 @@ pub async fn main() -> Result<()> {
     // For delete, only primary key field needs to be set; other fields can remain null
     let mut row = GenericRow::new(3);
     row.set_field(0, 2);
-    upsert_writer.delete(&row).await?.await?;
+    upsert_writer.delete(&row)?.await?;
     println!("Deleted row with id=2");

     let result = lookuper.lookup(&make_key(2)).await?;
diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs
index feb8f05b..ee1f541f 100644
--- a/crates/examples/src/example_partitioned_kv_table.rs
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -74,7 +74,7 @@ pub async fn main() -> Result<()> {
         row.set_field(1, region);
         row.set_field(2, zone);
         row.set_field(3, score);
-        upsert_writer.upsert(&row).await?;
+        upsert_writer.upsert(&row)?;
         println!("Upserted: {row:?}");
     }
     upsert_writer.flush().await?;
@@ -102,7 +102,7 @@ pub async fn main() -> Result<()> {
     row.set_field(1, "APAC");
     row.set_field(2, 1i64);
     row.set_field(3, 4321i64);
-    upsert_writer.upsert(&row).await?.await?;
+    upsert_writer.upsert(&row)?.await?;
     println!("Updated: {row:?}");

     let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
@@ -118,7 +118,7 @@ pub async fn main() -> Result<()> {
     row.set_field(0, 1002);
     row.set_field(1, "EMEA");
     row.set_field(2, 2i64);
-    upsert_writer.delete(&row).await?.await?;
+    upsert_writer.delete(&row)?.await?;
     println!("Deleted: {row:?}");

     let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index ee9bc7b3..199fce23 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -64,12 +64,12 @@ pub async fn main() -> Result<()> {
     let table = conn.get_table(&table_path).await?;
     let append_writer = table.new_append()?.create_writer()?;
     // Fire-and-forget: queue writes then flush
-    append_writer.append(&row).await?;
+    append_writer.append(&row)?;
     let mut row = GenericRow::new(3);
     row.set_field(0, 233333);
     row.set_field(1, "tt44");
     row.set_field(2, 987_654_321_987i64);
-    append_writer.append(&row).await?;
+    append_writer.append(&row)?;
     append_writer.flush().await?;

     // scan rows
diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs
index e26b61ad..942253fa 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -80,7 +80,7 @@ impl AppendWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn append(&self, row: &R) -> Result<WriteResultFuture> {
+    pub fn append(&self, row: &R) -> Result<WriteResultFuture> {
         let physical_table_path = Arc::new(get_physical_path(
             &self.table_path,
             self.partition_getter.as_ref(),
@@ -92,7 +92,7 @@ impl AppendWriter {
             self.table_info.schema_id,
             row,
         );
-        let result_handle = self.writer_client.send(&record).await?;
+        let result_handle = self.writer_client.send(&record)?;
         Ok(WriteResultFuture::new(result_handle))
     }

@@ -107,7 +107,7 @@ impl AppendWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
+    pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
         let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
             let first_row = ColumnarRow::new(Arc::new(batch.clone()));
             Arc::new(get_physical_path(
@@ -125,7 +125,7 @@ impl AppendWriter {
             self.table_info.schema_id,
             batch,
         );
-        let result_handle = self.writer_client.send(&record).await?;
+        let result_handle = self.writer_client.send(&record)?;
         Ok(WriteResultFuture::new(result_handle))
     }
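The Arrow path follows the same calling convention: `append_arrow_batch` queues synchronously and `flush` is the single await point. A sketch using the `record_batch!` macro that the integration tests below rely on (`table` assumed connected):

```rust
// Sketch only; unwrap mirrors the style of the integration tests below.
let append_writer = table.new_append()?.create_writer()?;
let batch =
    record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2", "a3"])).unwrap();
append_writer.append_arrow_batch(batch)?; // queued, no per-call await
append_writer.flush().await?; // one await for delivery of everything queued
```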
diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs
index a1646cc3..7057b901 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -354,7 +354,7 @@ impl UpsertWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn upsert(&self, row: &R) -> Result<WriteResultFuture> {
+    pub fn upsert(&self, row: &R) -> Result<WriteResultFuture> {
         self.check_field_count(row)?;

         let (key, bucket_key) = self.get_keys(row)?;
@@ -379,7 +379,7 @@ impl UpsertWriter {
             Some(row_bytes),
         );

-        let result_handle = self.writer_client.send(&write_record).await?;
+        let result_handle = self.writer_client.send(&write_record)?;
         Ok(WriteResultFuture::new(result_handle))
     }

@@ -395,7 +395,7 @@ impl UpsertWriter {
     /// # Returns
     /// A [`WriteResultFuture`] that can be awaited to wait for server acknowledgment,
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
-    pub async fn delete(&self, row: &R) -> Result<WriteResultFuture> {
+    pub fn delete(&self, row: &R) -> Result<WriteResultFuture> {
         self.check_field_count(row)?;

         let (key, bucket_key) = self.get_keys(row)?;
@@ -415,7 +415,7 @@ impl UpsertWriter {
             None,
         );

-        let result_handle = self.writer_client.send(&write_record).await?;
+        let result_handle = self.writer_client.send(&write_record)?;
         Ok(WriteResultFuture::new(result_handle))
     }
 }
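Because queuing no longer awaits, a caller can also collect several `WriteResultFuture`s and wait on all acknowledgments at once. A hypothetical sketch (assumes the `futures` crate and that the futures' outputs share one error type; `upsert_writer` as in the examples above):

```rust
// Hypothetical: batch per-record acks instead of a single flush().
let mut pending = Vec::new();
for (id, name, age) in [(1, "Ann", 30i64), (2, "Ben", 41i64)] {
    let mut row = GenericRow::new(3);
    row.set_field(0, id);
    row.set_field(1, name);
    row.set_field(2, age);
    pending.push(upsert_writer.upsert(&row)?); // queued synchronously
}
futures::future::try_join_all(pending).await?; // await every ack together
```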
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 5eae868d..2c364524 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -25,11 +25,11 @@ use crate::metadata::{PhysicalTablePath, TableBucket};
 use crate::util::current_time_ms;
 use crate::{BucketId, PartitionId, TableId};
 use dashmap::DashMap;
+use parking_lot::Mutex;
 use parking_lot::RwLock;
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
-use tokio::sync::Mutex;

 // Type alias to simplify complex nested types
 type BucketBatches = Vec<(BucketId, Arc<Mutex<VecDeque<WriteBatch>>>)>;
@@ -144,7 +144,7 @@ impl RecordAccumulator {
         ))
     }

-    pub async fn append(
+    pub fn append(
         &self,
         record: &WriteRecord<'_>,
         bucket_id: BucketId,
@@ -180,7 +180,7 @@
                 .clone()
         };

-        let mut dq_guard = dq.lock().await;
+        let mut dq_guard = dq.lock();
         if let Some(append_result) = self.try_append(record, &mut dq_guard)? {
             return Ok(append_result);
         }
@@ -193,7 +193,7 @@ impl RecordAccumulator {
         self.append_new_batch(cluster, record, &mut dq_guard)
     }

-    pub async fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
+    pub fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
         // Snapshot just the Arcs we need, avoiding cloning the entire BucketAndWriteBatches struct
         let entries: Vec<(Arc<PhysicalTablePath>, Option<PartitionId>, BucketBatches)> = self
             .write_batches
@@ -216,18 +216,16 @@
         let mut unknown_leader_tables = HashSet::new();

         for (physical_table_path, mut partition_id, bucket_batches) in entries {
-            next_ready_check_delay_ms = self
-                .bucket_ready(
-                    &physical_table_path,
-                    physical_table_path.get_partition_name().is_some(),
-                    &mut partition_id,
-                    bucket_batches,
-                    &mut ready_nodes,
-                    &mut unknown_leader_tables,
-                    cluster,
-                    next_ready_check_delay_ms,
-                )
-                .await?
+            next_ready_check_delay_ms = self.bucket_ready(
+                &physical_table_path,
+                physical_table_path.get_partition_name().is_some(),
+                &mut partition_id,
+                bucket_batches,
+                &mut ready_nodes,
+                &mut unknown_leader_tables,
+                cluster,
+                next_ready_check_delay_ms,
+            )?
         }

         Ok(ReadyCheckResult {
@@ -238,7 +236,7 @@
     }

     #[allow(clippy::too_many_arguments)]
-    async fn bucket_ready(
+    fn bucket_ready(
         &self,
         physical_table_path: &Arc<PhysicalTablePath>,
         is_partitioned_table: bool,
@@ -274,7 +272,7 @@
         }

         for (bucket_id, batch) in bucket_batches {
-            let batch_guard = batch.lock().await;
+            let batch_guard = batch.lock();
             if batch_guard.is_empty() {
                 continue;
             }
@@ -316,7 +314,7 @@
         next_ready_check_delay_ms
     }

-    pub async fn drain(
+    pub fn drain(
         &self,
         cluster: Arc<Cluster>,
         nodes: &HashSet<ServerNode>,
@@ -327,9 +325,7 @@
         }
         let mut batches = HashMap::new();
         for node in nodes {
-            let ready = self
-                .drain_batches_for_one_node(&cluster, node, max_size)
-                .await?;
+            let ready = self.drain_batches_for_one_node(&cluster, node, max_size)?;
             if !ready.is_empty() {
                 batches.insert(node.id(), ready);
             }
@@ -338,7 +334,7 @@
         Ok(batches)
     }

-    async fn drain_batches_for_one_node(
+    fn drain_batches_for_one_node(
         &self,
         cluster: &Cluster,
         node: &ServerNode,
@@ -352,15 +348,13 @@
             return Ok(ready);
         }

-        // Get the start index without holding the lock across awaits
         let start = {
-            let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
+            let mut nodes_drain_index_guard = self.nodes_drain_index.lock();
             let drain_index = nodes_drain_index_guard.entry(node.id()).or_insert(0);
             *drain_index % buckets.len()
         };

         let mut current_index = start;
-        // Assigned at the start of each loop iteration (line 323), used after loop (line 376)
         let mut last_processed_index;

         loop {
@@ -383,7 +377,7 @@
             if let Some(deque) = deque {
                 let mut maybe_batch = None;
                 {
-                    let mut batch_lock = deque.lock().await;
+                    let mut batch_lock = deque.lock();

                     if !batch_lock.is_empty() {
                         let first_batch = batch_lock.front().unwrap();
@@ -419,7 +413,7 @@

         // Store the last processed index to maintain round-robin fairness
         {
-            let mut nodes_drain_index_guard = self.nodes_drain_index.lock().await;
+            let mut nodes_drain_index_guard = self.nodes_drain_index.lock();
             nodes_drain_index_guard.insert(node.id(), last_processed_index);
         }

@@ -430,7 +424,7 @@
         self.incomplete_batches.write().remove(&batch_id);
     }

-    pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
+    pub fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
         ready_write_batch.write_batch.re_enqueued();
         let physical_table_path = ready_write_batch.write_batch.physical_table_path();
         let bucket_id = ready_write_batch.table_bucket.bucket_id();
@@ -456,7 +450,7 @@
                 .clone()
         };

-        let mut dq_guard = dq.lock().await;
+        let mut dq_guard = dq.lock();
         dq_guard.push_front(ready_write_batch.write_batch);
     }
@@ -604,20 +598,18 @@ mod tests {
         };
         let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);

-        accumulator.append(&record, 0, &cluster, false).await?;
+        accumulator.append(&record, 0, &cluster, false)?;

         let server = cluster.get_tablet_server(1).expect("server");
         let nodes = HashSet::from([server.clone()]);
-        let mut batches = accumulator
-            .drain(cluster.clone(), &nodes, 1024 * 1024)
-            .await?;
+        let mut batches = accumulator.drain(cluster.clone(), &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         assert_eq!(batch.write_batch.attempts(), 0);

-        accumulator.re_enqueue(batch).await;
+        accumulator.re_enqueue(batch);

-        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024).await?;
+        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         assert_eq!(batch.write_batch.attempts(), 1);
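The accumulator refactor above is the heart of the change: every `tokio::sync::Mutex` (whose `lock()` must be awaited) becomes a `parking_lot::Mutex`, which is only sound because no guard is held across an await point anymore. The discipline in miniature (assumes the `parking_lot` crate):

```rust
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;

// Lock, mutate, drop the guard: no await can interleave while it is held.
fn enqueue(dq: &Arc<Mutex<VecDeque<i32>>>, value: i32) {
    let mut guard = dq.lock(); // synchronous, no .await
    guard.push_back(value);
} // guard dropped here, before the caller reaches any await point

fn main() {
    let dq = Arc::new(Mutex::new(VecDeque::new()));
    enqueue(&dq, 1);
    enqueue(&dq, 2);
    assert_eq!(dq.lock().pop_front(), Some(1));
}
```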
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index 49eff05d..2c848d35 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -205,8 +205,8 @@ impl ResultHandle {
 /// A future that represents a pending write operation.
 ///
 /// This type implements [`Future`], allowing users to either:
-/// 1. Await immediately to block on acknowledgment: `writer.upsert(&row).await?.await?`
-/// 2. Fire-and-forget with later flush: `writer.upsert(&row).await?; writer.flush().await?`
+/// 1. Await immediately to block on acknowledgment: `writer.upsert(&row)?.await?`
+/// 2. Fire-and-forget with later flush: `writer.upsert(&row)?; writer.flush().await?`
 ///
 /// This pattern is similar to rdkafka's `DeliveryFuture` and allows for efficient batching
 /// when users don't need immediate per-record acknowledgment.
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index f336d0cd..069f2d2b 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -78,7 +78,7 @@ impl Sender {
     async fn run_once(&self) -> Result<()> {
         let cluster = self.metadata.get_cluster();

-        let ready_check_result = self.accumulator.ready(&cluster).await?;
+        let ready_check_result = self.accumulator.ready(&cluster)?;

         // Update metadata if needed
         if !ready_check_result.unknown_leader_tables.is_empty() {
@@ -124,14 +124,11 @@ impl Sender {
             return Ok(());
         }

-        let batches = self
-            .accumulator
-            .drain(
-                cluster.clone(),
-                &ready_check_result.ready_nodes,
-                self.max_request_size,
-            )
-            .await?;
+        let batches = self.accumulator.drain(
+            cluster.clone(),
+            &ready_check_result.ready_nodes,
+            self.max_request_size,
+        )?;

         if !batches.is_empty() {
             self.add_to_inflight_batches(&batches);
@@ -233,8 +230,7 @@ impl Sender {
                     self.handle_batches_with_local_error(
                         request_batches,
                         format!("Failed to build write request: {e}"),
-                    )
-                    .await?;
+                    )?;
                     continue;
                 }
             };
@@ -377,9 +373,8 @@ impl Sender {
                         .error_message()
                         .cloned()
                         .unwrap_or_else(|| error.message().to_string());
-                    if let Some(physical_table_path) = self
-                        .handle_write_batch_error(ready_batch, error, message)
-                        .await?
+                    if let Some(physical_table_path) =
+                        self.handle_write_batch_error(ready_batch, error, message)?
                     {
                         invalid_metadata_tables
                             .insert(physical_table_path.get_table_path().clone());
@@ -392,14 +387,11 @@ impl Sender {

         for bucket in pending_buckets {
             if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
-                if let Some(physical_table_path) = self
-                    .handle_write_batch_error(
-                        ready_batch,
-                        FlussError::UnknownServerError,
-                        format!("Missing response for table bucket {bucket}"),
-                    )
-                    .await?
-                {
+                if let Some(physical_table_path) = self.handle_write_batch_error(
+                    ready_batch,
+                    FlussError::UnknownServerError,
+                    format!("Missing response for table bucket {bucket}"),
+                )? {
                     invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
                     invalid_physical_table_paths.insert(physical_table_path);
                 }
@@ -438,9 +430,8 @@ impl Sender {
         let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>> = HashSet::new();

         for batch in batches {
-            if let Some(physical_table_path) = self
-                .handle_write_batch_error(batch, error, message.clone())
-                .await?
+            if let Some(physical_table_path) =
+                self.handle_write_batch_error(batch, error, message.clone())?
             {
                 invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
                 invalid_physical_table_paths.insert(physical_table_path);
@@ -451,7 +442,7 @@ impl Sender {
         Ok(())
     }

-    async fn handle_batches_with_local_error(
+    fn handle_batches_with_local_error(
         &self,
         batches: Vec<ReadyWriteBatch>,
         message: String,
@@ -467,7 +458,7 @@ impl Sender {
         Ok(())
     }

-    async fn handle_write_batch_error(
+    fn handle_write_batch_error(
         &self,
         ready_write_batch: ReadyWriteBatch,
         error: FlussError,
@@ -480,7 +471,7 @@ impl Sender {
                 physical_table_path.as_ref(),
                 ready_write_batch.table_bucket.bucket_id()
             );
-            self.re_enqueue_batch(ready_write_batch).await;
+            self.re_enqueue_batch(ready_write_batch);
             return Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
         }

@@ -504,9 +495,9 @@ impl Sender {
         Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path))
     }

-    async fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
+    fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
         self.remove_from_inflight_batches(&ready_write_batch);
-        self.accumulator.re_enqueue(ready_write_batch).await;
+        self.accumulator.re_enqueue(ready_write_batch);
     }

     fn remove_from_inflight_batches(&self, ready_write_batch: &ReadyWriteBatch) {
@@ -656,7 +647,7 @@ mod tests {
     use crate::test_utils::{build_cluster_arc, build_table_info};
     use std::collections::{HashMap, HashSet};

-    async fn build_ready_batch(
+    fn build_ready_batch(
         accumulator: &RecordAccumulator,
         cluster: Arc<Cluster>,
         table_path: Arc<TablePath>,
@@ -667,11 +658,11 @@ mod tests {
             values: vec![Datum::Int32(1)],
         };
         let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row);
-        let result = accumulator.append(&record, 0, &cluster, false).await?;
+        let result = accumulator.append(&record, 0, &cluster, false)?;
         let result_handle = result.result_handle.expect("result handle");
         let server = cluster.get_tablet_server(1).expect("server");
         let nodes = HashSet::from([server.clone()]);
-        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024).await?;
+        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         Ok((batch, result_handle))
@@ -686,19 +677,21 @@ mod tests {
         let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024, 1000, 1, 1);

         let (batch, _handle) =
-            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path.clone()).await?;
+            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path.clone())?;
         let mut inflight = HashMap::new();
         inflight.insert(1, vec![batch]);
         sender.add_to_inflight_batches(&inflight);
         let batch = inflight.remove(&1).unwrap().pop().unwrap();

-        sender
-            .handle_write_batch_error(batch, FlussError::RequestTimeOut, "timeout".to_string())
-            .await?;
+        sender.handle_write_batch_error(
+            batch,
+            FlussError::RequestTimeOut,
+            "timeout".to_string(),
+        )?;

         let server = cluster.get_tablet_server(1).expect("server");
         let nodes = HashSet::from([server.clone()]);
-        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024).await?;
+        let mut batches = accumulator.drain(cluster, &nodes, 1024 * 1024)?;
         let mut drained = batches.remove(&1).expect("drained batches");
         let batch = drained.pop().expect("batch");
         assert_eq!(batch.write_batch.attempts(), 1);
@@ -713,15 +706,12 @@ mod tests {
         let accumulator = Arc::new(RecordAccumulator::new(Config::default()));
         let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024, 1000, 1, 0);

-        let (batch, handle) =
-            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path).await?;
-        sender
-            .handle_write_batch_error(
-                batch,
-                FlussError::InvalidTableException,
-                "invalid".to_string(),
-            )
-            .await?;
+        let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path)?;
+        sender.handle_write_batch_error(
+            batch,
+            FlussError::InvalidTableException,
+            "invalid".to_string(),
+        )?;

         let batch_result = handle.wait().await?;
         assert!(matches!(
@@ -740,7 +730,7 @@ mod tests {
         let accumulator = Arc::new(RecordAccumulator::new(Config::default()));
         let sender = Sender::new(metadata, accumulator.clone(), 1024 * 1024, 1000, 1, 0);

-        let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster, table_path).await?;
+        let (batch, handle) = build_ready_batch(accumulator.as_ref(), cluster, table_path)?;
         let request_buckets = vec![batch.table_bucket.clone()];
         let mut records_by_bucket = HashMap::new();
         records_by_bucket.insert(batch.table_bucket.clone(), batch);
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index c386adf7..330affaa 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -91,7 +91,7 @@ impl WriterClient {
         }
     }

-    pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> {
+    pub fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle> {
         let physical_table_path = &record.physical_table_path;
         let cluster = self.metadata.get_cluster();
         let bucket_key = record.bucket_key.as_ref();
@@ -99,19 +99,13 @@ impl WriterClient {
         let (bucket_assigner, bucket_id) =
             self.assign_bucket(&record.table_info, bucket_key, physical_table_path)?;

-        let mut result = self
-            .accumulate
-            .append(record, bucket_id, &cluster, true)
-            .await?;
+        let mut result = self.accumulate.append(record, bucket_id, &cluster, true)?;

         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
             bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
             let bucket_id = bucket_assigner.assign_bucket(bucket_key, &cluster)?;
-            result = self
-                .accumulate
-                .append(record, bucket_id, &cluster, false)
-                .await?;
+            result = self.accumulate.append(record, bucket_id, &cluster, false)?;
         }

         if result.batch_is_full || result.new_batch_created {
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index 0bfe4a37..ab5f5b6f 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -101,10 +101,7 @@ mod kv_table_test {
             row.set_field(0, *id);
             row.set_field(1, *name);
             row.set_field(2, *age);
-            upsert_writer
-                .upsert(&row)
-                .await
-                .expect("Failed to upsert row");
+            upsert_writer.upsert(&row).expect("Failed to upsert row");
         }
         upsert_writer.flush().await.expect("Failed to flush");

@@ -138,7 +135,6 @@ mod kv_table_test {
         updated_row.set_field(2, 33i64);
         upsert_writer
             .upsert(&updated_row)
-            .await
             .expect("Failed to upsert updated row")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -168,7 +164,6 @@ mod kv_table_test {
         delete_row.set_field(0, 1);
         upsert_writer
             .delete(&delete_row)
-            .await
             .expect("Failed to delete")
             .await
             .expect("Failed to wait for delete acknowledgment");
@@ -268,7 +263,7 @@ mod kv_table_test {
             row.set_field(0, *region);
             row.set_field(1, *user_id);
             row.set_field(2, *score);
-            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+            upsert_writer.upsert(&row).expect("Failed to upsert");
         }
         upsert_writer.flush().await.expect("Failed to flush");

@@ -308,7 +303,6 @@ mod kv_table_test {
         update_row.set_field(2, 500i64);
         upsert_writer
             .upsert(&update_row)
-            .await
             .expect("Failed to update")
             .await
             .expect("Failed to wait for update acknowledgment");
@@ -379,7 +373,6 @@ mod kv_table_test {
         row.set_field(3, 6942i64);
         upsert_writer
             .upsert(&row)
-            .await
             .expect("Failed to upsert initial row")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -421,7 +414,6 @@ mod kv_table_test {
         partial_row.set_field(3, 420i64);
         partial_writer
             .upsert(&partial_row)
-            .await
             .expect("Failed to upsert")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -509,7 +501,7 @@ mod kv_table_test {
             row.set_field(1, *user_id);
             row.set_field(2, *name);
             row.set_field(3, *score);
-            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+            upsert_writer.upsert(&row).expect("Failed to upsert");
         }
         upsert_writer.flush().await.expect("Failed to flush");

@@ -546,7 +538,6 @@ mod kv_table_test {
         updated_row.set_field(3, 999i64);
         upsert_writer
             .upsert(&updated_row)
-            .await
             .expect("Failed to upsert updated row")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -585,7 +576,6 @@ mod kv_table_test {
         delete_key.set_field(1, 1);
         upsert_writer
             .delete(&delete_key)
-            .await
             .expect("Failed to delete")
             .await
             .expect("Failed to wait for delete acknowledgment");
@@ -721,7 +711,6 @@ mod kv_table_test {

         upsert_writer
             .upsert(&row)
-            .await
             .expect("Failed to upsert row with all datatypes")
             .await
             .expect("Failed to wait for upsert acknowledgment");
@@ -826,7 +815,6 @@ mod kv_table_test {

         upsert_writer
             .upsert(&row_with_nulls)
-            .await
             .expect("Failed to upsert row with nulls")
             .await
             .expect("Failed to wait for upsert acknowledgment");
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 8d7773de..e7433726 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -99,14 +99,12 @@ mod table_test {
             record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a1", "a2", "a3"])).unwrap();
         append_writer
             .append_arrow_batch(batch1)
-            .await
             .expect("Failed to append batch");

         let batch2 =
             record_batch!(("c1", Int32, [4, 5, 6]), ("c2", Utf8, ["a4", "a5", "a6"])).unwrap();
         append_writer
             .append_arrow_batch(batch2)
-            .await
             .expect("Failed to append batch");

         // Flush to ensure all writes are acknowledged
@@ -230,7 +228,6 @@ mod table_test {
             .unwrap();
         append_writer
             .append_arrow_batch(batch)
-            .await
             .expect("Failed to append batch");

         // Flush to ensure all writes are acknowledged
@@ -332,7 +329,6 @@ mod table_test {
             .unwrap();
         append_writer
             .append_arrow_batch(batch)
-            .await
             .expect("Failed to append batch");
         append_writer.flush().await.expect("Failed to flush");

@@ -485,19 +481,16 @@ mod table_test {
             .append_arrow_batch(
                 record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a", "b"])).unwrap(),
             )
-            .await
             .unwrap();
         writer
             .append_arrow_batch(
                 record_batch!(("id", Int32, [3, 4]), ("name", Utf8, ["c", "d"])).unwrap(),
             )
-            .await
             .unwrap();
         writer
             .append_arrow_batch(
                 record_batch!(("id", Int32, [5, 6]), ("name", Utf8, ["e", "f"])).unwrap(),
             )
-            .await
             .unwrap();
         writer.flush().await.unwrap();

@@ -525,7 +518,6 @@ mod table_test {
             .append_arrow_batch(
                 record_batch!(("id", Int32, [7, 8]), ("name", Utf8, ["g", "h"])).unwrap(),
             )
-            .await
             .unwrap();
         writer.flush().await.unwrap();

@@ -759,7 +751,6 @@ mod table_test {

         append_writer
             .append(&row)
-            .await
             .expect("Failed to append row with all datatypes");

         // Append a row with null values for all columns
@@ -770,7 +761,6 @@ mod table_test {

         append_writer
             .append(&row_with_nulls)
-            .await
             .expect("Failed to append row with nulls");

         append_writer.flush().await.expect("Failed to flush");
@@ -1033,10 +1023,7 @@ mod table_test {
             row.set_field(0, *id);
             row.set_field(1, *region);
             row.set_field(2, *value);
-            append_writer
-                .append(&row)
-                .await
-                .expect("Failed to append row");
+            append_writer.append(&row).expect("Failed to append row");
         }
         append_writer.flush().await.expect("Failed to flush");

@@ -1051,7 +1038,6 @@ mod table_test {
             .unwrap();
         append_writer
             .append_arrow_batch(us_batch)
-            .await
             .expect("Failed to append US batch");

         let eu_batch = record_batch!(
@@ -1062,7 +1048,6 @@ mod table_test {
             .unwrap();
         append_writer
             .append_arrow_batch(eu_batch)
-            .await
             .expect("Failed to append EU batch");

         append_writer
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index 0efe3882..baac7729 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -148,12 +148,11 @@ mod table_remote_scan_test {
             row.set_field(0, i as i32);
             let v = format!("v{}", i);
             row.set_field(1, v.as_str());
-            append_writer
-                .append(&row)
-                .await
-                .expect("Failed to append row");
+            append_writer.append(&row).expect("Failed to append row");
         }

+        append_writer.flush().await.expect("Failed to flush");
+
         // Create a log scanner and subscribe to all buckets to read appended records
         let num_buckets = table.table_info().get_num_buckets();
         let log_scanner = table