From 9807a57b3306718ed12d0f6eb684d730ef9c0ef0 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 7 Feb 2026 22:44:05 +0000 Subject: [PATCH] [TASK-276] Add unsubscribe_partition to python bindings --- bindings/python/example/example.py | 13 +++++++++++++ bindings/python/fluss/__init__.pyi | 8 ++++++++ bindings/python/src/table.rs | 17 +++++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index d56879a4..dd7f1b14 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -717,6 +717,19 @@ async def main(): print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:") print(partitioned_arrow.to_pandas()) + # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining + print("\n--- Testing unsubscribe_partition ---") + partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner() + for p in partition_infos: + partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) + # Unsubscribe from the first partition + first_partition = partition_infos[0] + partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0) + print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})") + remaining_arrow = partitioned_scanner3.to_arrow() + print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):") + print(remaining_arrow.to_pandas()) + # Demo: to_pandas() also works for partitioned tables print("\n--- Testing to_pandas() on partitioned table ---") partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner() diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index a2bbaac4..526dad78 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -420,6 +420,14 @@ class LogScanner: start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning) """ ... + def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None: + """Unsubscribe from a specific partition bucket (partitioned tables only). + + Args: + partition_id: The partition ID to unsubscribe from + bucket_id: The bucket ID within the partition + """ + ... def poll(self, timeout_ms: int) -> List[ScanRecord]: """Poll for individual records with metadata. diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 8af6b13e..1a7dbdce 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1657,6 +1657,23 @@ impl LogScanner { }) } + /// Unsubscribe from a specific partition bucket (partitioned tables only). + /// + /// Args: + /// partition_id: The partition ID to unsubscribe from + /// bucket_id: The bucket ID within the partition + fn unsubscribe_partition(&self, py: Python, partition_id: i64, bucket_id: i32) -> PyResult<()> { + py.detach(|| { + TOKIO_RUNTIME.block_on(async { + with_scanner!( + &self.scanner, + unsubscribe_partition(partition_id, bucket_id) + ) + .map_err(|e| FlussError::new_err(e.to_string())) + }) + }) + } + /// Poll for individual records with metadata. /// /// Args: