From 6f8aefb7f1cc1e3146877d9a94ce5963c17c2333 Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Tue, 16 Dec 2025 13:02:17 -0300 Subject: [PATCH 1/7] chore: max_total_wal_size option --- docs/api/options.rst | 15 +++++++++++++++ rocksdb/_rocksdb.pyx | 6 ++++++ rocksdb/options.pxd | 1 + 3 files changed, 22 insertions(+) diff --git a/docs/api/options.rst b/docs/api/options.rst index 1c77b18..c018fd6 100644 --- a/docs/api/options.rst +++ b/docs/api/options.rst @@ -713,6 +713,21 @@ Options objects | *Type:* ``int`` | *Default:* ``0`` + .. py:attribute:: max_total_wal_size + + Once write-ahead logs exceed this size, we will start forcing the flush of + column families whose memtables are backed by the oldest live WAL file + (i.e. the ones that are causing all the space amplification). If set to 0 + (default), we will dynamically choose the WAL size limit to be + [sum of all write_buffer_size * max_write_buffer_number] * 4. + This option takes effect only when there are more than one column family as + otherwise the wal size is dictated by the write_buffer_size. + + Dynamically changeable through SetDBOptions() API. + + | *Type:* ``int`` + | *Default:* ``0`` + .. py:attribute:: manifest_preallocation_size Number of bytes to preallocate (via fallocate) the manifest diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index 6e2658a..e95dd8a 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -1229,6 +1229,12 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.WAL_size_limit_MB = value + property max_total_wal_size: + def __get__(self): + return self.opts.max_total_wal_size + def __set__(self, value): + self.opts.max_total_wal_size = value + property manifest_preallocation_size: def __get__(self): return self.opts.manifest_preallocation_size diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index 167ed5f..fc82de3 100644 --- a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -88,6 +88,7 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": int table_cache_numshardbits uint64_t WAL_ttl_seconds uint64_t WAL_size_limit_MB + uint64_t max_total_wal_size size_t manifest_preallocation_size cpp_bool allow_mmap_reads cpp_bool allow_mmap_writes From 2787f07de77e06b5aed7cffddda45c78899f6ba5 Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Tue, 16 Dec 2025 13:14:51 -0300 Subject: [PATCH 2/7] chore: explose flush method in the DB class --- rocksdb/_rocksdb.pyx | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index e95dd8a..9e74a2c 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -2008,6 +2008,28 @@ cdef class DB(object): st = self.db.CompactRange(c_options, cf_handle, begin_ptr, end_ptr) check_status(st) + def flush(self, wait=True, ColumnFamilyHandle column_family=None): + """ + Flush all memtable data. + + Args: + wait (bool): If True (default), the flush will wait until the + flush is done. + column_family: Column family to flush. If None, the default + column family is used. + """ + cdef Status st + cdef options.FlushOptions flush_opts + flush_opts.wait = wait + + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = (column_family).get_handle() + + with nogil: + st = self.db.Flush(flush_opts, cf_handle) + check_status(st) + @staticmethod def __parse_read_opts( verify_checksums=False, From 3b86fef70eebeab1fd2dd3ff5d728bbabae6da22 Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Tue, 16 Dec 2025 13:30:39 -0300 Subject: [PATCH 3/7] chore: flush all column families instead when not specifying --- rocksdb/_rocksdb.pyx | 39 +++++++++++++++++++++++++++++++-------- rocksdb/db.pxd | 1 + 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index 9e74a2c..752e546 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -2008,26 +2008,49 @@ cdef class DB(object): st = self.db.CompactRange(c_options, cf_handle, begin_ptr, end_ptr) check_status(st) - def flush(self, wait=True, ColumnFamilyHandle column_family=None): + def flush(self, wait=True, column_families=None): """ - Flush all memtable data. + Flush memtable data for column families. + + If atomic flush is not enabled, flushing multiple column families is + equivalent to calling flush for each column family individually. + If atomic flush is enabled, all specified column families will be + flushed atomically up to the latest sequence number at the time + when flush is requested. Args: wait (bool): If True (default), the flush will wait until the flush is done. - column_family: Column family to flush. If None, the default - column family is used. + column_families: Specifies which column families to flush: + - None (default): flushes ALL column families in the database + - A single ColumnFamilyHandle: flushes only that column family + - A list/tuple of ColumnFamilyHandle objects: flushes those column families """ cdef Status st cdef options.FlushOptions flush_opts + cdef vector[db.ColumnFamilyHandle*] cf_handles + cdef _ColumnFamilyHandle handle flush_opts.wait = wait - cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() - if column_family: - cf_handle = (column_family).get_handle() + # Handle different input types for column_families + if column_families is None: + # Flush ALL column families + for handle in self.cf_handles: + cf_handles.push_back(handle.handle) + elif isinstance(column_families, ColumnFamilyHandle): + # Single column family + cf_handles.push_back((column_families).get_handle()) + elif isinstance(column_families, (list, tuple)): + # Multiple column families + for cf in column_families: + if not isinstance(cf, ColumnFamilyHandle): + raise TypeError("All items in column_families must be ColumnFamilyHandle objects") + cf_handles.push_back((cf).get_handle()) + else: + raise TypeError("column_families must be None, a ColumnFamilyHandle, or a list of ColumnFamilyHandle objects") with nogil: - st = self.db.Flush(flush_opts, cf_handle) + st = self.db.Flush(flush_opts, cf_handles) check_status(st) @staticmethod diff --git a/rocksdb/db.pxd b/rocksdb/db.pxd index 262f917..0dc7d46 100644 --- a/rocksdb/db.pxd +++ b/rocksdb/db.pxd @@ -167,6 +167,7 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb": const string& GetName() except+ nogil const options.Options& GetOptions(ColumnFamilyHandle*) except+ nogil Status Flush(const options.FlushOptions&, ColumnFamilyHandle*) except+ nogil + Status Flush(const options.FlushOptions&, const vector[ColumnFamilyHandle*]&) except+ nogil Status DisableFileDeletions() except+ nogil Status EnableFileDeletions() except+ nogil Status Close() except+ nogil From 41e57fa0f845b60049ec0670ac60c471b262c78f Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Tue, 16 Dec 2025 20:10:20 -0300 Subject: [PATCH 4/7] ci: add tests for the new bindings --- rocksdb/tests/test_db.py | 139 ++++++++++++++++++++++++++++++++++ rocksdb/tests/test_options.py | 11 +++ 2 files changed, 150 insertions(+) diff --git a/rocksdb/tests/test_db.py b/rocksdb/tests/test_db.py index 19663ad..f9ed05e 100644 --- a/rocksdb/tests/test_db.py +++ b/rocksdb/tests/test_db.py @@ -734,3 +734,142 @@ def test_compact_range(self): self.db.compact_range(column_family=self.cf_b) + def test_flush(self): + # Check initial state + initial_l0_files = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + + # Write some data + self.db.put(b"a", b"1") + self.db.put(b"b", b"2") + + # Verify data is in memtable (memtable size > 0) + memtable_size_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) + self.assertGreater(memtable_size_before, 0) + + # Flush with default parameters (all column families, wait=True) + self.db.flush() + + # Verify memtable size decreased after flush + memtable_size_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) + self.assertLess(memtable_size_after, memtable_size_before) + + # Verify flush created SST files at level 0 + final_l0_files = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + self.assertGreater(final_l0_files, initial_l0_files) + + def test_flush_no_wait(self): + # Write some data + self.db.put(b"a", b"1") + + # Flush without waiting - just verify it doesn't raise an exception + self.db.flush(wait=False) + + def test_flush_all_column_families(self): + # Check initial state for each column family + initial_default_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + initial_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + initial_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + # Write to multiple column families + self.db.put(b"default_key", b"default_value") + self.db.put((self.cf_a, b"a_key"), b"a_value") + self.db.put((self.cf_b, b"b_key"), b"b_value") + + # Check memtable sizes before flush + memtable_default_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) + memtable_cf_a_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) + memtable_cf_b_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) + self.assertGreater(memtable_default_before, 0) + self.assertGreater(memtable_cf_a_before, 0) + self.assertGreater(memtable_cf_b_before, 0) + + # Flush all column families (default behavior) + self.db.flush() + + # Verify memtable sizes decreased after flush + memtable_default_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) + memtable_cf_a_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) + memtable_cf_b_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) + self.assertLess(memtable_default_after, memtable_default_before) + self.assertLess(memtable_cf_a_after, memtable_cf_a_before) + self.assertLess(memtable_cf_b_after, memtable_cf_b_before) + + # Verify all column families were flushed (SST files created) + final_default_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_default_l0, initial_default_l0) + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertGreater(final_cf_b_l0, initial_cf_b_l0) + + def test_flush_single_column_family(self): + # Check initial state + initial_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + initial_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + # Write to multiple column families + self.db.put((self.cf_a, b"a_key"), b"a_value") + self.db.put((self.cf_b, b"b_key"), b"b_value") + + # Check memtable sizes before flush + memtable_cf_a_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) + memtable_cf_b_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) + self.assertGreater(memtable_cf_a_before, 0) + self.assertGreater(memtable_cf_b_before, 0) + + # Flush only cf_a + self.db.flush(column_families=self.cf_a) + + # Verify memtable size decreased for cf_a but not for cf_b + memtable_cf_a_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) + memtable_cf_b_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) + self.assertLess(memtable_cf_a_after, memtable_cf_a_before) + self.assertEqual(memtable_cf_b_after, memtable_cf_b_before) # cf_b memtable unchanged + + # Verify only cf_a was flushed (SST files created) + final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertEqual(final_cf_b_l0, initial_cf_b_l0) # cf_b should NOT be flushed + + def test_flush_multiple_column_families(self): + # Check initial state + initial_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + initial_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + # Write to multiple column families + self.db.put((self.cf_a, b"a_key"), b"a_value") + self.db.put((self.cf_b, b"b_key"), b"b_value") + + # Check memtable sizes before flush + memtable_cf_a_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) + memtable_cf_b_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) + self.assertGreater(memtable_cf_a_before, 0) + self.assertGreater(memtable_cf_b_before, 0) + + # Flush both cf_a and cf_b + self.db.flush(column_families=[self.cf_a, self.cf_b]) + + # Verify memtable sizes decreased for both + memtable_cf_a_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) + memtable_cf_b_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) + self.assertLess(memtable_cf_a_after, memtable_cf_a_before) + self.assertLess(memtable_cf_b_after, memtable_cf_b_before) + + # Verify both were flushed (SST files created) + final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertGreater(final_cf_b_l0, initial_cf_b_l0) + + def test_flush_invalid_column_families(self): + # Test that passing invalid type raises TypeError + with self.assertRaises(TypeError): + self.db.flush(column_families="invalid") + + with self.assertRaises(TypeError): + self.db.flush(column_families=["invalid", self.cf_a]) + diff --git a/rocksdb/tests/test_options.py b/rocksdb/tests/test_options.py index 8704913..fae3016 100644 --- a/rocksdb/tests/test_options.py +++ b/rocksdb/tests/test_options.py @@ -171,3 +171,14 @@ def test_row_cache(self): self.assertIsNone(opts.row_cache) opts.row_cache = cache = rocksdb.LRUCache(2*1024*1024) self.assertEqual(cache, opts.row_cache) + + def test_max_total_wal_size(self): + opts = rocksdb.Options() + # Default value is 0 + self.assertEqual(opts.max_total_wal_size, 0) + # Set to a specific value and verify setter works + opts.max_total_wal_size = 100 * 1024 * 1024 # 100 MB + self.assertEqual(opts.max_total_wal_size, 100 * 1024 * 1024) + # Set back to 0 (dynamic) + opts.max_total_wal_size = 0 + self.assertEqual(opts.max_total_wal_size, 0) From c444bde97f5b417309a924a094f4c4b6dc5c58bd Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Tue, 16 Dec 2025 20:13:43 -0300 Subject: [PATCH 5/7] chore: add a Makefile --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b7fb6a7 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +.PHONY: test + +test: + pip install -e .[test] + pytest rocksdb/tests/ From 0556ac645247c43566c4f2240cb996d7cd191b23 Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Wed, 17 Dec 2025 13:53:20 -0300 Subject: [PATCH 6/7] fix: remove rocksdb.cur-size-all-mem-tables from tests, because it apparently behaves differently in different environments --- rocksdb/tests/test_db.py | 53 +++++----------------------------------- 1 file changed, 6 insertions(+), 47 deletions(-) diff --git a/rocksdb/tests/test_db.py b/rocksdb/tests/test_db.py index f9ed05e..e3b3215 100644 --- a/rocksdb/tests/test_db.py +++ b/rocksdb/tests/test_db.py @@ -742,17 +742,9 @@ def test_flush(self): self.db.put(b"a", b"1") self.db.put(b"b", b"2") - # Verify data is in memtable (memtable size > 0) - memtable_size_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) - self.assertGreater(memtable_size_before, 0) - # Flush with default parameters (all column families, wait=True) self.db.flush() - # Verify memtable size decreased after flush - memtable_size_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) - self.assertLess(memtable_size_after, memtable_size_before) - # Verify flush created SST files at level 0 final_l0_files = int(self.db.get_property(b'rocksdb.num-files-at-level0')) self.assertGreater(final_l0_files, initial_l0_files) @@ -775,25 +767,9 @@ def test_flush_all_column_families(self): self.db.put((self.cf_a, b"a_key"), b"a_value") self.db.put((self.cf_b, b"b_key"), b"b_value") - # Check memtable sizes before flush - memtable_default_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) - memtable_cf_a_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) - memtable_cf_b_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) - self.assertGreater(memtable_default_before, 0) - self.assertGreater(memtable_cf_a_before, 0) - self.assertGreater(memtable_cf_b_before, 0) - # Flush all column families (default behavior) self.db.flush() - # Verify memtable sizes decreased after flush - memtable_default_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables')) - memtable_cf_a_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) - memtable_cf_b_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) - self.assertLess(memtable_default_after, memtable_default_before) - self.assertLess(memtable_cf_a_after, memtable_cf_a_before) - self.assertLess(memtable_cf_b_after, memtable_cf_b_before) - # Verify all column families were flushed (SST files created) final_default_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0')) final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) @@ -812,21 +788,9 @@ def test_flush_single_column_family(self): self.db.put((self.cf_a, b"a_key"), b"a_value") self.db.put((self.cf_b, b"b_key"), b"b_value") - # Check memtable sizes before flush - memtable_cf_a_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) - memtable_cf_b_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) - self.assertGreater(memtable_cf_a_before, 0) - self.assertGreater(memtable_cf_b_before, 0) - # Flush only cf_a self.db.flush(column_families=self.cf_a) - # Verify memtable size decreased for cf_a but not for cf_b - memtable_cf_a_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) - memtable_cf_b_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) - self.assertLess(memtable_cf_a_after, memtable_cf_a_before) - self.assertEqual(memtable_cf_b_after, memtable_cf_b_before) # cf_b memtable unchanged - # Verify only cf_a was flushed (SST files created) final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) @@ -843,20 +807,15 @@ def test_flush_multiple_column_families(self): self.db.put((self.cf_a, b"a_key"), b"a_value") self.db.put((self.cf_b, b"b_key"), b"b_value") - # Check memtable sizes before flush - memtable_cf_a_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) - memtable_cf_b_before = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) - self.assertGreater(memtable_cf_a_before, 0) - self.assertGreater(memtable_cf_b_before, 0) - # Flush both cf_a and cf_b self.db.flush(column_families=[self.cf_a, self.cf_b]) - # Verify memtable sizes decreased for both - memtable_cf_a_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_a)) - memtable_cf_b_after = int(self.db.get_property(b'rocksdb.cur-size-all-mem-tables', self.cf_b)) - self.assertLess(memtable_cf_a_after, memtable_cf_a_before) - self.assertLess(memtable_cf_b_after, memtable_cf_b_before) + # Verify both were flushed (SST files created) + final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertGreater(final_cf_b_l0, initial_cf_b_l0) # Verify both were flushed (SST files created) final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) From 46a64519ccf8430fe07ccb615575e20ba51cbee2 Mon Sep 17 00:00:00 2001 From: Luis Helder Date: Thu, 5 Feb 2026 18:21:51 -0300 Subject: [PATCH 7/7] fix: uv now requires a --clear option when running 'uv venv' to clear existing environments. See https://github.com/astral-sh/uv/pull/17757 --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 657e286..a1f3b77 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -97,7 +97,7 @@ jobs: CPPFLAGS: -I${{ env.LIBROCKSDB_PATH }}/include LDFLAGS: -L${{ env.LIBROCKSDB_PATH }}/lib run: | - uv venv + uv venv --clear uv pip install --editable .[test] - name: Run tests @@ -131,7 +131,7 @@ jobs: - name: Build python-rocksdb run: | - uv venv + uv venv --clear uv pip install --editable .[test] - name: Run tests @@ -164,7 +164,7 @@ jobs: - name: Build python-rocksdb run: | - uv venv + uv venv --clear uv pip install --editable .[test] - name: Run tests