feat(cold): add stream_logs for incremental log streaming #30
Conversation
Add `stream_logs` to the `ColdStorage` trait, enabling incremental streaming of log results via bounded `mpsc` channels instead of materializing entire result sets into `Vec<T>`. This bounds memory usage, limits MDBX read transaction and SQL connection hold times, and enables active deadline enforcement independent of consumer polling behavior.

Key design decisions:

- Spawned producer task + bounded channel (`ReceiverStream`)
- Per-block resource acquisition (short-lived MDBX txs / SQL queries)
- Fixed anchor hash on the `to` block, re-checked every block for reorg detection
- `tokio::time::timeout_at` on sends for active deadline enforcement
- Backend-owned semaphore (8 permits) limiting concurrent streams (see the sketch below)

Implements for all three backends: in-memory, MDBX, and SQL (SQLite + PostgreSQL). Includes streaming conformance tests. Bumps workspace version to 0.6.0 (new required trait method).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
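For reference, a minimal sketch of the backend-owned semaphore described above, using `tokio::sync::Semaphore`. The struct and field names are placeholders; only the 8-permit cap comes from the PR description.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Illustrative only: `Backend` and `stream_permits` are placeholder names.
struct Backend {
    /// Backend-owned cap on concurrent streams, created as Semaphore::new(8).
    stream_permits: Arc<Semaphore>,
}

impl Backend {
    async fn stream_logs(&self) {
        // A ninth concurrent stream parks here until an earlier producer
        // finishes; the owned permit moves into the spawned task and is
        // released when the task completes.
        let permit = self
            .stream_permits
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed");
        tokio::spawn(async move {
            let _permit = permit;
            // ... produce logs into the bounded channel ...
        });
    }
}
```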
Use fully qualified `crate::LogStream` in doc comment to fix `-D rustdoc::broken-intra-doc-links` in CI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
prestwich
left a comment
generally review for unnecessary allocations
…anup

Move the streaming loop from individual backends into the shared ColdStorageTask, replacing `stream_logs` on the ColdStorage trait with two simpler primitives (`get_block_hash`, `get_logs_block`). This eliminates ~200 lines of duplicated streaming logic across backends.

- Rewrite collect_logs_block in MDBX backend functionally
- Extract check_block_hash helper in MDBX backend
- Extract append_filter_clause utility in SQL backend
- Remove unnecessary collect in SQL get_logs_block
- Remove unused tokio/tokio-stream deps from backend crates
- Update conformance tests to run through ColdStorageHandle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
[Claude Code] Addressed all review comments in 2e3b4f1:
All clippy checks (both `--all-features` and `--no-default-features`) pass.
…tency

Move the streaming loop from the task runner into ColdStorage::produce_log_stream with a default implementation using get_block_hash + get_logs_block per block. MDBX and SQL backends override with single-transaction implementations for MVCC consistency and fewer round-trips.

Add caller-supplied deadline clamped to the task's configured maximum.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
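A one-function sketch of that clamp, assuming a `tokio::time::Instant` deadline and a configured maximum duration; the names are illustrative, not the crate's:

```rust
use tokio::time::{Duration, Instant};

// Honor the caller-supplied deadline, but never let a stream outlive the
// task's configured maximum. Argument names are assumed.
fn effective_deadline(requested: Option<Instant>, max_stream: Duration) -> Instant {
    let cap = Instant::now() + max_stream;
    requested.map_or(cap, |deadline| deadline.min(cap))
}
```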
- Remove anchor_hash from produce_log_stream trait; backends that hold a consistent snapshot (MDBX Tx<Ro>, PG REPEATABLE READ) no longer need external reorg detection. Extract produce_log_stream_default for backends without snapshot semantics.
- Add first_log_index column to receipts, replacing the O(n*k) correlated subquery with an O(1) JOIN for block_log_index (see the query sketch below).
- Split SQL produce_log_stream by backend: PostgreSQL uses REPEATABLE READ with row-level streaming; SQLite delegates to the default implementation to avoid single-connection starvation.
- Document partial-delivery semantics on LogStream and stream_logs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
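To make the `first_log_index` change concrete, a hedged sketch of the query shape; the table and column names are assumed, not taken from the actual schema:

```rust
// With first_log_index persisted per receipt, a log's block-wide index is a
// single addition in the JOIN, instead of a correlated subquery that re-sums
// the log counts of all preceding receipts for every row (O(n*k)).
const BLOCK_LOG_INDEX_QUERY: &str = "\
    SELECT r.first_log_index + l.log_index AS block_log_index, l.*
    FROM logs AS l
    JOIN receipts AS r
      ON r.block_number = l.block_number AND r.tx_index = l.tx_index";
```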
- MDBX: iterate receipt cursors inline instead of collecting into an intermediate Vec before processing. Both collect_logs_block and produce_log_stream_blocking now process receipts as they are read.
- SQL: write filter placeholders directly into the clause string instead of collect/join (see the sketch below). Accept iterators in append_filter_clause to avoid an intermediate Vec<&[u8]> in build_log_filter_clause.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
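The placeholder change might look like this hypothetical helper (the real signature is not shown in the PR), writing directly into the clause string rather than collecting placeholder strings and joining them:

```rust
use std::fmt::Write;

// Append "AND <column> IN (?,?,...)" in place: no intermediate Vec<String>
// of placeholders, no join allocation.
fn append_filter_clause(sql: &mut String, column: &str, values: usize) {
    let _ = write!(sql, " AND {column} IN (");
    for i in 0..values {
        if i > 0 {
            sql.push(',');
        }
        sql.push('?');
    }
    sql.push(')');
}
```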
…rait

These per-block methods were only used by the produce_log_stream_default implementation. Replace them with existing get_header and get_logs calls, reducing the trait surface area.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Derive Clone on DatabaseEnv (all fields already cheaply cloneable)
- Remove Arc<DatabaseEnv> wrapper from MdbxColdBackend, clone env directly into spawn_blocking for produce_log_stream
- Uncomment MDBX produce_log_stream override (single-txn MVCC path)
- Change blob/opt_blob helpers to return borrowed &[u8] instead of Vec<u8>, eliminating per-row heap allocations for fixed-size fields (see the sketch below)
- Add b256_col helper for direct B256 extraction from rows
- Update decode_u128_required and decode_access_list_or_empty to accept Option<&[u8]> instead of &Option<Vec<u8>>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
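A sketch of the borrowed-blob accessors; the helper names come from the commit message but the exact signatures are assumed. sqlx can decode `&[u8]` for both SQLite and PostgreSQL, so the bytes are borrowed from the row buffer instead of copied into a fresh `Vec<u8>`:

```rust
use sqlx::{sqlite::SqliteRow, Row};

// Borrow the column's bytes for the lifetime of the row.
fn blob<'r>(row: &'r SqliteRow, col: &str) -> sqlx::Result<&'r [u8]> {
    row.try_get(col)
}

// Same, but a NULL column decodes to None instead of an error.
fn opt_blob<'r>(row: &'r SqliteRow, col: &str) -> sqlx::Result<Option<&'r [u8]>> {
    row.try_get(col)
}
```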
Filter addresses and topics are already fixed-size slices living in the Filter struct. Borrow them as &[u8] instead of copying each one into a fresh Vec<u8>.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Make produce_log_stream a required trait method: every backend must explicitly choose its streaming strategy. The reorg-detecting helper moves from traits.rs to stream.rs and remains exported as produce_log_stream_default for non-snapshot backends (mem, SQLite).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The MDBX streaming and default streaming implementations reported the per-block `remaining` count instead of the original `max_logs` in TooManyLogs errors. When logs spanned multiple blocks, this caused the error to report a smaller limit than what the caller configured. Add a multi-block conformance test that catches this.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
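A minimal sketch of the bug with assumed names; the budget check must report the caller's original limit, not the leftover budget:

```rust
#[derive(Debug)]
enum ColdError {
    TooManyLogs { max: usize },
}

// `remaining` shrinks as blocks are processed, so once logs span multiple
// blocks it is smaller than the configured limit.
fn check_budget(
    remaining: &mut usize,
    block_logs: usize,
    max_logs: usize,
) -> Result<(), ColdError> {
    if block_logs > *remaining {
        // Before the fix: Err(ColdError::TooManyLogs { max: *remaining })
        return Err(ColdError::TooManyLogs { max: max_logs });
    }
    *remaining -= block_logs;
    Ok(())
}
```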
Bundle from, to, max_logs, sender, and deadline into a StreamParams struct, reducing the trait method from 7 parameters to 2 (filter + params). Remove #[allow(clippy::too_many_arguments)] annotations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
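The struct plausibly looks like the following; field types are inferred from the surrounding description, and `RpcLog`/`ColdResult` are stubbed to keep the sketch self-contained:

```rust
use tokio::{sync::mpsc, time::Instant};

// Placeholder aliases standing in for the crate's real types.
type RpcLog = ();
type ColdResult<T> = Result<T, ()>;

// Everything the producer needs besides the filter travels together, so the
// trait method becomes roughly:
//   async fn produce_log_stream(&self, filter: &Filter, params: StreamParams);
pub struct StreamParams {
    pub from: u64,
    pub to: u64,
    pub max_logs: usize,
    pub sender: mpsc::Sender<ColdResult<RpcLog>>,
    pub deadline: Instant,
}
```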
… macro

Add try_stream! macros in the MDBX (blocking) and SQL (async) streaming implementations to replace repeated match/send-error/return blocks. Also thread StreamParams through produce_log_stream_pg directly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
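A hedged sketch of the async-side macro's likely shape; the blocking MDBX variant would use `blocking_send` instead of an awaited `send`, and the exact expansion is assumed:

```rust
// Evaluate a fallible expression; on error, forward it into the channel
// (best-effort: the consumer may already be gone) and end the producer.
macro_rules! try_stream {
    ($sender:expr, $result:expr) => {
        match $result {
            Ok(value) => value,
            Err(err) => {
                let _ = $sender.send(Err(err.into())).await;
                return;
            }
        }
    };
}
```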
Change ColdStorage::get_logs from `filter: Filter` to `filter: &Filter` since no implementation needs ownership. In the default streaming helper, clone the filter once before the loop and mutate block_option per iteration instead of cloning the full filter (address + topics arrays) on every block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
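A sketch of the one-clone pattern; `Filter` and `FilterBlockOption` are alloy types, while the loop shape and the callback (standing in for the storage's `get_logs`) are illustrative:

```rust
use alloy::rpc::types::{Filter, FilterBlockOption};

// Address and topic arrays are cloned exactly once, outside the loop; each
// iteration only rewrites the block range and lends the filter out.
fn for_each_block(filter: &Filter, from: u64, to: u64, mut get_logs: impl FnMut(&Filter)) {
    let mut block_filter = filter.clone(); // the single clone
    for n in from..=to {
        block_filter.block_option = FilterBlockOption::Range {
            from_block: Some(n.into()),
            to_block: Some(n.into()),
        };
        get_logs(&block_filter); // borrowed per block, no further clones
    }
}
```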
- Merge duplicate alloy::consensus import paths in conformance tests
- Use mut parameter instead of let-rebinding in collect_stream
- Unwrap directly in tests instead of checking is_some() first
- Replace closures with function references in header_from_row
- Remove duplicate doc comments on append_filter_clause
- Condense truncate_above with loop over table names

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add alloy-primitives with the sqlx feature to get native sqlx Type/Encode/Decode impls for Address, B256, Bytes, and FixedBytes<N>. This eliminates manual from_slice/copy_from_slice calls on the read path and removes the b256_col helper and from_address converter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Annotate the block-iteration loops, channel sends, deadline checks, reorg detection, and snapshot-isolation setup in both produce_log_stream_pg and produce_log_stream_default.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Annotate the blocking log stream method with comments covering MVCC snapshot semantics, cursor reuse, block iteration, filter matching, log limit enforcement, and blocking channel sends.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
- Add `stream_logs` method to the `ColdStorage` trait for incremental log streaming via bounded `mpsc` channels, avoiding full result materialization into `Vec<T>`
- Implement for all three backends: in-memory (`MemColdBackend`), MDBX (`MdbxColdBackend`), and SQL (`SqlColdBackend` — SQLite + PostgreSQL)
- Add `StreamLogs` request variant, handle method, and runner dispatch for the task-based architecture
- Add new error variants (`StreamDeadlineExceeded`, `ReorgDetected`)
- Make `DatabaseEnv` cheaply cloneable (derive `Clone`)
- Eliminate per-row `Vec<u8>` heap allocations in SQL row extraction (borrow `&[u8]` from rows)
- Borrow filter params as `&[u8]` from the `Filter` struct instead of cloning into `Vec<Vec<u8>>`
- Remove `get_block_hash` and `get_logs_block` from the `ColdStorage` trait (replaced by existing `get_header` / `get_logs`)
- Fix `TooManyLogs` bug: MDBX and default streaming reported `remaining` instead of `max_logs` in the error, causing wrong limit values when logs span multiple blocks
- Add `StreamParams` struct: bundles `from`, `to`, `max_logs`, `sender`, `deadline` to reduce `produce_log_stream` from 7 parameters to 2
- Add `try_stream!` macros: reduce repeated match/send-error/return boilerplate in MDBX and SQL streaming implementations
- Borrow `Filter` in `get_logs`: change from `filter: Filter` to `filter: &Filter`, eliminating the per-block clone in the default streaming helper (N clones → 1)

Design
Each `stream_logs` call spawns a producer task that sends `RpcLog` items through a bounded `mpsc` channel. The consumer receives a `ReceiverStream<ColdResult<RpcLog>>` (concrete type, no boxing).

Key properties:
- Active deadline enforcement: `timeout_at` on channel sends, guaranteeing resource release within `stream_deadline` regardless of consumer behavior (see the sketch after this list)
- MDBX streaming runs on `spawn_blocking`; PostgreSQL uses `REPEATABLE READ` isolation
- Reorg detection: fixed anchor on the `to` block hash at stream start, re-checked before every block; snapshot backends skip this (unnecessary under MVCC)
- Consumer cancellation: a dropped receiver causes `send` to fail, and the producer exits immediately
- Minimal allocations: SQL row extraction borrows `&[u8]` from rows; filter params borrow from the `Filter` struct; `DatabaseEnv` clones cheaply without an `Arc` wrapper; `get_logs` borrows `&Filter`, avoiding ownership transfer
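A minimal sketch of the send-with-deadline pattern, with a placeholder item type standing in for the crate's `ColdResult<RpcLog>`:

```rust
use tokio::sync::mpsc;
use tokio::time::{timeout_at, Instant};

// Placeholder item type standing in for ColdResult<RpcLog>.
type Item = Result<u64, &'static str>;

async fn producer(sender: mpsc::Sender<Item>, deadline: Instant, items: Vec<u64>) {
    for item in items {
        match timeout_at(deadline, sender.send(Ok(item))).await {
            Ok(Ok(())) => {}      // delivered before the deadline
            Ok(Err(_)) => return, // consumer dropped the stream: exit now
            Err(_) => return,     // deadline elapsed mid-send: release resources
        }
    }
}
```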
Test plan

- `cargo clippy -p signet-cold --all-features --all-targets`
- `cargo clippy -p signet-cold --no-default-features --all-targets`
- `cargo clippy -p signet-cold-mdbx --all-features --all-targets`
- `cargo clippy -p signet-cold-mdbx --no-default-features --all-targets`
- `cargo clippy -p signet-cold-sql --all-features --all-targets`
- `cargo clippy -p signet-cold-sql --no-default-features --all-targets`
- `cargo clippy -p signet-hot-mdbx --all-features --all-targets`
- `cargo clippy -p signet-hot-mdbx --no-default-features --all-targets`
- `cargo +nightly fmt`
- `cargo t -p signet-hot-mdbx` (44 passed)
- `cargo t -p signet-cold` (2 passed)
- `cargo t -p signet-cold-mdbx` (4 passed)
- `cargo t -p signet-cold-sql --all-features` (2 passed — SQLite + PG conformance)
- `./scripts/test-postgres.sh` (PostgreSQL conformance via Docker)

🤖 Generated with Claude Code