diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 657e2861..a1f3b773 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -97,7 +97,7 @@ jobs: CPPFLAGS: -I${{ env.LIBROCKSDB_PATH }}/include LDFLAGS: -L${{ env.LIBROCKSDB_PATH }}/lib run: | - uv venv + uv venv --clear uv pip install --editable .[test] - name: Run tests @@ -131,7 +131,7 @@ jobs: - name: Build python-rocksdb run: | - uv venv + uv venv --clear uv pip install --editable .[test] - name: Run tests @@ -164,7 +164,7 @@ jobs: - name: Build python-rocksdb run: | - uv venv + uv venv --clear uv pip install --editable .[test] - name: Run tests diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..b7fb6a7c --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +.PHONY: test + +test: + pip install -e .[test] + pytest rocksdb/tests/ diff --git a/docs/api/options.rst b/docs/api/options.rst index 1c77b180..c018fd61 100644 --- a/docs/api/options.rst +++ b/docs/api/options.rst @@ -713,6 +713,21 @@ Options objects | *Type:* ``int`` | *Default:* ``0`` + .. py:attribute:: max_total_wal_size + + Once write-ahead logs exceed this size, we will start forcing the flush of + column families whose memtables are backed by the oldest live WAL file + (i.e. the ones that are causing all the space amplification). If set to 0 + (default), we will dynamically choose the WAL size limit to be + [sum of all write_buffer_size * max_write_buffer_number] * 4. + This option takes effect only when there are more than one column family as + otherwise the wal size is dictated by the write_buffer_size. + + Dynamically changeable through SetDBOptions() API. + + | *Type:* ``int`` + | *Default:* ``0`` + .. 
py:attribute:: manifest_preallocation_size Number of bytes to preallocate (via fallocate) the manifest diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index 6e2658a1..752e5468 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -1229,6 +1229,12 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.WAL_size_limit_MB = value + property max_total_wal_size: + def __get__(self): + return self.opts.max_total_wal_size + def __set__(self, value): + self.opts.max_total_wal_size = value + property manifest_preallocation_size: def __get__(self): return self.opts.manifest_preallocation_size @@ -2002,6 +2008,51 @@ cdef class DB(object): st = self.db.CompactRange(c_options, cf_handle, begin_ptr, end_ptr) check_status(st) + def flush(self, wait=True, column_families=None): + """ + Flush memtable data for column families. + + If atomic flush is not enabled, flushing multiple column families is + equivalent to calling flush for each column family individually. + If atomic flush is enabled, all specified column families will be + flushed atomically up to the latest sequence number at the time + when flush is requested. + + Args: + wait (bool): If True (default), the flush will wait until the + flush is done. 
+ column_families: Specifies which column families to flush: + - None (default): flushes ALL column families in the database + - A single ColumnFamilyHandle: flushes only that column family + - A list/tuple of ColumnFamilyHandle objects: flushes those column families + """ + cdef Status st + cdef options.FlushOptions flush_opts + cdef vector[db.ColumnFamilyHandle*] cf_handles + cdef _ColumnFamilyHandle handle + flush_opts.wait = wait + + # Handle different input types for column_families + if column_families is None: + # Flush ALL column families + for handle in self.cf_handles: + cf_handles.push_back(handle.handle) + elif isinstance(column_families, ColumnFamilyHandle): + # Single column family + cf_handles.push_back((<ColumnFamilyHandle>column_families).get_handle()) + elif isinstance(column_families, (list, tuple)): + # Multiple column families + for cf in column_families: + if not isinstance(cf, ColumnFamilyHandle): + raise TypeError("All items in column_families must be ColumnFamilyHandle objects") + cf_handles.push_back((<ColumnFamilyHandle>cf).get_handle()) + else: + raise TypeError("column_families must be None, a ColumnFamilyHandle, or a list of ColumnFamilyHandle objects") + + with nogil: + st = self.db.Flush(flush_opts, cf_handles) + check_status(st) + @staticmethod + def __parse_read_opts( verify_checksums=False, diff --git a/rocksdb/db.pxd b/rocksdb/db.pxd index 262f9173..0dc7d462 100644 --- a/rocksdb/db.pxd +++ b/rocksdb/db.pxd @@ -167,6 +167,7 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb": const string& GetName() except+ nogil const options.Options& GetOptions(ColumnFamilyHandle*) except+ nogil Status Flush(const options.FlushOptions&, ColumnFamilyHandle*) except+ nogil + Status Flush(const options.FlushOptions&, const vector[ColumnFamilyHandle*]&) except+ nogil Status DisableFileDeletions() except+ nogil Status EnableFileDeletions() except+ nogil Status Close() except+ nogil diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index 167ed5fd..fc82de35 100644 --- 
a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -88,6 +88,7 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": int table_cache_numshardbits uint64_t WAL_ttl_seconds uint64_t WAL_size_limit_MB + uint64_t max_total_wal_size size_t manifest_preallocation_size cpp_bool allow_mmap_reads cpp_bool allow_mmap_writes diff --git a/rocksdb/tests/test_db.py b/rocksdb/tests/test_db.py index 19663ade..e3b32156 100644 --- a/rocksdb/tests/test_db.py +++ b/rocksdb/tests/test_db.py @@ -734,3 +734,101 @@ def test_compact_range(self): self.db.compact_range(column_family=self.cf_b) + def test_flush(self): + # Check initial state + initial_l0_files = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + + # Write some data + self.db.put(b"a", b"1") + self.db.put(b"b", b"2") + + # Flush with default parameters (all column families, wait=True) + self.db.flush() + + # Verify flush created SST files at level 0 + final_l0_files = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + self.assertGreater(final_l0_files, initial_l0_files) + + def test_flush_no_wait(self): + # Write some data + self.db.put(b"a", b"1") + + # Flush without waiting - just verify it doesn't raise an exception + self.db.flush(wait=False) + + def test_flush_all_column_families(self): + # Check initial state for each column family + initial_default_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + initial_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + initial_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + # Write to multiple column families + self.db.put(b"default_key", b"default_value") + self.db.put((self.cf_a, b"a_key"), b"a_value") + self.db.put((self.cf_b, b"b_key"), b"b_value") + + # Flush all column families (default behavior) + self.db.flush() + + # Verify all column families were flushed (SST files created) + final_default_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0')) + final_cf_a_l0 = 
int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_default_l0, initial_default_l0) + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertGreater(final_cf_b_l0, initial_cf_b_l0) + + def test_flush_single_column_family(self): + # Check initial state + initial_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + initial_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + # Write to multiple column families + self.db.put((self.cf_a, b"a_key"), b"a_value") + self.db.put((self.cf_b, b"b_key"), b"b_value") + + # Flush only cf_a + self.db.flush(column_families=self.cf_a) + + # Verify only cf_a was flushed (SST files created) + final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertEqual(final_cf_b_l0, initial_cf_b_l0) # cf_b should NOT be flushed + + def test_flush_multiple_column_families(self): + # Check initial state + initial_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + initial_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + # Write to multiple column families + self.db.put((self.cf_a, b"a_key"), b"a_value") + self.db.put((self.cf_b, b"b_key"), b"b_value") + + # Flush both cf_a and cf_b + self.db.flush(column_families=[self.cf_a, self.cf_b]) + + # Verify both were flushed (SST files created) + final_cf_a_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_a)) + final_cf_b_l0 = int(self.db.get_property(b'rocksdb.num-files-at-level0', self.cf_b)) + + self.assertGreater(final_cf_a_l0, initial_cf_a_l0) + self.assertGreater(final_cf_b_l0, initial_cf_b_l0) + + # Flushed data should remain readable from the SST files + self.assertEqual(self.db.get((self.cf_a, b"a_key")), b"a_value") + self.assertEqual(self.db.get((self.cf_b, b"b_key")), b"b_value") + + # A repeated flush with no new writes must also succeed + self.db.flush(column_families=[self.cf_a, self.cf_b]) + + def test_flush_invalid_column_families(self): + # Test that passing invalid type raises TypeError + with self.assertRaises(TypeError): + self.db.flush(column_families="invalid") + + with self.assertRaises(TypeError): + self.db.flush(column_families=["invalid", self.cf_a]) + diff --git a/rocksdb/tests/test_options.py b/rocksdb/tests/test_options.py index 87049131..fae30162 100644 --- a/rocksdb/tests/test_options.py +++ b/rocksdb/tests/test_options.py @@ -171,3 +171,14 @@ def test_row_cache(self): self.assertIsNone(opts.row_cache) opts.row_cache = cache = rocksdb.LRUCache(2*1024*1024) self.assertEqual(cache, opts.row_cache) + + def test_max_total_wal_size(self): + opts = rocksdb.Options() + # Default value is 0 + self.assertEqual(opts.max_total_wal_size, 0) + # Set to a specific value and verify setter works + opts.max_total_wal_size = 100 * 1024 * 1024 # 100 MB + self.assertEqual(opts.max_total_wal_size, 100 * 1024 * 1024) + # Set back to 0 (dynamic) + opts.max_total_wal_size = 0 + self.assertEqual(opts.max_total_wal_size, 0)