From 40bf95caa1adb4126c5a01d62d023d203e520dbc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com>
Date: Mon, 2 Feb 2026 15:15:04 -0300
Subject: [PATCH 01/12] fix: skip finalized blocks during fork-choice

Adds a NonFinalizedChain index table to optimize fork choice operations by
avoiding full Block deserialization. The index stores (slot, root) ->
parent_root mappings, using only 40 bytes per block instead of hundreds.

Changes:
- New NonFinalizedChain table with big-endian slot keys for efficient range scans
- Refactored compute_lmd_ghost_head to take HashMap<H256, (u64, H256)>
  (root -> (slot, parent_root)) instead of HashMap<H256, Block>
- Added get_non_finalized_chain() and get_block_roots() methods to Store
- Automatic pruning on finalization via update_checkpoints()
- Updated all fork choice call sites to use the new index
- Historical Blocks table remains intact for queries

The big-endian encoding ensures lexicographic order matches numeric order,
enabling efficient pruning without custom comparators.
---
 crates/blockchain/fork_choice/src/lib.rs     |  23 ++-
 crates/blockchain/src/store.rs               |   6 +-
 .../blockchain/tests/forkchoice_spectests.rs |  27 ++--
 crates/storage/src/api/tables.rs             |   8 +-
 crates/storage/src/backend/rocksdb.rs        |   1 +
 crates/storage/src/store.rs                  | 141 +++++++++++++++++-
 6 files changed, 171 insertions(+), 35 deletions(-)

diff --git a/crates/blockchain/fork_choice/src/lib.rs b/crates/blockchain/fork_choice/src/lib.rs
index 80d2f19..c36049f 100644
--- a/crates/blockchain/fork_choice/src/lib.rs
+++ b/crates/blockchain/fork_choice/src/lib.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;

-use ethlambda_types::{attestation::AttestationData, block::Block, primitives::H256};
+use ethlambda_types::{attestation::AttestationData, primitives::H256};

 /// Compute the LMD GHOST head of the chain, given a starting root, a set of blocks,
 /// a set of attestations, and a minimum score threshold.
@@ -9,7 +9,7 @@ use ethlambda_types::{attestation::AttestationData, block::Block, primitives::H2 // TODO: add proto-array implementation pub fn compute_lmd_ghost_head( mut start_root: H256, - blocks: &HashMap, + blocks: &HashMap, attestations: &HashMap, min_score: u64, ) -> H256 { @@ -19,36 +19,33 @@ pub fn compute_lmd_ghost_head( if start_root.is_zero() { start_root = *blocks .iter() - .min_by_key(|(_, block)| block.slot) + .min_by_key(|(_, (slot, _))| slot) .map(|(root, _)| root) .expect("we already checked blocks is non-empty"); } - let start_slot = blocks[&start_root].slot; + let start_slot = blocks[&start_root].0; let mut weights: HashMap = HashMap::new(); for attestation_data in attestations.values() { let mut current_root = attestation_data.head.root; - while let Some(block) = blocks.get(¤t_root) - && block.slot > start_slot + while let Some(&(slot, parent_root)) = blocks.get(¤t_root) + && slot > start_slot { *weights.entry(current_root).or_default() += 1; - current_root = block.parent_root; + current_root = parent_root; } } let mut children_map: HashMap> = HashMap::new(); - for (root, block) in blocks { - if block.parent_root.is_zero() { + for (root, &(_, parent_root)) in blocks { + if parent_root.is_zero() { continue; } if min_score > 0 && *weights.get(root).unwrap_or(&0) < min_score { continue; } - children_map - .entry(block.parent_root) - .or_default() - .push(*root); + children_map.entry(parent_root).or_default().push(*root); } let mut head = start_root; diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index fc6114a..c89dec0 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -30,7 +30,7 @@ fn accept_new_attestations(store: &mut Store) { /// Update the head based on the fork choice rule. 
fn update_head(store: &mut Store) { - let blocks: HashMap = store.iter_blocks().collect(); + let blocks = store.get_non_finalized_chain(); let attestations: HashMap = store.iter_known_attestations().collect(); let old_head = store.head(); let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( @@ -69,7 +69,7 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); - let blocks: HashMap = store.iter_blocks().collect(); + let blocks = store.get_non_finalized_chain(); let attestations: HashMap = store.iter_new_attestations().collect(); let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, @@ -559,7 +559,7 @@ pub fn produce_block_with_signatures( .collect(); // Get known block roots for attestation validation - let known_block_roots: HashSet = store.iter_blocks().map(|(root, _)| root).collect(); + let known_block_roots = store.get_block_roots(); // Collect signature data for block building let gossip_signatures: HashMap = diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 86274e0..83fff67 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -165,15 +165,15 @@ fn validate_checks( } // Also validate the root matches a block at this slot - let blocks: HashMap = st.iter_blocks().collect(); + let blocks = st.get_non_finalized_chain(); let block_found = blocks .iter() - .any(|(root, block)| block.slot == expected_slot && *root == target.root); + .any(|(root, (slot, _))| *slot == expected_slot && *root == target.root); if !block_found { let available: Vec<_> = blocks .iter() - .filter(|(_, block)| block.slot == expected_slot) + .filter(|(_, (slot, _))| *slot == expected_slot) .map(|(root, _)| format!("{:?}", root)) .collect(); return Err(format!( @@ -365,7 +365,7 @@ fn validate_lexicographic_head_among( .into()); } - let blocks: HashMap = st.iter_blocks().collect(); + let blocks = st.get_non_finalized_chain(); let known_attestations: HashMap = st.iter_known_attestations().collect(); // Resolve all fork labels to roots and compute their weights @@ -380,13 +380,12 @@ fn validate_lexicographic_head_among( ) })?; - let block = blocks.get(root).ok_or_else(|| { + let (slot, _parent_root) = blocks.get(root).ok_or_else(|| { format!( "Step {}: block for label '{}' not found in store", step_idx, label ) })?; - let slot = block.slot; // Calculate attestation weight: count attestations voting for this fork // An attestation votes for this fork if its head is this block or a descendant @@ -396,18 +395,18 @@ fn validate_lexicographic_head_among( // Check if attestation head is this block or a descendant if att_head_root == *root { weight += 1; - } else if let Some(att_block) = blocks.get(&att_head_root) { + } else if let Some(&(att_slot, _)) = blocks.get(&att_head_root) { // Walk back from attestation head to see if we reach this block let mut current = att_head_root; - let mut current_slot = att_block.slot; - while current_slot > slot { - if let Some(blk) = blocks.get(¤t) { - if blk.parent_root == *root { + let mut current_slot = att_slot; + while current_slot > *slot { + if let Some(&(_, parent_root)) = blocks.get(¤t) { + if parent_root == *root { weight += 1; break; } - current = blk.parent_root; - current_slot = blocks.get(¤t).map(|b| b.slot).unwrap_or(0); + current = parent_root; + current_slot = blocks.get(¤t).map(|(s, _)| *s).unwrap_or(0); } else { break; } @@ -415,7 +414,7 @@ fn 
validate_lexicographic_head_among( } } - fork_data.insert(label.as_str(), (*root, slot, weight)); + fork_data.insert(label.as_str(), (*root, *slot, weight)); } // Verify all forks are at the same slot diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 170176f..9140663 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -20,10 +20,15 @@ pub enum Table { AggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, + /// Non-finalized chain index: (slot || root) -> parent_root + /// + /// Fast lookup for fork choice without deserializing full blocks. + /// Pruned when slots become finalized. + NonFinalizedChain, } /// All table variants. -pub const ALL_TABLES: [Table; 8] = [ +pub const ALL_TABLES: [Table; 9] = [ Table::Blocks, Table::BlockSignatures, Table::States, @@ -32,4 +37,5 @@ pub const ALL_TABLES: [Table; 8] = [ Table::GossipSignatures, Table::AggregatedPayloads, Table::Metadata, + Table::NonFinalizedChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 6790906..3171ac2 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -20,6 +20,7 @@ fn cf_name(table: Table) -> &'static str { Table::GossipSignatures => "gossip_signatures", Table::AggregatedPayloads => "aggregated_payloads", Table::Metadata => "metadata", + Table::NonFinalizedChain => "non_finalized_chain", } } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index cc03e5d..8598c3c 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,3 +1,4 @@ +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::api::{StorageBackend, Table}; @@ -84,6 +85,22 @@ fn decode_signature_key(bytes: &[u8]) -> SignatureKey { (validator_id, root) } +/// Encode a NonFinalizedChain key (slot, root) to bytes. +/// Layout: slot (8 bytes big-endian) || root (32 bytes) +/// Big-endian ensures lexicographic ordering matches numeric ordering. +fn encode_non_finalized_chain_key(slot: u64, root: &H256) -> Vec { + let mut result = slot.to_be_bytes().to_vec(); + result.extend_from_slice(&root.0); + result +} + +/// Decode a NonFinalizedChain key from bytes. +fn decode_non_finalized_chain_key(bytes: &[u8]) -> (u64, H256) { + let slot = u64::from_be_bytes(bytes[..8].try_into().expect("valid slot bytes")); + let root = H256::from_slice(&bytes[8..]); + (slot, root) +} + /// Underlying storage of the node. /// Similar to the spec's `Store`, but backed by a pluggable storage backend. /// @@ -166,6 +183,15 @@ impl Store { .put_batch(Table::States, state_entries) .expect("put state"); + // Non-finalized chain index + let index_entries = vec![( + encode_non_finalized_chain_key(anchor_block.slot, &anchor_block_root), + anchor_block.parent_root.as_ssz_bytes(), + )]; + batch + .put_batch(Table::NonFinalizedChain, index_entries) + .expect("put non-finalized chain index"); + batch.commit().expect("commit"); } @@ -244,25 +270,47 @@ impl Store { /// - Head is always updated to the new value. /// - Justified is updated if provided. /// - Finalized is updated if provided. + /// + /// When finalization advances, prunes the NonFinalizedChain index. 
pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) { + // Check if we need to prune before updating metadata + let should_prune = checkpoints.finalized.is_some(); + let old_finalized_slot = if should_prune { + Some(self.latest_finalized().slot) + } else { + None + }; + let mut entries = vec![(KEY_HEAD.to_vec(), checkpoints.head.as_ssz_bytes())]; if let Some(justified) = checkpoints.justified { entries.push((KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes())); } - if let Some(finalized) = checkpoints.finalized { + let new_finalized_slot = if let Some(finalized) = checkpoints.finalized { entries.push((KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes())); - } + Some(finalized.slot) + } else { + None + }; let mut batch = self.backend.begin_write().expect("write batch"); batch.put_batch(Table::Metadata, entries).expect("put"); batch.commit().expect("commit"); + + // Prune after successful checkpoint update + if let (Some(old_slot), Some(new_slot)) = (old_finalized_slot, new_finalized_slot) + && new_slot > old_slot + { + self.prune_non_finalized_chain(new_slot); + } } // ============ Blocks ============ /// Iterate over all (root, block) pairs. + /// + /// DEPRECATED: Use `get_non_finalized_chain()` for fork choice instead. pub fn iter_blocks(&self) -> impl Iterator + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view @@ -278,6 +326,72 @@ impl Store { entries.into_iter() } + /// Get block data for fork choice: root -> (slot, parent_root). + /// + /// Iterates only the NonFinalizedChain table, avoiding Block deserialization. + /// Much faster than `iter_blocks()` for fork choice operations. + pub fn get_non_finalized_chain(&self) -> HashMap { + let view = self.backend.begin_read().expect("read view"); + view.prefix_iterator(Table::NonFinalizedChain, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, v)| { + let (slot, root) = decode_non_finalized_chain_key(&k); + let parent_root = H256::from_ssz_bytes(&v).expect("valid parent_root"); + (root, (slot, parent_root)) + }) + .collect() + } + + /// Get all known block roots as HashSet. + /// + /// Useful for checking block existence without deserializing. + pub fn get_block_roots(&self) -> HashSet { + let view = self.backend.begin_read().expect("read view"); + view.prefix_iterator(Table::NonFinalizedChain, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .map(|(k, _)| { + let (_, root) = decode_non_finalized_chain_key(&k); + root + }) + .collect() + } + + /// Prune slot index entries with slot < finalized_slot. + /// + /// Blocks/states are retained for historical queries, only the + /// NonFinalizedChain index is pruned. 
+ pub fn prune_non_finalized_chain(&mut self, finalized_slot: u64) { + let view = self.backend.begin_read().expect("read view"); + + // Collect keys to delete + let keys_to_delete: Vec<_> = view + .prefix_iterator(Table::NonFinalizedChain, &[]) + .expect("iterator") + .filter_map(|res| res.ok()) + .filter_map(|(k, _)| { + let (slot, _) = decode_non_finalized_chain_key(&k); + if slot < finalized_slot { + Some(k.to_vec()) + } else { + None + } + }) + .collect(); + drop(view); + + if keys_to_delete.is_empty() { + return; + } + + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::NonFinalizedChain, keys_to_delete) + .expect("delete non-finalized chain entries"); + batch.commit().expect("commit"); + } + pub fn get_block(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); view.get(Table::Blocks, &root.as_ssz_bytes()) @@ -294,8 +408,19 @@ impl Store { pub fn insert_block(&mut self, root: H256, block: Block) { let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(root.as_ssz_bytes(), block.as_ssz_bytes())]; - batch.put_batch(Table::Blocks, entries).expect("put block"); + let block_entries = vec![(root.as_ssz_bytes(), block.as_ssz_bytes())]; + batch + .put_batch(Table::Blocks, block_entries) + .expect("put block"); + + let index_entries = vec![( + encode_non_finalized_chain_key(block.slot, &root), + block.parent_root.as_ssz_bytes(), + )]; + batch + .put_batch(Table::NonFinalizedChain, index_entries) + .expect("put non-finalized chain index"); + batch.commit().expect("commit"); } @@ -336,6 +461,14 @@ impl Store { .put_batch(Table::BlockSignatures, sig_entries) .expect("put block signatures"); + let index_entries = vec![( + encode_non_finalized_chain_key(block.slot, &root), + block.parent_root.as_ssz_bytes(), + )]; + batch + .put_batch(Table::NonFinalizedChain, index_entries) + .expect("put non-finalized chain index"); + batch.commit().expect("commit"); } From 2cee1cf034b9457dd63f87860bb3a34da36212cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 15:27:08 -0300 Subject: [PATCH 02/12] chore: remove deprecated iter_blocks method --- crates/storage/src/store.rs | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 8598c3c..34898af 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -308,28 +308,10 @@ impl Store { // ============ Blocks ============ - /// Iterate over all (root, block) pairs. - /// - /// DEPRECATED: Use `get_non_finalized_chain()` for fork choice instead. - pub fn iter_blocks(&self) -> impl Iterator + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::Blocks, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let root = H256::from_ssz_bytes(&k).expect("valid root"); - let block = Block::from_ssz_bytes(&v).expect("valid block"); - (root, block) - }) - .collect(); - entries.into_iter() - } - /// Get block data for fork choice: root -> (slot, parent_root). /// /// Iterates only the NonFinalizedChain table, avoiding Block deserialization. - /// Much faster than `iter_blocks()` for fork choice operations. + /// Returns only non-finalized blocks, automatically pruned on finalization. 
pub fn get_non_finalized_chain(&self) -> HashMap { let view = self.backend.begin_read().expect("read view"); view.prefix_iterator(Table::NonFinalizedChain, &[]) From 7893f79e6bb7e9925c247742981d25bb8263b3d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 15:30:52 -0300 Subject: [PATCH 03/12] perf: optimize prune_non_finalized_chain with early termination Use take_while() to stop iterating once we hit finalized_slot. Since keys are sorted by slot (big-endian encoding), we can stop early instead of scanning the entire table. --- crates/storage/src/store.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 34898af..8f390b5 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -347,19 +347,17 @@ impl Store { pub fn prune_non_finalized_chain(&mut self, finalized_slot: u64) { let view = self.backend.begin_read().expect("read view"); - // Collect keys to delete + // Collect keys to delete - stop once we hit finalized_slot + // Keys are sorted by slot (big-endian encoding) so we can stop early let keys_to_delete: Vec<_> = view .prefix_iterator(Table::NonFinalizedChain, &[]) .expect("iterator") .filter_map(|res| res.ok()) - .filter_map(|(k, _)| { - let (slot, _) = decode_non_finalized_chain_key(&k); - if slot < finalized_slot { - Some(k.to_vec()) - } else { - None - } + .take_while(|(k, _)| { + let (slot, _) = decode_non_finalized_chain_key(k); + slot < finalized_slot }) + .map(|(k, _)| k.to_vec()) .collect(); drop(view); From c62b9fe9e36b7b7e37ec773278460e0927eea66e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 15:43:38 -0300 Subject: [PATCH 04/12] fix: prune attestations on finalization --- crates/blockchain/src/store.rs | 32 +++--- crates/common/types/src/block.rs | 2 +- crates/common/types/src/signature.rs | 36 ++++++- crates/storage/src/store.rs | 145 +++++++++++++++++++++++---- 4 files changed, 179 insertions(+), 36 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index c89dec0..d8b02c0 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -217,10 +217,9 @@ pub fn on_gossip_attestation( if cfg!(not(feature = "skip-signature-verification")) { // Store signature for later lookup during block building - let signature_key = (validator_id, message); let signature = ValidatorSignature::from_bytes(&signed_attestation.signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature(signature_key, signature); + store.insert_gossip_signature(&attestation.data, validator_id, signature); } metrics::inc_attestations_valid("gossip"); @@ -382,12 +381,10 @@ pub fn on_block( .zip(attestation_signatures.iter()) { let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits); - let data_root = att.data.tree_hash_root(); for validator_id in validator_ids { // Update Proof Map - Store the proof so future block builders can reuse this aggregation - let key: SignatureKey = (validator_id, data_root); - store.push_aggregated_payload(key, proof.clone()); + store.insert_aggregated_payload(&att.data, validator_id, proof.clone()); // Update Fork Choice - Register the vote immediately (historical/on-chain) let attestation = Attestation { @@ -415,14 +412,14 @@ pub fn on_block( if cfg!(not(feature = 
"skip-signature-verification")) { // Store the proposer's signature for potential future block building - let proposer_sig_key: SignatureKey = ( - proposer_attestation.validator_id, - proposer_attestation.data.tree_hash_root(), - ); let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature(proposer_sig_key, proposer_sig); + store.insert_gossip_signature( + &proposer_attestation.data, + proposer_attestation.validator_id, + proposer_sig, + ); } // Process proposer attestation (enters "new" stage, not "known") @@ -562,10 +559,17 @@ pub fn produce_block_with_signatures( let known_block_roots = store.get_block_roots(); // Collect signature data for block building - let gossip_signatures: HashMap = - store.iter_gossip_signatures().collect(); - let aggregated_payloads: HashMap> = - store.iter_aggregated_payloads().collect(); + let gossip_signatures: HashMap = store + .iter_gossip_signatures() + .filter_map(|(key, stored)| stored.to_validator_signature().ok().map(|sig| (key, sig))) + .collect(); + let aggregated_payloads: HashMap> = store + .iter_aggregated_payloads() + .map(|(key, stored_payloads)| { + let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); + (key, proofs) + }) + .collect(); // Build the block using fixed-point attestation collection let (block, _post_state, signatures) = build_block( diff --git a/crates/common/types/src/block.rs b/crates/common/types/src/block.rs index 658e239..fc24a63 100644 --- a/crates/common/types/src/block.rs +++ b/crates/common/types/src/block.rs @@ -73,7 +73,7 @@ pub type AttestationSignatures = /// The proof can verify that all participants signed the same message in the /// same epoch, using a single verification operation instead of checking /// each signature individually. -#[derive(Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode)] pub struct AggregatedSignatureProof { /// Bitfield indicating which validators' signatures are included. pub participants: AggregationBits, diff --git a/crates/common/types/src/signature.rs b/crates/common/types/src/signature.rs index 3a02059..5addf83 100644 --- a/crates/common/types/src/signature.rs +++ b/crates/common/types/src/signature.rs @@ -3,9 +3,10 @@ use leansig::{ signature::{SignatureScheme, SigningError}, }; use ssz::DecodeError; +use ssz_derive::{Decode, Encode}; use ssz_types::typenum::{Diff, U488, U3600}; -use crate::primitives::H256; +use crate::{block::AggregatedSignatureProof, primitives::H256}; /// The XMSS signature scheme used for validator signatures. /// @@ -91,3 +92,36 @@ impl ValidatorSecretKey { Ok(ValidatorSignature { inner: sig }) } } + +/// Gossip signature stored with slot for pruning. +/// +/// Signatures are stored alongside the slot they pertain to, enabling +/// simple slot-based pruning when blocks become finalized. +#[derive(Debug, Clone, Encode, Decode)] +pub struct StoredSignature { + pub slot: u64, + pub signature_bytes: Vec, +} + +impl StoredSignature { + pub fn new(slot: u64, signature: ValidatorSignature) -> Self { + Self { + slot, + signature_bytes: signature.to_bytes(), + } + } + + pub fn to_validator_signature(&self) -> Result { + ValidatorSignature::from_bytes(&self.signature_bytes) + } +} + +/// Aggregated payload stored with slot for pruning. +/// +/// Aggregated signature proofs are stored with their slot to enable +/// pruning when blocks become finalized. 
+#[derive(Debug, Clone, Encode, Decode)] +pub struct StoredAggregatedPayload { + pub slot: u64, + pub proof: AggregatedSignatureProof, +} diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 8f390b5..39f1c52 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -10,7 +10,7 @@ use ethlambda_types::{ BlockWithAttestation, SignedBlockWithAttestation, }, primitives::{Decode, Encode, H256, TreeHash}, - signature::ValidatorSignature, + signature::{StoredAggregatedPayload, StoredSignature, ValidatorSignature}, state::{ChainConfig, Checkpoint, State}, }; use tracing::info; @@ -303,6 +303,16 @@ impl Store { && new_slot > old_slot { self.prune_non_finalized_chain(new_slot); + + // Prune signatures and payloads for finalized slots + let pruned_sigs = self.prune_gossip_signatures(new_slot); + let pruned_payloads = self.prune_aggregated_payloads(new_slot); + if pruned_sigs > 0 || pruned_payloads > 0 { + info!( + finalized_slot = new_slot, + pruned_sigs, pruned_payloads, "Pruned finalized signatures" + ); + } } } @@ -372,6 +382,82 @@ impl Store { batch.commit().expect("commit"); } + /// Prune gossip signatures for slots <= finalized_slot. + /// + /// Returns the number of signatures pruned. + pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::GossipSignatures, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Ok(stored) = StoredSignature::from_ssz_bytes(&value_bytes) + && stored.slot <= finalized_slot + { + to_delete.push(key_bytes.to_vec()); + } + } + drop(view); + + let count = to_delete.len(); + if !to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::GossipSignatures, to_delete) + .expect("delete"); + batch.commit().expect("commit"); + } + count + } + + /// Prune aggregated payloads for slots <= finalized_slot. + /// + /// Returns the number of payloads pruned. + pub fn prune_aggregated_payloads(&mut self, finalized_slot: u64) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut updates = vec![]; + let mut deletes = vec![]; + let mut removed_count = 0; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::AggregatedPayloads, &[]) + .expect("iter") + .filter_map(|r| r.ok()) + { + if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { + let original_len = payloads.len(); + payloads.retain(|p| p.slot > finalized_slot); + removed_count += original_len - payloads.len(); + + if payloads.is_empty() { + deletes.push(key_bytes.to_vec()); + } else if payloads.len() < original_len { + updates.push((key_bytes.to_vec(), payloads.as_ssz_bytes())); + } + } + } + drop(view); + + if !updates.is_empty() || !deletes.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + if !updates.is_empty() { + batch + .put_batch(Table::AggregatedPayloads, updates) + .expect("put"); + } + if !deletes.is_empty() { + batch + .delete_batch(Table::AggregatedPayloads, deletes) + .expect("delete"); + } + batch.commit().expect("commit"); + } + removed_count + } + pub fn get_block(&self, root: &H256) -> Option { let view = self.backend.begin_read().expect("read view"); view.get(Table::Blocks, &root.as_ssz_bytes()) @@ -614,10 +700,10 @@ impl Store { // ============ Gossip Signatures ============ - /// Iterate over all (signature_key, signature) pairs. 
+ /// Iterate over all (signature_key, stored_signature) pairs. pub fn iter_gossip_signatures( &self, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view .prefix_iterator(Table::GossipSignatures, &[]) @@ -625,19 +711,19 @@ impl Store { .filter_map(|res| res.ok()) .filter_map(|(k, v)| { let key = decode_signature_key(&k); - ValidatorSignature::from_bytes(&v) + StoredSignature::from_ssz_bytes(&v) .ok() - .map(|sig| (key, sig)) + .map(|stored| (key, stored)) }) .collect(); entries.into_iter() } - pub fn get_gossip_signature(&self, key: &SignatureKey) -> Option { + pub fn get_gossip_signature(&self, key: &SignatureKey) -> Option { let view = self.backend.begin_read().expect("read view"); view.get(Table::GossipSignatures, &encode_signature_key(key)) .expect("get") - .and_then(|bytes| ValidatorSignature::from_bytes(&bytes).ok()) + .and_then(|bytes| StoredSignature::from_ssz_bytes(&bytes).ok()) } pub fn contains_gossip_signature(&self, key: &SignatureKey) -> bool { @@ -647,9 +733,19 @@ impl Store { .is_some() } - pub fn insert_gossip_signature(&mut self, key: SignatureKey, signature: ValidatorSignature) { + pub fn insert_gossip_signature( + &mut self, + attestation_data: &AttestationData, + validator_id: u64, + signature: ValidatorSignature, + ) { + let slot = attestation_data.slot; + let data_root = attestation_data.tree_hash_root(); + let key = (validator_id, data_root); + + let stored = StoredSignature::new(slot, signature); let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encode_signature_key(&key), signature.to_bytes())]; + let entries = vec![(encode_signature_key(&key), stored.as_ssz_bytes())]; batch .put_batch(Table::GossipSignatures, entries) .expect("put signature"); @@ -658,10 +754,10 @@ impl Store { // ============ Aggregated Payloads ============ - /// Iterate over all (signature_key, proofs) pairs. + /// Iterate over all (signature_key, stored_payloads) pairs. 
pub fn iter_aggregated_payloads( &self, - ) -> impl Iterator)> + '_ { + ) -> impl Iterator)> + '_ { let view = self.backend.begin_read().expect("read view"); let entries: Vec<_> = view .prefix_iterator(Table::AggregatedPayloads, &[]) @@ -669,9 +765,9 @@ impl Store { .filter_map(|res| res.ok()) .map(|(k, v)| { let key = decode_signature_key(&k); - let proofs = - Vec::::from_ssz_bytes(&v).expect("valid proofs"); - (key, proofs) + let payloads = + Vec::::from_ssz_bytes(&v).expect("valid payloads"); + (key, payloads) }) .collect(); entries.into_iter() @@ -680,22 +776,31 @@ impl Store { pub fn get_aggregated_payloads( &self, key: &SignatureKey, - ) -> Option> { + ) -> Option> { let view = self.backend.begin_read().expect("read view"); view.get(Table::AggregatedPayloads, &encode_signature_key(key)) .expect("get") .map(|bytes| { - Vec::::from_ssz_bytes(&bytes).expect("valid proofs") + Vec::::from_ssz_bytes(&bytes).expect("valid payloads") }) } - pub fn push_aggregated_payload(&mut self, key: SignatureKey, proof: AggregatedSignatureProof) { + pub fn insert_aggregated_payload( + &mut self, + attestation_data: &AttestationData, + validator_id: u64, + proof: AggregatedSignatureProof, + ) { + let slot = attestation_data.slot; + let data_root = attestation_data.tree_hash_root(); + let key = (validator_id, data_root); + // Read existing, add new, write back - let mut proofs = self.get_aggregated_payloads(&key).unwrap_or_default(); - proofs.push(proof); + let mut payloads = self.get_aggregated_payloads(&key).unwrap_or_default(); + payloads.push(StoredAggregatedPayload { slot, proof }); let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encode_signature_key(&key), proofs.as_ssz_bytes())]; + let entries = vec![(encode_signature_key(&key), payloads.as_ssz_bytes())]; batch .put_batch(Table::AggregatedPayloads, entries) .expect("put proofs"); From d37bdc7d05de8b1bea958537ade28b2d7667fe80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 15:59:30 -0300 Subject: [PATCH 05/12] docs: document race condition in insert_aggregated_payload Add thread safety documentation to insert_aggregated_payload method to clarify the non-atomic read-modify-write pattern. This method reads existing payloads, appends a new one, and writes back. Concurrent calls could result in lost updates. The method must be called from a single thread. In ethlambda's architecture, the BlockChain actor (GenServer pattern) provides single-threaded access. --- crates/storage/src/store.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 39f1c52..5e1a3e7 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -785,6 +785,21 @@ impl Store { }) } + /// Insert an aggregated signature proof for a validator's attestation. + /// + /// Multiple proofs can be stored for the same (validator, attestation_data) pair, + /// each with its own slot metadata for pruning. + /// + /// # Thread Safety + /// + /// This method uses a read-modify-write pattern that is NOT atomic: + /// 1. Read existing payloads + /// 2. Append new payload + /// 3. Write back + /// + /// Concurrent calls could result in lost updates. This method MUST be called + /// from a single thread. In ethlambda, the Store is owned by the BlockChain + /// actor which provides single-threaded access. 
pub fn insert_aggregated_payload( &mut self, attestation_data: &AttestationData, @@ -795,7 +810,7 @@ impl Store { let data_root = attestation_data.tree_hash_root(); let key = (validator_id, data_root); - // Read existing, add new, write back + // Read existing, add new, write back (NOT atomic - requires single-threaded access) let mut payloads = self.get_aggregated_payloads(&key).unwrap_or_default(); payloads.push(StoredAggregatedPayload { slot, proof }); From 93467a66738acbdb71b8ebc2e35161a53493dcc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 16:26:48 -0300 Subject: [PATCH 06/12] refactor: rename NonFinalizedChain to LiveChain Rename NonFinalizedChain table and related functions to LiveChain for better semantic clarity. The table includes the finalized block (as an anchor for fork choice) plus all non-finalized blocks, so 'LiveChain' better describes its contents. Changes: - Table::NonFinalizedChain -> Table::LiveChain - encode_non_finalized_chain_key -> encode_live_chain_key - decode_non_finalized_chain_key -> decode_live_chain_key - get_non_finalized_chain -> get_live_chain - prune_non_finalized_chain -> prune_live_chain - Column family: non_finalized_chain -> live_chain All tests pass. No functional changes. --- crates/blockchain/src/store.rs | 4 +- .../blockchain/tests/forkchoice_spectests.rs | 4 +- crates/storage/src/api/tables.rs | 9 ++-- crates/storage/src/backend/rocksdb.rs | 2 +- crates/storage/src/store.rs | 46 +++++++++---------- 5 files changed, 33 insertions(+), 32 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index d8b02c0..4f0e460 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -30,7 +30,7 @@ fn accept_new_attestations(store: &mut Store) { /// Update the head based on the fork choice rule. 
fn update_head(store: &mut Store) { - let blocks = store.get_non_finalized_chain(); + let blocks = store.get_live_chain(); let attestations: HashMap = store.iter_known_attestations().collect(); let old_head = store.head(); let new_head = ethlambda_fork_choice::compute_lmd_ghost_head( @@ -69,7 +69,7 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); - let blocks = store.get_non_finalized_chain(); + let blocks = store.get_live_chain(); let attestations: HashMap = store.iter_new_attestations().collect(); let safe_target = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 83fff67..7545d4e 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -165,7 +165,7 @@ fn validate_checks( } // Also validate the root matches a block at this slot - let blocks = st.get_non_finalized_chain(); + let blocks = st.get_live_chain(); let block_found = blocks .iter() .any(|(root, (slot, _))| *slot == expected_slot && *root == target.root); @@ -365,7 +365,7 @@ fn validate_lexicographic_head_among( .into()); } - let blocks = st.get_non_finalized_chain(); + let blocks = st.get_live_chain(); let known_attestations: HashMap = st.iter_known_attestations().collect(); // Resolve all fork labels to roots and compute their weights diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 9140663..36ed953 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -20,11 +20,12 @@ pub enum Table { AggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, - /// Non-finalized chain index: (slot || root) -> parent_root + /// Live chain index: (slot || root) -> parent_root /// /// Fast lookup for fork choice without deserializing full blocks. - /// Pruned when slots become finalized. - NonFinalizedChain, + /// Includes finalized blocks (anchor) and all non-finalized blocks. + /// Pruned when slots become finalized (keeps finalized block itself). + LiveChain, } /// All table variants. @@ -37,5 +38,5 @@ pub const ALL_TABLES: [Table; 9] = [ Table::GossipSignatures, Table::AggregatedPayloads, Table::Metadata, - Table::NonFinalizedChain, + Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 3171ac2..b42053d 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -20,7 +20,7 @@ fn cf_name(table: Table) -> &'static str { Table::GossipSignatures => "gossip_signatures", Table::AggregatedPayloads => "aggregated_payloads", Table::Metadata => "metadata", - Table::NonFinalizedChain => "non_finalized_chain", + Table::LiveChain => "live_chain", } } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 5e1a3e7..48fad43 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -85,17 +85,17 @@ fn decode_signature_key(bytes: &[u8]) -> SignatureKey { (validator_id, root) } -/// Encode a NonFinalizedChain key (slot, root) to bytes. +/// Encode a LiveChain key (slot, root) to bytes. /// Layout: slot (8 bytes big-endian) || root (32 bytes) /// Big-endian ensures lexicographic ordering matches numeric ordering. 
-fn encode_non_finalized_chain_key(slot: u64, root: &H256) -> Vec { +fn encode_live_chain_key(slot: u64, root: &H256) -> Vec { let mut result = slot.to_be_bytes().to_vec(); result.extend_from_slice(&root.0); result } -/// Decode a NonFinalizedChain key from bytes. -fn decode_non_finalized_chain_key(bytes: &[u8]) -> (u64, H256) { +/// Decode a LiveChain key from bytes. +fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { let slot = u64::from_be_bytes(bytes[..8].try_into().expect("valid slot bytes")); let root = H256::from_slice(&bytes[8..]); (slot, root) @@ -185,11 +185,11 @@ impl Store { // Non-finalized chain index let index_entries = vec![( - encode_non_finalized_chain_key(anchor_block.slot, &anchor_block_root), + encode_live_chain_key(anchor_block.slot, &anchor_block_root), anchor_block.parent_root.as_ssz_bytes(), )]; batch - .put_batch(Table::NonFinalizedChain, index_entries) + .put_batch(Table::LiveChain, index_entries) .expect("put non-finalized chain index"); batch.commit().expect("commit"); @@ -271,7 +271,7 @@ impl Store { /// - Justified is updated if provided. /// - Finalized is updated if provided. /// - /// When finalization advances, prunes the NonFinalizedChain index. + /// When finalization advances, prunes the LiveChain index. pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) { // Check if we need to prune before updating metadata let should_prune = checkpoints.finalized.is_some(); @@ -302,7 +302,7 @@ impl Store { if let (Some(old_slot), Some(new_slot)) = (old_finalized_slot, new_finalized_slot) && new_slot > old_slot { - self.prune_non_finalized_chain(new_slot); + self.prune_live_chain(new_slot); // Prune signatures and payloads for finalized slots let pruned_sigs = self.prune_gossip_signatures(new_slot); @@ -320,15 +320,15 @@ impl Store { /// Get block data for fork choice: root -> (slot, parent_root). /// - /// Iterates only the NonFinalizedChain table, avoiding Block deserialization. + /// Iterates only the LiveChain table, avoiding Block deserialization. /// Returns only non-finalized blocks, automatically pruned on finalization. - pub fn get_non_finalized_chain(&self) -> HashMap { + pub fn get_live_chain(&self) -> HashMap { let view = self.backend.begin_read().expect("read view"); - view.prefix_iterator(Table::NonFinalizedChain, &[]) + view.prefix_iterator(Table::LiveChain, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { - let (slot, root) = decode_non_finalized_chain_key(&k); + let (slot, root) = decode_live_chain_key(&k); let parent_root = H256::from_ssz_bytes(&v).expect("valid parent_root"); (root, (slot, parent_root)) }) @@ -340,11 +340,11 @@ impl Store { /// Useful for checking block existence without deserializing. pub fn get_block_roots(&self) -> HashSet { let view = self.backend.begin_read().expect("read view"); - view.prefix_iterator(Table::NonFinalizedChain, &[]) + view.prefix_iterator(Table::LiveChain, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, _)| { - let (_, root) = decode_non_finalized_chain_key(&k); + let (_, root) = decode_live_chain_key(&k); root }) .collect() @@ -353,18 +353,18 @@ impl Store { /// Prune slot index entries with slot < finalized_slot. /// /// Blocks/states are retained for historical queries, only the - /// NonFinalizedChain index is pruned. - pub fn prune_non_finalized_chain(&mut self, finalized_slot: u64) { + /// LiveChain index is pruned. 
+ pub fn prune_live_chain(&mut self, finalized_slot: u64) { let view = self.backend.begin_read().expect("read view"); // Collect keys to delete - stop once we hit finalized_slot // Keys are sorted by slot (big-endian encoding) so we can stop early let keys_to_delete: Vec<_> = view - .prefix_iterator(Table::NonFinalizedChain, &[]) + .prefix_iterator(Table::LiveChain, &[]) .expect("iterator") .filter_map(|res| res.ok()) .take_while(|(k, _)| { - let (slot, _) = decode_non_finalized_chain_key(k); + let (slot, _) = decode_live_chain_key(k); slot < finalized_slot }) .map(|(k, _)| k.to_vec()) @@ -377,7 +377,7 @@ impl Store { let mut batch = self.backend.begin_write().expect("write batch"); batch - .delete_batch(Table::NonFinalizedChain, keys_to_delete) + .delete_batch(Table::LiveChain, keys_to_delete) .expect("delete non-finalized chain entries"); batch.commit().expect("commit"); } @@ -480,11 +480,11 @@ impl Store { .expect("put block"); let index_entries = vec![( - encode_non_finalized_chain_key(block.slot, &root), + encode_live_chain_key(block.slot, &root), block.parent_root.as_ssz_bytes(), )]; batch - .put_batch(Table::NonFinalizedChain, index_entries) + .put_batch(Table::LiveChain, index_entries) .expect("put non-finalized chain index"); batch.commit().expect("commit"); @@ -528,11 +528,11 @@ impl Store { .expect("put block signatures"); let index_entries = vec![( - encode_non_finalized_chain_key(block.slot, &root), + encode_live_chain_key(block.slot, &root), block.parent_root.as_ssz_bytes(), )]; batch - .put_batch(Table::NonFinalizedChain, index_entries) + .put_batch(Table::LiveChain, index_entries) .expect("put non-finalized chain index"); batch.commit().expect("commit"); From b2f21d63f518d36cf6078fdc9ab994b44a283770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 16:58:10 -0300 Subject: [PATCH 07/12] refactor: simplify update_checkpoints logic Remove unnecessary Option wrapping and complex pattern matching. The old code wrapped old_finalized_slot and new_finalized_slot in Options only to immediately unwrap them in a pattern match. Simplified approach: - Always read old_finalized_slot at start - Update metadata - If finalized checkpoint provided and advanced, prune Same behavior, clearer code. --- crates/storage/src/store.rs | 44 ++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 48fad43..ee92873 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -273,13 +273,8 @@ impl Store { /// /// When finalization advances, prunes the LiveChain index. 
pub fn update_checkpoints(&mut self, checkpoints: ForkCheckpoints) { - // Check if we need to prune before updating metadata - let should_prune = checkpoints.finalized.is_some(); - let old_finalized_slot = if should_prune { - Some(self.latest_finalized().slot) - } else { - None - }; + // Read old finalized slot before updating metadata + let old_finalized_slot = self.latest_finalized().slot; let mut entries = vec![(KEY_HEAD.to_vec(), checkpoints.head.as_ssz_bytes())]; @@ -287,31 +282,30 @@ impl Store { entries.push((KEY_LATEST_JUSTIFIED.to_vec(), justified.as_ssz_bytes())); } - let new_finalized_slot = if let Some(finalized) = checkpoints.finalized { + if let Some(finalized) = checkpoints.finalized { entries.push((KEY_LATEST_FINALIZED.to_vec(), finalized.as_ssz_bytes())); - Some(finalized.slot) - } else { - None - }; + } let mut batch = self.backend.begin_write().expect("write batch"); batch.put_batch(Table::Metadata, entries).expect("put"); batch.commit().expect("commit"); // Prune after successful checkpoint update - if let (Some(old_slot), Some(new_slot)) = (old_finalized_slot, new_finalized_slot) - && new_slot > old_slot - { - self.prune_live_chain(new_slot); - - // Prune signatures and payloads for finalized slots - let pruned_sigs = self.prune_gossip_signatures(new_slot); - let pruned_payloads = self.prune_aggregated_payloads(new_slot); - if pruned_sigs > 0 || pruned_payloads > 0 { - info!( - finalized_slot = new_slot, - pruned_sigs, pruned_payloads, "Pruned finalized signatures" - ); + if let Some(finalized) = checkpoints.finalized { + if finalized.slot > old_finalized_slot { + self.prune_live_chain(finalized.slot); + + // Prune signatures and payloads for finalized slots + let pruned_sigs = self.prune_gossip_signatures(finalized.slot); + let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); + if pruned_sigs > 0 || pruned_payloads > 0 { + info!( + finalized_slot = finalized.slot, + pruned_sigs, + pruned_payloads, + "Pruned finalized signatures" + ); + } } } } From 9c5321a4e263f10c8cf8f2b6f1a590380a7721e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:20:50 -0300 Subject: [PATCH 08/12] chore: fmt --- crates/storage/src/store.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index ee92873..5db5437 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -301,9 +301,7 @@ impl Store { if pruned_sigs > 0 || pruned_payloads > 0 { info!( finalized_slot = finalized.slot, - pruned_sigs, - pruned_payloads, - "Pruned finalized signatures" + pruned_sigs, pruned_payloads, "Pruned finalized signatures" ); } } From 5c0d155646568f60804e712b741085e443d5be78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:42:36 -0300 Subject: [PATCH 09/12] refactor: collapse nested if in update_checkpoints Fix clippy::collapsible_if warning by using && operator in let-if pattern. Before: if let Some(finalized) = checkpoints.finalized { if finalized.slot > old_finalized_slot { ... } } After: if let Some(finalized) = checkpoints.finalized && finalized.slot > old_finalized_slot { ... } Same behavior, cleaner code. 
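For readers unfamiliar with the collapsed form: joining an `if let` with `&&`
relies on Rust's let-chains, so this hunk assumes a toolchain where that
syntax is stable (edition 2024). A minimal standalone sketch of the pattern,
with an illustrative function name that is not part of the codebase:

    fn should_prune(finalized: Option<u64>, old_finalized_slot: u64) -> bool {
        // `if let` plus an extra condition joined with `&&` (a let-chain),
        // mirroring the update_checkpoints change in the diff below.
        if let Some(slot) = finalized
            && slot > old_finalized_slot
        {
            return true;
        }
        false
    }

    fn main() {
        assert!(should_prune(Some(10), 5));
        assert!(!should_prune(Some(3), 5));
        assert!(!should_prune(None, 5));
    }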
--- crates/storage/src/store.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 5db5437..b652515 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -291,19 +291,19 @@ impl Store { batch.commit().expect("commit"); // Prune after successful checkpoint update - if let Some(finalized) = checkpoints.finalized { - if finalized.slot > old_finalized_slot { - self.prune_live_chain(finalized.slot); - - // Prune signatures and payloads for finalized slots - let pruned_sigs = self.prune_gossip_signatures(finalized.slot); - let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); - if pruned_sigs > 0 || pruned_payloads > 0 { - info!( - finalized_slot = finalized.slot, - pruned_sigs, pruned_payloads, "Pruned finalized signatures" - ); - } + if let Some(finalized) = checkpoints.finalized + && finalized.slot > old_finalized_slot + { + self.prune_live_chain(finalized.slot); + + // Prune signatures and payloads for finalized slots + let pruned_sigs = self.prune_gossip_signatures(finalized.slot); + let pruned_payloads = self.prune_aggregated_payloads(finalized.slot); + if pruned_sigs > 0 || pruned_payloads > 0 { + info!( + finalized_slot = finalized.slot, + pruned_sigs, pruned_payloads, "Pruned finalized signatures" + ); } } } From ae1641725aaa9dd2ac65cb91a73f91fc2944ae4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:46:07 -0300 Subject: [PATCH 10/12] docs: fix incorrect comment --- crates/storage/src/store.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index b652515..d5afff5 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -790,8 +790,7 @@ impl Store { /// 3. Write back /// /// Concurrent calls could result in lost updates. This method MUST be called - /// from a single thread. In ethlambda, the Store is owned by the BlockChain - /// actor which provides single-threaded access. + /// from a single thread. 
In our case, that thread is the `BlockChain` `GenServer` pub fn insert_aggregated_payload( &mut self, attestation_data: &AttestationData, From 2271e4fb5d21335ced3d37adc72d9a628be32312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 18:01:58 -0300 Subject: [PATCH 11/12] refactor: move new signature types to storage module --- crates/blockchain/src/key_manager.rs | 2 +- crates/blockchain/src/lib.rs | 3 +- crates/blockchain/src/store.rs | 2 +- crates/blockchain/state_transition/src/lib.rs | 2 +- .../blockchain/tests/signature_spectests.rs | 2 +- crates/blockchain/tests/signature_types.rs | 6 ++- crates/common/crypto/src/lib.rs | 6 ++- crates/common/types/src/attestation.rs | 4 +- crates/common/types/src/block.rs | 7 ++-- crates/common/types/src/primitives.rs | 14 ++++--- crates/common/types/src/signature.rs | 37 +------------------ crates/common/types/src/state.rs | 9 ++--- crates/net/p2p/src/gossipsub/handler.rs | 7 ++-- crates/net/p2p/src/req_resp/codec.rs | 2 +- crates/net/p2p/src/req_resp/handlers.rs | 2 +- crates/net/p2p/src/req_resp/messages.rs | 11 ++++-- crates/net/rpc/src/lib.rs | 6 +-- crates/storage/src/lib.rs | 2 + crates/storage/src/store.rs | 8 +++- crates/storage/src/types.rs | 36 ++++++++++++++++++ 20 files changed, 92 insertions(+), 76 deletions(-) create mode 100644 crates/storage/src/types.rs diff --git a/crates/blockchain/src/key_manager.rs b/crates/blockchain/src/key_manager.rs index 1f447bf..50f6bb3 100644 --- a/crates/blockchain/src/key_manager.rs +++ b/crates/blockchain/src/key_manager.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use ethlambda_types::{ attestation::{AttestationData, XmssSignature}, - primitives::{H256, TreeHash}, + primitives::{H256, ssz::TreeHash}, signature::{ValidatorSecretKey, ValidatorSignature}, }; diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index a438a46..90a98c6 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -3,12 +3,11 @@ use std::time::{Duration, SystemTime}; use ethlambda_state_transition::is_proposer; use ethlambda_storage::Store; -use ethlambda_types::primitives::H256; use ethlambda_types::{ ShortRoot, attestation::{Attestation, AttestationData, SignedAttestation}, block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, - primitives::TreeHash, + primitives::{H256, ssz::TreeHash}, signature::ValidatorSecretKey, state::Checkpoint, }; diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 4f0e460..f244050 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -12,7 +12,7 @@ use ethlambda_types::{ AggregatedAttestations, AggregatedSignatureProof, AggregationBits, Block, BlockBody, SignedBlockWithAttestation, }, - primitives::{H256, TreeHash}, + primitives::{H256, ssz::TreeHash}, signature::ValidatorSignature, state::{Checkpoint, State, Validator}, }; diff --git a/crates/blockchain/state_transition/src/lib.rs b/crates/blockchain/state_transition/src/lib.rs index 815b737..bb7555d 100644 --- a/crates/blockchain/state_transition/src/lib.rs +++ b/crates/blockchain/state_transition/src/lib.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use ethlambda_types::{ ShortRoot, block::{AggregatedAttestations, Block, BlockHeader}, - primitives::{H256, TreeHash}, + primitives::{H256, ssz::TreeHash}, state::{Checkpoint, JustificationValidators, State}, }; use tracing::info; diff --git 
a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index d76da29..40ec7aa 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -5,7 +5,7 @@ use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ block::{Block, SignedBlockWithAttestation}, - primitives::TreeHash, + primitives::ssz::TreeHash, state::State, }; diff --git a/crates/blockchain/tests/signature_types.rs b/crates/blockchain/tests/signature_types.rs index deff258..aab9922 100644 --- a/crates/blockchain/tests/signature_types.rs +++ b/crates/blockchain/tests/signature_types.rs @@ -7,10 +7,12 @@ use ethlambda_types::block::{ AttestationSignatures, Block as EthBlock, BlockBody as EthBlockBody, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation, }; -use ethlambda_types::primitives::{BitList, Encode, H256, VariableList}; +use ethlambda_types::primitives::{ + BitList, H256, VariableList, + ssz::{Decode as SszDecode, Encode as SszEncode}, +}; use ethlambda_types::state::{Checkpoint as EthCheckpoint, State, ValidatorPubkeyBytes}; use serde::Deserialize; -use ssz_derive::{Decode as SszDecode, Encode as SszEncode}; use ssz_types::FixedVector; use ssz_types::typenum::{U28, U32}; use std::collections::HashMap; diff --git a/crates/common/crypto/src/lib.rs b/crates/common/crypto/src/lib.rs index 2451afe..ae6496f 100644 --- a/crates/common/crypto/src/lib.rs +++ b/crates/common/crypto/src/lib.rs @@ -1,9 +1,11 @@ use std::sync::Once; -use ethlambda_types::primitives::{Decode, Encode}; use ethlambda_types::{ block::ByteListMiB, - primitives::H256, + primitives::{ + H256, + ssz::{Decode, Encode}, + }, signature::{ValidatorPublicKey, ValidatorSignature}, }; use lean_multisig::{ diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 5040107..309901a 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,7 +1,5 @@ -use ssz_derive::{Decode, Encode}; -use tree_hash_derive::TreeHash; - use crate::{ + primitives::ssz::{Decode, Encode, TreeHash}, signature::SignatureSize, state::{Checkpoint, ValidatorRegistryLimit}, }; diff --git a/crates/common/types/src/block.rs b/crates/common/types/src/block.rs index fc24a63..d54e050 100644 --- a/crates/common/types/src/block.rs +++ b/crates/common/types/src/block.rs @@ -1,11 +1,12 @@ use serde::Serialize; -use ssz_derive::{Decode, Encode}; use ssz_types::typenum::U1048576; -use tree_hash_derive::TreeHash; use crate::{ attestation::{AggregatedAttestation, Attestation, XmssSignature}, - primitives::{ByteList, H256}, + primitives::{ + ByteList, H256, + ssz::{Decode, Encode, TreeHash}, + }, state::ValidatorRegistryLimit, }; diff --git a/crates/common/types/src/primitives.rs b/crates/common/types/src/primitives.rs index 719244f..fb3ded3 100644 --- a/crates/common/types/src/primitives.rs +++ b/crates/common/types/src/primitives.rs @@ -1,10 +1,12 @@ -use tree_hash::Hash256; - -// Re-export SSZ traits to avoid users having to depend on these directly -pub use ssz::{Decode, Encode}; -pub use tree_hash::TreeHash; +// Re-export SSZ traits and types to avoid users having to depend on these directly +pub mod ssz { + pub use ssz::*; + pub use ssz_derive::{Decode, Encode}; + pub use tree_hash::TreeHash; + pub use tree_hash_derive::TreeHash; +} pub use ssz_types::{BitList, BitVector, FixedVector, VariableList}; -pub type H256 = Hash256; +pub type 
H256 = tree_hash::Hash256;
 pub type ByteList = ssz_types::VariableList;
diff --git a/crates/common/types/src/signature.rs b/crates/common/types/src/signature.rs
index 5addf83..e63a856 100644
--- a/crates/common/types/src/signature.rs
+++ b/crates/common/types/src/signature.rs
@@ -2,11 +2,9 @@ use leansig::{
     serialization::Serializable,
     signature::{SignatureScheme, SigningError},
 };
-use ssz::DecodeError;
-use ssz_derive::{Decode, Encode};
 use ssz_types::typenum::{Diff, U488, U3600};
 
-use crate::{block::AggregatedSignatureProof, primitives::H256};
+use crate::primitives::{H256, ssz::DecodeError};
 
 /// The XMSS signature scheme used for validator signatures.
 ///
@@ -92,36 +90,3 @@ impl ValidatorSecretKey {
         Ok(ValidatorSignature { inner: sig })
     }
 }
-
-/// Gossip signature stored with slot for pruning.
-///
-/// Signatures are stored alongside the slot they pertain to, enabling
-/// simple slot-based pruning when blocks become finalized.
-#[derive(Debug, Clone, Encode, Decode)]
-pub struct StoredSignature {
-    pub slot: u64,
-    pub signature_bytes: Vec,
-}
-
-impl StoredSignature {
-    pub fn new(slot: u64, signature: ValidatorSignature) -> Self {
-        Self {
-            slot,
-            signature_bytes: signature.to_bytes(),
-        }
-    }
-
-    pub fn to_validator_signature(&self) -> Result {
-        ValidatorSignature::from_bytes(&self.signature_bytes)
-    }
-}
-
-/// Aggregated payload stored with slot for pruning.
-///
-/// Aggregated signature proofs are stored with their slot to enable
-/// pruning when blocks become finalized.
-#[derive(Debug, Clone, Encode, Decode)]
-pub struct StoredAggregatedPayload {
-    pub slot: u64,
-    pub proof: AggregatedSignatureProof,
-}
diff --git a/crates/common/types/src/state.rs b/crates/common/types/src/state.rs
index 4b4038e..ec3e25b 100644
--- a/crates/common/types/src/state.rs
+++ b/crates/common/types/src/state.rs
@@ -1,14 +1,13 @@
 use serde::{Deserialize, Serialize};
-use ssz::DecodeError;
-use ssz_derive::{Decode, Encode};
 use ssz_types::typenum::{U4096, U262144};
-use tree_hash::TreeHash;
-use tree_hash_derive::TreeHash;
 
 use crate::{
     block::{BlockBody, BlockHeader},
     genesis::Genesis,
-    primitives::H256,
+    primitives::{
+        H256,
+        ssz::{Decode, DecodeError, Encode, TreeHash},
+    },
     signature::ValidatorPublicKey,
 };
 
diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs
index 5dd9a9b..9335270 100644
--- a/crates/net/p2p/src/gossipsub/handler.rs
+++ b/crates/net/p2p/src/gossipsub/handler.rs
@@ -1,10 +1,11 @@
 use ethlambda_types::{
-    ShortRoot, attestation::SignedAttestation, block::SignedBlockWithAttestation,
+    ShortRoot,
+    attestation::SignedAttestation,
+    block::SignedBlockWithAttestation,
+    primitives::ssz::{Decode, Encode, TreeHash},
 };
 use libp2p::gossipsub::Event;
-use ssz::{Decode, Encode};
 use tracing::{error, info, trace};
-use tree_hash::TreeHash;
 
 use super::{
     encoding::{compress_message, decompress_message},
diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs
index 1a151b1..32335d7 100644
--- a/crates/net/p2p/src/req_resp/codec.rs
+++ b/crates/net/p2p/src/req_resp/codec.rs
@@ -1,7 +1,7 @@
 use std::io;
 
+use ethlambda_types::primitives::ssz::{Decode, Encode};
 use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use ssz::{Decode, Encode};
 use tracing::trace;
 
 use super::{
diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs
index 698af2c..ae6de50 100644
--- a/crates/net/p2p/src/req_resp/handlers.rs
+++ b/crates/net/p2p/src/req_resp/handlers.rs
@@ -5,7 +5,7 @@ use tokio::time::Duration;
 use tracing::{debug, error, info, warn};
 
 use ethlambda_types::block::SignedBlockWithAttestation;
-use ethlambda_types::primitives::TreeHash;
+use ethlambda_types::primitives::ssz::TreeHash;
 
 use super::{
     BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponseCode,
diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs
index cc8ee1e..81ef9d6 100644
--- a/crates/net/p2p/src/req_resp/messages.rs
+++ b/crates/net/p2p/src/req_resp/messages.rs
@@ -1,6 +1,11 @@
-use ethlambda_types::{block::SignedBlockWithAttestation, primitives::H256, state::Checkpoint};
-use ssz::Decode as SszDecode;
-use ssz_derive::{Decode, Encode};
+use ethlambda_types::{
+    block::SignedBlockWithAttestation,
+    primitives::{
+        H256,
+        ssz::{Decode, Decode as SszDecode, Encode},
+    },
+    state::Checkpoint,
+};
 use ssz_types::typenum;
 
 pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy";
diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs
index 9546211..fb5100c 100644
--- a/crates/net/rpc/src/lib.rs
+++ b/crates/net/rpc/src/lib.rs
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
 
 use axum::{Json, Router, http::HeaderValue, http::header, response::IntoResponse, routing::get};
 use ethlambda_storage::Store;
-use ethlambda_types::primitives::Encode;
+use ethlambda_types::primitives::ssz::Encode;
 
 pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
 pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream";
@@ -77,7 +77,7 @@ mod tests {
     use ethlambda_storage::{Store, backend::InMemoryBackend};
     use ethlambda_types::{
         block::{BlockBody, BlockHeader},
-        primitives::{H256, TreeHash},
+        primitives::{H256, ssz::TreeHash},
         state::{ChainConfig, Checkpoint, JustificationValidators, JustifiedSlots, State},
     };
     use http_body_util::BodyExt;
@@ -150,7 +150,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_get_latest_finalized_state() {
-        use ethlambda_types::primitives::Encode;
+        use ethlambda_types::primitives::ssz::Encode;
 
         let state = create_test_state();
         let backend = Arc::new(InMemoryBackend::new());
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 1829fbd..89e2fa3 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -1,6 +1,8 @@
 mod api;
 pub mod backend;
 mod store;
+mod types;
 
 pub use api::StorageBackend;
 pub use store::{ForkCheckpoints, SignatureKey, Store};
+pub use types::{StoredAggregatedPayload, StoredSignature};
diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs
index d5afff5..ed55223 100644
--- a/crates/storage/src/store.rs
+++ b/crates/storage/src/store.rs
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::api::{StorageBackend, Table};
+use crate::types::{StoredAggregatedPayload, StoredSignature};
 
 use ethlambda_types::{
     attestation::AttestationData,
@@ -9,8 +10,11 @@ use ethlambda_types::{
         AggregatedSignatureProof, Block, BlockBody, BlockSignaturesWithAttestation,
         BlockWithAttestation, SignedBlockWithAttestation,
     },
-    primitives::{Decode, Encode, H256, TreeHash},
-    signature::{StoredAggregatedPayload, StoredSignature, ValidatorSignature},
+    primitives::{
+        H256,
+        ssz::{Decode, Encode, TreeHash},
+    },
+    signature::ValidatorSignature,
     state::{ChainConfig, Checkpoint, State},
 };
 use tracing::info;
diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs
new file mode 100644
index 0000000..b23207f
--- /dev/null
+++ b/crates/storage/src/types.rs
@@ -0,0 +1,36 @@
+use ethlambda_types::{
+    block::AggregatedSignatureProof, primitives::ssz, signature::ValidatorSignature,
+};
+
+/// Gossip signature stored with slot for pruning.
+///
+/// Signatures are stored alongside the slot they pertain to, enabling
+/// simple slot-based pruning when blocks become finalized.
+#[derive(Debug, Clone, ssz::Encode, ssz::Decode)]
+pub struct StoredSignature {
+    pub slot: u64,
+    pub signature_bytes: Vec,
+}
+
+impl StoredSignature {
+    pub fn new(slot: u64, signature: ValidatorSignature) -> Self {
+        Self {
+            slot,
+            signature_bytes: signature.to_bytes(),
+        }
+    }
+
+    pub fn to_validator_signature(&self) -> Result {
+        ValidatorSignature::from_bytes(&self.signature_bytes)
+    }
+}
+
+/// Aggregated payload stored with slot for pruning.
+///
+/// Aggregated signature proofs are stored with their slot to enable
+/// pruning when blocks become finalized.
+#[derive(Debug, Clone, ssz::Encode, ssz::Decode)]
+pub struct StoredAggregatedPayload {
+    pub slot: u64,
+    pub proof: AggregatedSignatureProof,
+}

From 1dc660ef58c9d02226bc3094f5b325e817e9c62b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com>
Date: Mon, 2 Feb 2026 20:57:39 -0300
Subject: [PATCH 12/12] fix: add missing import in tests

---
 crates/blockchain/tests/forkchoice_spectests.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs
index 7545d4e..b35385c 100644
--- a/crates/blockchain/tests/forkchoice_spectests.rs
+++ b/crates/blockchain/tests/forkchoice_spectests.rs
@@ -9,7 +9,7 @@ use ethlambda_storage::{Store, backend::InMemoryBackend};
 use ethlambda_types::{
     attestation::Attestation,
     block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
-    primitives::{H256, TreeHash, VariableList},
+    primitives::{H256, VariableList, ssz::TreeHash},
     state::State,
 };
 
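
For illustration only, and not part of the patches above: a minimal, self-contained sketch of the slot-based pruning that the StoredSignature and StoredAggregatedPayload doc comments describe. The StoredSig struct, the prune_signatures helper, and the BTreeMap-backed store below are hypothetical stand-ins, not the crate's real storage API.

// Hypothetical stand-in for StoredSignature: a slot plus opaque signature bytes.
struct StoredSig {
    slot: u64,
    signature_bytes: Vec<u8>,
}

// Drop every stored entry at or below the finalized slot, keeping the rest.
// Mirrors the "simple slot-based pruning when blocks become finalized" noted
// in the doc comments of the new types.rs module.
fn prune_signatures(store: &mut std::collections::BTreeMap<u64, StoredSig>, finalized_slot: u64) {
    store.retain(|_, sig| sig.slot > finalized_slot);
}

fn main() {
    let mut store = std::collections::BTreeMap::new();
    store.insert(1, StoredSig { slot: 10, signature_bytes: vec![0u8; 32] });
    store.insert(2, StoredSig { slot: 12, signature_bytes: vec![1u8; 32] });

    // Finalizing slot 10 prunes the first entry and keeps the second.
    prune_signatures(&mut store, 10);
    assert_eq!(store.len(), 1);
}

Both stored types carry their slot precisely so that finalization yields a single cutoff below which entries can be discarded.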