Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,19 @@ async def main():
print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
print(partitioned_arrow.to_pandas())

# Demo: unsubscribe_partition - unsubscribe from one partition, read remaining
print("\n--- Testing unsubscribe_partition ---")
partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner()
for p in partition_infos:
partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
# Unsubscribe from the first partition
first_partition = partition_infos[0]
partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0)
print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})")
remaining_arrow = partitioned_scanner3.to_arrow()
print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):")
print(remaining_arrow.to_pandas())

# Demo: to_pandas() also works for partitioned tables
print("\n--- Testing to_pandas() on partitioned table ---")
partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner()
Expand Down
8 changes: 8 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ class LogScanner:
start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
"""
...
def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None:
"""Unsubscribe from a specific partition bucket (partitioned tables only).

Args:
partition_id: The partition ID to unsubscribe from
bucket_id: The bucket ID within the partition
"""
...
def poll(self, timeout_ms: int) -> List[ScanRecord]:
"""Poll for individual records with metadata.

Expand Down
17 changes: 17 additions & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,23 @@ impl LogScanner {
})
}

/// Unsubscribe from a specific partition bucket (partitioned tables only).
///
/// Args:
/// partition_id: The partition ID to unsubscribe from
/// bucket_id: The bucket ID within the partition
fn unsubscribe_partition(&self, py: Python, partition_id: i64, bucket_id: i32) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
with_scanner!(
&self.scanner,
unsubscribe_partition(partition_id, bucket_id)
)
.map_err(|e| FlussError::new_err(e.to_string()))
})
})
}

/// Poll for individual records with metadata.
///
/// Args:
Expand Down
Loading