README.md (2 additions, 1 deletion)

@@ -101,7 +101,8 @@ pub async fn main() -> Result<()> {
     let table = conn.get_table(&table_path).await;
     let append_writer = table.new_append().create_writer();
     let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8, ["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap();
-    append_writer.append(batch).await?;
+    append_writer.append(batch)?;
+    append_writer.flush().await?;
     println!("Start to scan log records......");
     // 4: scan the records
     let log_scanner = table.new_scan().create_log_scanner();
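The README hunk above changes the write pattern from one awaited call to enqueue-then-flush. For orientation, here is the full pattern the new API implies. This is a sketch inferred from this PR's diffs, not verbatim project code: the setup mirrors the README snippet, while FlussConnection, TablePath, the Result alias, and the exact type of the future returned by append() are assumptions.

pub async fn write_example(conn: &FlussConnection, table_path: &TablePath) -> Result<()> {
    let table = conn.get_table(table_path).await;
    let append_writer = table.new_append().create_writer();

    // Fire-and-forget: append() now enqueues synchronously and returns a
    // per-record result future; dropping it is fine when flush() follows.
    let batch = record_batch!(("c1", Int32, [1, 2, 3]), ("c2", Utf8, ["a", "b", "c"])).unwrap();
    append_writer.append(batch)?;

    // Per-record acknowledgment: keep the future and await it (assuming it
    // resolves to a Result, as the bindings' error handling suggests).
    let batch = record_batch!(("c1", Int32, [4, 5, 6]), ("c2", Utf8, ["d", "e", "f"])).unwrap();
    let ack = append_writer.append(batch)?;
    ack.await?;

    // flush() awaits everything still pending from fire-and-forget appends.
    append_writer.flush().await?;
    Ok(())
}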
bindings/cpp/src/lib.rs (4 additions, 3 deletions)

@@ -761,8 +761,9 @@ impl AppendWriter {
     fn append(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
         let generic_row = types::ffi_row_to_core(row);

-        let result_future = RUNTIME
-            .block_on(async { self.inner.append(&generic_row).await })
+        let result_future = self
+            .inner
+            .append(&generic_row)
             .map_err(|e| format!("Failed to append: {e}"))?;

         Ok(Box::new(WriteResult {
@@ -789,7 +790,7 @@ impl WriteResult {
                 Err(e) => err_result(1, e.to_string()),
             }
         } else {
-            ok_result()
+            err_result(1, "WriteResult already consumed".to_string())
         }
     }
 }
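The else branch above closes a correctness gap: a second wait on the same WriteResult used to report success without waiting for anything, and it now returns an error instead. A minimal, self-contained sketch of that consume-once semantics (illustrative names only, not the binding's actual types):

// The stored acknowledgment can be taken exactly once; every later wait()
// reports an error instead of a false OK.
struct WriteResult {
    inner: Option<Result<(), String>>, // pending ack, taken on first wait
}

impl WriteResult {
    fn wait(&mut self) -> Result<(), String> {
        match self.inner.take() {
            Some(result) => result,
            None => Err("WriteResult already consumed".to_string()),
        }
    }
}

fn main() {
    let mut res = WriteResult { inner: Some(Ok(())) };
    assert!(res.wait().is_ok()); // first wait consumes the stored result
    assert!(res.wait().is_err()); // second wait now fails loudly
}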
bindings/python/example/example.py (16 additions, 17 deletions)

@@ -184,7 +184,7 @@ async def main():
         # Test 3: Append single rows with Date, Time, Timestamp, Decimal
         print("\n--- Testing single row append with temporal/decimal types ---")
         # Dict input with all types including Date, Time, Timestamp, Decimal
-        await append_writer.append(
+        append_writer.append(
             {
                 "id": 8,
                 "name": "Helen",
@@ -200,7 +200,7 @@
         print("Successfully appended row (dict with Date, Time, Timestamp, Decimal)")

         # List input with all types
-        await append_writer.append(
+        append_writer.append(
             [
                 9,
                 "Ivan",
@@ -242,7 +242,7 @@

         # Flush all pending data
         print("\n--- Flushing data ---")
-        append_writer.flush()
+        await append_writer.flush()
         print("Successfully flushed data")

         # Demo: Check offsets after writes
@@ -422,9 +422,9 @@
         upsert_writer = pk_table.new_upsert()
         print(f"Created upsert writer: {upsert_writer}")

-        # Fire-and-forget: queue writes without waiting for individual acks.
+        # Fire-and-forget: queue writes synchronously, flush at end.
         # Records are batched internally for efficiency.
-        await upsert_writer.upsert(
+        upsert_writer.upsert(
             {
                 "user_id": 1,
                 "name": "Alice",
@@ -441,7 +441,7 @@
         )
         print("Queued user_id=1 (Alice)")

-        await upsert_writer.upsert(
+        upsert_writer.upsert(
             {
                 "user_id": 2,
                 "name": "Bob",
@@ -456,7 +456,7 @@
         )
         print("Queued user_id=2 (Bob)")

-        await upsert_writer.upsert(
+        upsert_writer.upsert(
             {
                 "user_id": 3,
                 "name": "Charlie",
@@ -479,7 +479,7 @@
         # the server confirms this specific write, useful when you need to
         # read-after-write or verify critical updates.
         print("\n--- Testing Upsert (per-record acknowledgment) ---")
-        ack = await upsert_writer.upsert(
+        handle = upsert_writer.upsert(
             {
                 "user_id": 1,
                 "name": "Alice Updated",
@@ -494,7 +494,7 @@
                 "balance": Decimal("2345.67"),
             }
         )
-        await ack  # wait for server acknowledgment before proceeding
+        await handle.wait()  # wait for server acknowledgment
         print("Updated user_id=1 (Alice -> Alice Updated) — server acknowledged")

     except Exception as e:
@@ -554,9 +554,8 @@
     try:
         upsert_writer = pk_table.new_upsert()

-        # Per-record ack for delete — await the handle to confirm deletion
-        ack = await upsert_writer.delete({"user_id": 3})
-        await ack
+        handle = upsert_writer.delete({"user_id": 3})
+        await handle.wait()
         print("Deleted user_id=3 — server acknowledged")

         lookuper = pk_table.new_lookup()
@@ -670,12 +669,12 @@
         partitioned_writer = await partitioned_table.new_append_writer()

         # Append data to US partition
-        await partitioned_writer.append({"id": 1, "region": "US", "value": 100})
-        await partitioned_writer.append({"id": 2, "region": "US", "value": 200})
+        partitioned_writer.append({"id": 1, "region": "US", "value": 100})
+        partitioned_writer.append({"id": 2, "region": "US", "value": 200})
         # Append data to EU partition
-        await partitioned_writer.append({"id": 3, "region": "EU", "value": 300})
-        await partitioned_writer.append({"id": 4, "region": "EU", "value": 400})
-        partitioned_writer.flush()
+        partitioned_writer.append({"id": 3, "region": "EU", "value": 300})
+        partitioned_writer.append({"id": 4, "region": "EU", "value": 400})
+        await partitioned_writer.flush()
         print("\nWrote 4 records (2 to US, 2 to EU)")

         # Demo: list_partition_offsets
bindings/python/src/lib.rs (3 additions)

@@ -30,6 +30,7 @@ mod metadata;
 mod table;
 mod upsert;
 mod utils;
+mod write_handle;

 pub use admin::*;
 pub use config::*;
@@ -40,6 +41,7 @@ pub use metadata::*;
 pub use table::*;
 pub use upsert::*;
 pub use utils::*;
+pub use write_handle::*;

 static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
     tokio::runtime::Builder::new_multi_thread()
@@ -88,6 +90,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<RecordBatch>()?;
     m.add_class::<PartitionInfo>()?;
     m.add_class::<OffsetType>()?;
+    m.add_class::<WriteResultHandle>()?;

     // Register constants
     m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
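The new write_handle.rs module registered here is not itself part of the visible hunks. Based on how table.rs below constructs WriteResultHandle::new(result_future) and how example.py calls await handle.wait(), a plausible shape is sketched next. The AckFuture alias, the field layout, the error type, and the pyo3_async_runtimes import path are all assumptions; only the names WriteResultHandle, new, and wait come from the diff.

// Hypothetical sketch of bindings/python/src/write_handle.rs.
use std::sync::Mutex;

use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::future_into_py;

// Assumed alias for the per-record acknowledgment future returned by append().
type AckFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;

#[pyclass]
pub struct WriteResultHandle {
    // Pending acknowledgment, taken on the first wait() call so the handle
    // can be awaited at most once.
    ack: Mutex<Option<AckFuture>>,
}

impl WriteResultHandle {
    pub fn new(fut: AckFuture) -> Self {
        Self { ack: Mutex::new(Some(fut)) }
    }
}

#[pymethods]
impl WriteResultHandle {
    /// Return a coroutine that resolves once the server acknowledges the write.
    fn wait<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        // Take the future out under the lock; a second wait() finds None.
        let fut = self.ack.lock().unwrap().take();
        future_into_py(py, async move {
            match fut {
                Some(f) => f.await.map_err(PyRuntimeError::new_err),
                None => Err(PyRuntimeError::new_err("handle already awaited")),
            }
        })
    }
}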
bindings/python/src/table.rs (26 additions, 62 deletions)

@@ -533,76 +533,43 @@ impl AppendWriter {
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;

         for batch in batch_list {
-            // Drop the ack coroutine — fire-and-forget
-            let _ = self.write_arrow_batch(py, batch)?;
+            // Drop the handle — fire-and-forget for bulk writes
+            drop(self.write_arrow_batch(py, batch)?);
         }
         Ok(())
     }

-    /// Write Arrow batch data
+    /// Write Arrow batch data.
     ///
     /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn write_arrow_batch<'py>(
-        &self,
-        py: Python<'py>,
-        batch: Py<PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ///     WriteResultHandle that can be ignored (fire-and-forget) or
+    ///     awaited via `handle.wait()` for server acknowledgment.
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<WriteResultHandle> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: ArrowRecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;

-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let result_future = inner
-                .append_arrow_batch(rust_batch)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let result_future = self
+            .inner
+            .append_arrow_batch(rust_batch)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }

-    /// Append a single row to the table
+    /// Append a single row to the table.
    ///
    /// Returns:
-    ///     A coroutine that can be awaited for server acknowledgment,
-    ///     or ignored for fire-and-forget behavior.
-    pub fn append<'py>(
-        &self,
-        py: Python<'py>,
-        row: &Bound<'py, PyAny>,
-    ) -> PyResult<Bound<'py, PyAny>> {
+    ///     WriteResultHandle that can be ignored (fire-and-forget) or
+    ///     awaited via `handle.wait()` for server acknowledgment.
+    pub fn append(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
         let generic_row = python_to_generic_row(row, &self.table_info)?;
-        let inner = self.inner.clone();
-
-        future_into_py(py, async move {
-            let result_future = inner
-                .append(&generic_row)
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))?;
-
-            Python::attach(|py| {
-                future_into_py(py, async move {
-                    result_future
-                        .await
-                        .map_err(|e| FlussError::new_err(e.to_string()))?;
-                    Ok(())
-                })
-                .map(|bound| bound.unbind())
-            })
-        })
+        let result_future = self
+            .inner
+            .append(&generic_row)
+            .map_err(|e| FlussError::new_err(e.to_string()))?;
+        Ok(WriteResultHandle::new(result_future))
     }

     /// Write Pandas DataFrame data
@@ -636,16 +603,13 @@
     }

     /// Flush any pending data
-    pub fn flush(&self, py: Python) -> PyResult<()> {
+    pub fn flush<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
         let inner = self.inner.clone();
-        // Release the GIL before blocking on I/O
-        py.detach(|| {
-            TOKIO_RUNTIME.block_on(async {
-                inner
-                    .flush()
-                    .await
-                    .map_err(|e| FlussError::new_err(e.to_string()))
-            })
+        future_into_py(py, async move {
+            inner
+                .flush()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
         })
     }