Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ jobs:
CPPFLAGS: -I${{ env.LIBROCKSDB_PATH }}/include
LDFLAGS: -L${{ env.LIBROCKSDB_PATH }}/lib
run: |
uv venv
uv venv --clear
uv pip install --editable .[test]

- name: Run tests
Expand Down Expand Up @@ -131,7 +131,7 @@ jobs:

- name: Build python-rocksdb
run: |
uv venv
uv venv --clear
uv pip install --editable .[test]

- name: Run tests
Expand Down Expand Up @@ -164,7 +164,7 @@ jobs:

- name: Build python-rocksdb
run: |
uv venv
uv venv --clear
uv pip install --editable .[test]

- name: Run tests
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Convenience target: install the package in editable mode with its
# test extras, then run the test suite.
.PHONY: test

test:
	pip install -e .[test]
	pytest rocksdb/tests/
15 changes: 15 additions & 0 deletions docs/api/options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,21 @@ Options objects
| *Type:* ``int``
| *Default:* ``0``

.. py:attribute:: max_total_wal_size

Once write-ahead logs exceed this size, we will start forcing the flush of
column families whose memtables are backed by the oldest live WAL file
(i.e. the ones that are causing all the space amplification). If set to 0
(default), we will dynamically choose the WAL size limit to be
[sum of all write_buffer_size * max_write_buffer_number] * 4.
This option takes effect only when there is more than one column family, as
otherwise the WAL size is dictated by the write_buffer_size.

Dynamically changeable through SetDBOptions() API.

| *Type:* ``int``
| *Default:* ``0``

.. py:attribute:: manifest_preallocation_size

Number of bytes to preallocate (via fallocate) the manifest
Expand Down
51 changes: 51 additions & 0 deletions rocksdb/_rocksdb.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,12 @@ cdef class Options(ColumnFamilyOptions):
def __set__(self, value):
self.opts.WAL_size_limit_MB = value

property max_total_wal_size:
    # Total size of live WAL files (bytes) above which RocksDB forces a
    # flush of the column families backed by the oldest WAL. 0 (the
    # RocksDB default) means the limit is chosen dynamically.
    def __get__(self):
        return self.opts.max_total_wal_size
    def __set__(self, value):
        self.opts.max_total_wal_size = value

property manifest_preallocation_size:
def __get__(self):
return self.opts.manifest_preallocation_size
Expand Down Expand Up @@ -2002,6 +2008,51 @@ cdef class DB(object):
st = self.db.CompactRange(c_options, cf_handle, begin_ptr, end_ptr)
check_status(st)

def flush(self, wait=True, column_families=None):
    """
    Flush memtable data for column families.

    If atomic flush is not enabled, flushing multiple column families is
    equivalent to calling flush for each column family individually.
    If atomic flush is enabled, all specified column families will be
    flushed atomically up to the latest sequence number at the time
    when flush is requested.

    Args:
        wait (bool): If True (default), the flush will wait until the
            flush is done.
        column_families: Specifies which column families to flush:
            - None (default): flushes ALL column families in the database
            - A single ColumnFamilyHandle: flushes only that column family
            - A list/tuple of ColumnFamilyHandle objects: flushes those column families

    Raises:
        TypeError: if ``column_families`` is not None, a
            ColumnFamilyHandle, or a list/tuple of ColumnFamilyHandle
            objects.
    """
    cdef Status st
    cdef options.FlushOptions flush_opts
    # Raw C++ handles collected for the vector-based DB::Flush overload.
    cdef vector[db.ColumnFamilyHandle*] cf_handles
    cdef _ColumnFamilyHandle handle
    flush_opts.wait = wait

    # Handle different input types for column_families
    if column_families is None:
        # Flush ALL column families
        # NOTE(review): this branch reads handle.handle directly while the
        # branches below go through get_handle() — presumably equivalent
        # accessors for the same underlying pointer; confirm.
        for handle in self.cf_handles:
            cf_handles.push_back(handle.handle)
    elif isinstance(column_families, ColumnFamilyHandle):
        # Single column family
        cf_handles.push_back((<ColumnFamilyHandle?>column_families).get_handle())
    elif isinstance(column_families, (list, tuple)):
        # Multiple column families
        for cf in column_families:
            if not isinstance(cf, ColumnFamilyHandle):
                raise TypeError("All items in column_families must be ColumnFamilyHandle objects")
            cf_handles.push_back((<ColumnFamilyHandle?>cf).get_handle())
    else:
        raise TypeError("column_families must be None, a ColumnFamilyHandle, or a list of ColumnFamilyHandle objects")

    # Release the GIL for the potentially long-running C++ flush; status
    # checking (which may raise) happens after the GIL is re-acquired.
    with nogil:
        st = self.db.Flush(flush_opts, cf_handles)
    check_status(st)

@staticmethod
def __parse_read_opts(
verify_checksums=False,
Expand Down
1 change: 1 addition & 0 deletions rocksdb/db.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb":
const string& GetName() except+ nogil
const options.Options& GetOptions(ColumnFamilyHandle*) except+ nogil
Status Flush(const options.FlushOptions&, ColumnFamilyHandle*) except+ nogil
Status Flush(const options.FlushOptions&, const vector[ColumnFamilyHandle*]&) except+ nogil
Status DisableFileDeletions() except+ nogil
Status EnableFileDeletions() except+ nogil
Status Close() except+ nogil
Expand Down
1 change: 1 addition & 0 deletions rocksdb/options.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
int table_cache_numshardbits
uint64_t WAL_ttl_seconds
uint64_t WAL_size_limit_MB
uint64_t max_total_wal_size
size_t manifest_preallocation_size
cpp_bool allow_mmap_reads
cpp_bool allow_mmap_writes
Expand Down
98 changes: 98 additions & 0 deletions rocksdb/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,101 @@ def test_compact_range(self):

self.db.compact_range(column_family=self.cf_b)

def test_flush(self):
    """A default flush() call should persist memtable data as L0 SST files."""
    prop = b'rocksdb.num-files-at-level0'
    l0_before = int(self.db.get_property(prop))

    self.db.put(b"a", b"1")
    self.db.put(b"b", b"2")

    # Defaults: all column families, blocking until the flush completes.
    self.db.flush()

    l0_after = int(self.db.get_property(prop))
    self.assertGreater(l0_after, l0_before)

def test_flush_no_wait(self):
    """A non-blocking flush request should return without raising."""
    self.db.put(b"a", b"1")
    # Fire-and-forget flush; success here simply means no exception.
    self.db.flush(wait=False)

def test_flush_all_column_families(self):
    """flush() with no arguments must create L0 files in every column family."""
    prop = b'rocksdb.num-files-at-level0'
    before = (
        int(self.db.get_property(prop)),
        int(self.db.get_property(prop, self.cf_a)),
        int(self.db.get_property(prop, self.cf_b)),
    )

    # One write per column family so each has memtable data to flush.
    self.db.put(b"default_key", b"default_value")
    self.db.put((self.cf_a, b"a_key"), b"a_value")
    self.db.put((self.cf_b, b"b_key"), b"b_value")

    # Default behavior: flush everything.
    self.db.flush()

    after = (
        int(self.db.get_property(prop)),
        int(self.db.get_property(prop, self.cf_a)),
        int(self.db.get_property(prop, self.cf_b)),
    )
    for old_count, new_count in zip(before, after):
        self.assertGreater(new_count, old_count)

def test_flush_single_column_family(self):
    """Passing one handle flushes that family and leaves the others untouched."""
    prop = b'rocksdb.num-files-at-level0'
    a_before = int(self.db.get_property(prop, self.cf_a))
    b_before = int(self.db.get_property(prop, self.cf_b))

    self.db.put((self.cf_a, b"a_key"), b"a_value")
    self.db.put((self.cf_b, b"b_key"), b"b_value")

    # Only cf_a is flushed.
    self.db.flush(column_families=self.cf_a)

    # cf_a gained at least one L0 file; cf_b must be unchanged.
    self.assertGreater(int(self.db.get_property(prop, self.cf_a)), a_before)
    self.assertEqual(int(self.db.get_property(prop, self.cf_b)), b_before)

def test_flush_multiple_column_families(self):
    """Flushing a list of handles must flush each listed column family.

    The original version repeated the read-properties-and-assert block
    twice verbatim (a copy-paste slip); one verification pass is enough.
    """
    prop = b'rocksdb.num-files-at-level0'
    initial_cf_a_l0 = int(self.db.get_property(prop, self.cf_a))
    initial_cf_b_l0 = int(self.db.get_property(prop, self.cf_b))

    # Write to multiple column families
    self.db.put((self.cf_a, b"a_key"), b"a_value")
    self.db.put((self.cf_b, b"b_key"), b"b_value")

    # Flush both cf_a and cf_b
    self.db.flush(column_families=[self.cf_a, self.cf_b])

    # Verify both were flushed (SST files created)
    final_cf_a_l0 = int(self.db.get_property(prop, self.cf_a))
    final_cf_b_l0 = int(self.db.get_property(prop, self.cf_b))

    self.assertGreater(final_cf_a_l0, initial_cf_a_l0)
    self.assertGreater(final_cf_b_l0, initial_cf_b_l0)

def test_flush_invalid_column_families(self):
    """flush() must raise TypeError for non-ColumnFamilyHandle arguments."""
    # A bare string, and a list mixing a non-handle with a real handle,
    # are both invalid values for column_families.
    for bad_value in ("invalid", ["invalid", self.cf_a]):
        with self.assertRaises(TypeError):
            self.db.flush(column_families=bad_value)

11 changes: 11 additions & 0 deletions rocksdb/tests/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,14 @@ def test_row_cache(self):
self.assertIsNone(opts.row_cache)
opts.row_cache = cache = rocksdb.LRUCache(2*1024*1024)
self.assertEqual(cache, opts.row_cache)

def test_max_total_wal_size(self):
    """max_total_wal_size defaults to 0 and round-trips through its setter."""
    opts = rocksdb.Options()

    # 0 means RocksDB sizes the WAL limit dynamically.
    self.assertEqual(0, opts.max_total_wal_size)

    hundred_mb = 100 * 1024 * 1024
    opts.max_total_wal_size = hundred_mb
    self.assertEqual(hundred_mb, opts.max_total_wal_size)

    # Restore the dynamic default.
    opts.max_total_wal_size = 0
    self.assertEqual(0, opts.max_total_wal_size)