From e88db6f9528d920e9264f64657b59f938ba2a82e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 10:35:12 -0300 Subject: [PATCH 01/13] feat: support multiple blocks in a request --- crates/net/p2p/src/req_resp/codec.rs | 66 ++++++++++++++++++------ crates/net/p2p/src/req_resp/handlers.rs | 67 +++++++++---------------- crates/net/p2p/src/req_resp/messages.rs | 5 +- crates/net/p2p/src/req_resp/mod.rs | 2 +- 4 files changed, 80 insertions(+), 60 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 1a151b1..4978fbb 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -92,11 +92,40 @@ impl libp2p::request_response::Codec for Codec { Ok(Response::success(ResponsePayload::Status(status))) } BLOCKS_BY_ROOT_PROTOCOL_V1 => { - let block = - SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - Ok(Response::success(ResponsePayload::BlocksByRoot(block))) + let mut blocks = Vec::new(); + + // First chunk already read (result_byte, payload from above) + if code == ResponseCode::SUCCESS { + let block = + SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + blocks.push(block); + } + + // Read remaining chunks until EOF + loop { + let mut result_byte = 0_u8; + match io.read_exact(std::slice::from_mut(&mut result_byte)).await { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e), + } + + let chunk_code = ResponseCode::from(result_byte); + let chunk_payload = decode_payload(io).await?; + + if chunk_code == ResponseCode::SUCCESS { + let block = SignedBlockWithAttestation::from_ssz_bytes(&chunk_payload) + .map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + blocks.push(block); + } + // Non-success codes (RESOURCE_UNAVAILABLE) are skipped - block not available + } + + Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) } _ => Err(io::Error::new( io::ErrorKind::InvalidData, @@ -135,15 +164,24 @@ impl libp2p::request_response::Codec for Codec { { match resp { Response::Success { payload } => { - // Send success code (0) - io.write_all(&[ResponseCode::SUCCESS.into()]).await?; - - let encoded = match &payload { - ResponsePayload::Status(status) => status.as_ssz_bytes(), - ResponsePayload::BlocksByRoot(block) => block.as_ssz_bytes(), - }; - - write_payload(io, &encoded).await + match &payload { + ResponsePayload::Status(status) => { + // Send success code (0) + io.write_all(&[ResponseCode::SUCCESS.into()]).await?; + let encoded = status.as_ssz_bytes(); + write_payload(io, &encoded).await + } + ResponsePayload::BlocksByRoot(blocks) => { + // Write each block as separate chunk + for block in blocks { + io.write_all(&[ResponseCode::SUCCESS.into()]).await?; + let encoded = block.as_ssz_bytes(); + write_payload(io, &encoded).await?; + } + // Empty response if no blocks found (stream just ends) + Ok(()) + } + } } Response::Error { code, message } => { // Send error code diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 698af2c..392ce3f 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -8,8 +8,7 @@ use ethlambda_types::block::SignedBlockWithAttestation; use ethlambda_types::primitives::TreeHash; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponseCode, - ResponsePayload, Status, error_message, + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status, }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, @@ -108,58 +107,42 @@ async fn handle_blocks_by_root_request( let num_roots = request.roots.len(); info!(%peer, num_roots, "Received BlocksByRoot request"); - // TODO: Support multiple blocks per request (currently only handles first root) - // The protocol supports up to 1024 roots, but our response type only holds one block. - let Some(root) = request.roots.first() else { - debug!(%peer, "BlocksByRoot request with no roots"); - return; - }; - - match server.store.get_signed_block(root) { - Some(signed_block) => { - let slot = signed_block.message.block.slot; - info!(%peer, %root, %slot, "Responding to BlocksByRoot request"); - - if let Err(err) = server.swarm.behaviour_mut().req_resp.send_response( - channel, - Response::success(ResponsePayload::BlocksByRoot(signed_block)), - ) { - warn!(%peer, %root, ?err, "Failed to send BlocksByRoot response"); - } + let mut blocks = Vec::new(); + for root in request.roots.iter() { + if let Some(signed_block) = server.store.get_signed_block(root) { + blocks.push(signed_block); } - None => { - debug!(%peer, %root, "Block not found for BlocksByRoot request"); + // Missing blocks are silently skipped (per spec) + } - if let Err(err) = server.swarm.behaviour_mut().req_resp.send_response( - channel, - Response::error( - ResponseCode::RESOURCE_UNAVAILABLE, - error_message("Block not found"), - ), - ) { - warn!(%peer, %root, ?err, "Failed to send RESOURCE_UNAVAILABLE response"); - } - } + let found = blocks.len(); + info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); + + if let Err(err) = server.swarm.behaviour_mut().req_resp.send_response( + channel, + Response::success(ResponsePayload::BlocksByRoot(blocks)), + ) { + warn!(%peer, ?err, "Failed to send BlocksByRoot response"); } } async fn handle_blocks_by_root_response( server: &mut P2PServer, - block: SignedBlockWithAttestation, + blocks: Vec, peer: PeerId, ) { - let slot = block.message.block.slot; - let root = block.message.block.tree_hash_root(); + info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); - info!(%peer, %slot, %root, "Received BlocksByRoot response"); + for block in blocks { + let root = block.message.block.tree_hash_root(); - // Clean up tracking (success!) - if server.pending_requests.remove(&root).is_some() { - info!(%root, "Block fetch succeeded"); - server.request_id_map.retain(|_, r| *r != root); - } + // Clean up tracking for this root + if server.pending_requests.remove(&root).is_some() { + server.request_id_map.retain(|_, r| *r != root); + } - server.blockchain.notify_new_block(block).await; + server.blockchain.notify_new_block(block).await; + } } /// Build a Status message from the current Store state. diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index cc8ee1e..66c5794 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -89,9 +89,7 @@ impl std::fmt::Debug for ResponseCode { #[allow(clippy::large_enum_variant)] pub enum ResponsePayload { Status(Status), - - // TODO: here we assume there's a single block per request - BlocksByRoot(SignedBlockWithAttestation), + BlocksByRoot(Vec), } #[derive(Debug, Clone, Encode, Decode)] @@ -112,6 +110,7 @@ pub type ErrorMessage = ssz_types::VariableList; /// Helper to create an ErrorMessage from a string. /// Debug builds panic if message exceeds 256 bytes (programming error). /// Release builds truncate to 256 bytes. +#[allow(dead_code)] pub fn error_message(msg: impl AsRef) -> ErrorMessage { let bytes = msg.as_ref().as_bytes(); debug_assert!( diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index fedd572..6b574da 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -8,5 +8,5 @@ pub use encoding::MAX_COMPRESSED_PAYLOAD_SIZE; pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; pub use messages::{ BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response, - ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, error_message, + ResponsePayload, STATUS_PROTOCOL_V1, Status, }; From b330ff891c426e718141977a661d48aea164a25d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 11:24:43 -0300 Subject: [PATCH 02/13] fix: clean up vars in script, use correct path --- .claude/skills/test-pr-devnet/scripts/check-status.sh | 1 + .claude/skills/test-pr-devnet/scripts/test-branch.sh | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.claude/skills/test-pr-devnet/scripts/check-status.sh b/.claude/skills/test-pr-devnet/scripts/check-status.sh index f04cc6b..3d82a69 100755 --- a/.claude/skills/test-pr-devnet/scripts/check-status.sh +++ b/.claude/skills/test-pr-devnet/scripts/check-status.sh @@ -48,6 +48,7 @@ echo "Error counts:" for node in zeam_0 ream_0 qlean_0 ethlambda_0; do if docker ps --format "{{.Names}}" | grep -q "^$node$"; then COUNT=$(docker logs "$node" 2>&1 | grep -c "ERROR" || echo "0") + COUNT=$(echo "$COUNT" | tr -d '\n' | xargs) if [[ "$COUNT" -eq 0 ]]; then echo -e " $node: ${GREEN}$COUNT${NC}" else diff --git a/.claude/skills/test-pr-devnet/scripts/test-branch.sh b/.claude/skills/test-pr-devnet/scripts/test-branch.sh index d768dcf..b151b22 100755 --- a/.claude/skills/test-pr-devnet/scripts/test-branch.sh +++ b/.claude/skills/test-pr-devnet/scripts/test-branch.sh @@ -5,7 +5,7 @@ set -euo pipefail # Usage: ./test-branch.sh [branch-name] [--with-sync-test] SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -ETHLAMBDA_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)" +ETHLAMBDA_ROOT="$(cd "$SCRIPT_DIR/../../../.." && pwd)" LEAN_QUICKSTART="${LEAN_QUICKSTART:-/Users/mega/lean_consensus/lean-quickstart}" # Colors @@ -210,6 +210,7 @@ echo "Blocks published: $BLOCKS_PUBLISHED" # Count errors ERROR_COUNT=$(docker logs ethlambda_0 2>&1 | grep -c "ERROR" || echo "0") +ERROR_COUNT=$(echo "$ERROR_COUNT" | tr -d '\n' | xargs) if [[ "$ERROR_COUNT" -eq 0 ]]; then echo -e "Errors: ${GREEN}$ERROR_COUNT${NC}" else From ccac52b4fb63af02340baae64b80a73a5fe7c1cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 2 Feb 2026 14:36:25 -0300 Subject: [PATCH 03/13] fix: treat EOF as empty response --- crates/net/p2p/src/req_resp/codec.rs | 35 ++++++++++++++++++---------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 4978fbb..0aad60b 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -63,8 +63,22 @@ impl libp2p::request_response::Codec for Codec { T: AsyncRead + Unpin + Send, { let mut result_byte = 0_u8; - io.read_exact(std::slice::from_mut(&mut result_byte)) - .await?; + + // Try to read first chunk's result code + // For BlocksByRoot, EOF here means empty response (all blocks unavailable) + match io.read_exact(std::slice::from_mut(&mut result_byte)).await { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { + // EOF before any chunks = valid empty response for BlocksByRoot + // Status protocol requires at least one response, so EOF is an error + if protocol.as_ref() == BLOCKS_BY_ROOT_PROTOCOL_V1 { + return Ok(Response::success(ResponsePayload::BlocksByRoot(Vec::new()))); + } else { + return Err(e); + } + } + Err(e) => return Err(e), + } let code = ResponseCode::from(result_byte); @@ -92,16 +106,13 @@ impl libp2p::request_response::Codec for Codec { Ok(Response::success(ResponsePayload::Status(status))) } BLOCKS_BY_ROOT_PROTOCOL_V1 => { - let mut blocks = Vec::new(); - - // First chunk already read (result_byte, payload from above) - if code == ResponseCode::SUCCESS { - let block = - SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - blocks.push(block); - } + // First chunk (guaranteed SUCCESS if we reach here) + let first_block = + SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + + let mut blocks = vec![first_block]; // Read remaining chunks until EOF loop { From 0502a30a4bd605939900476bd8cfa727a941cc0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 3 Feb 2026 18:08:16 -0300 Subject: [PATCH 04/13] chore: log non-zero code response chunks --- crates/net/p2p/src/req_resp/codec.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 0aad60b..d4c000b 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -2,7 +2,7 @@ use std::io; use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use ssz::{Decode, Encode}; -use tracing::trace; +use tracing::{debug, trace}; use super::{ encoding::{decode_payload, write_payload}, @@ -126,14 +126,20 @@ impl libp2p::request_response::Codec for Codec { let chunk_code = ResponseCode::from(result_byte); let chunk_payload = decode_payload(io).await?; - if chunk_code == ResponseCode::SUCCESS { - let block = SignedBlockWithAttestation::from_ssz_bytes(&chunk_payload) - .map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - blocks.push(block); + if chunk_code != ResponseCode::SUCCESS { + // Non-success codes (RESOURCE_UNAVAILABLE) are skipped - block not available + let error_message = ErrorMessage::from_ssz_bytes(&chunk_payload) + .map(|msg| String::from_utf8_lossy(&msg).to_string()) + .unwrap_or_else(|_| "".to_string()); + debug!(?chunk_code, %error_message, "Skipping block chunk with non-success code"); + continue; } - // Non-success codes (RESOURCE_UNAVAILABLE) are skipped - block not available + + let block = SignedBlockWithAttestation::from_ssz_bytes(&chunk_payload) + .map_err(|err| { + io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) + })?; + blocks.push(block); } Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) From 559e37e9fb27acd43ecc03240881f50b7dd5ce62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 3 Feb 2026 18:16:51 -0300 Subject: [PATCH 05/13] fix: check received block root on response --- crates/net/p2p/src/req_resp/handlers.rs | 31 ++++++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 392ce3f..3e67f2b 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -31,13 +31,16 @@ pub async fn handle_req_resp_message( handle_blocks_by_root_request(server, request, channel, peer).await; } }, - request_response::Message::Response { response, .. } => match response { + request_response::Message::Response { + request_id, + response, + } => match response { Response::Success { payload } => match payload { ResponsePayload::Status(status) => { handle_status_response(status, peer).await; } - ResponsePayload::BlocksByRoot(block) => { - handle_blocks_by_root_response(server, block, peer).await; + ResponsePayload::BlocksByRoot(blocks) => { + handle_blocks_by_root_response(server, blocks, peer, request_id).await; } }, Response::Error { code, message } => { @@ -130,17 +133,33 @@ async fn handle_blocks_by_root_response( server: &mut P2PServer, blocks: Vec, peer: PeerId, + request_id: request_response::OutboundRequestId, ) { info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); + // Look up which root was requested for this specific request + let Some(requested_root) = server.request_id_map.remove(&request_id) else { + warn!(%peer, ?request_id, "Received response for unknown request_id"); + return; + }; + for block in blocks { let root = block.message.block.tree_hash_root(); - // Clean up tracking for this root - if server.pending_requests.remove(&root).is_some() { - server.request_id_map.retain(|_, r| *r != root); + // Validate that this block matches what we requested + if root != requested_root { + warn!( + %peer, + received_root = %ethlambda_types::ShortRoot(&root.0), + expected_root = %ethlambda_types::ShortRoot(&requested_root.0), + "Received block with mismatched root, ignoring" + ); + continue; } + // Clean up tracking for this root + server.pending_requests.remove(&root); + server.blockchain.notify_new_block(block).await; } } From b043ae8eb1109e90762551c53ce8127865dc34a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 3 Feb 2026 18:40:49 -0300 Subject: [PATCH 06/13] refactor: split decoding into two functions --- crates/net/p2p/src/req_resp/codec.rs | 150 ++++++++++++--------------- 1 file changed, 69 insertions(+), 81 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index d4c000b..8fa02f1 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -62,88 +62,9 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - let mut result_byte = 0_u8; - - // Try to read first chunk's result code - // For BlocksByRoot, EOF here means empty response (all blocks unavailable) - match io.read_exact(std::slice::from_mut(&mut result_byte)).await { - Ok(()) => {} - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { - // EOF before any chunks = valid empty response for BlocksByRoot - // Status protocol requires at least one response, so EOF is an error - if protocol.as_ref() == BLOCKS_BY_ROOT_PROTOCOL_V1 { - return Ok(Response::success(ResponsePayload::BlocksByRoot(Vec::new()))); - } else { - return Err(e); - } - } - Err(e) => return Err(e), - } - - let code = ResponseCode::from(result_byte); - - let payload = decode_payload(io).await?; - - // For non-success responses, the payload contains an SSZ-encoded error message - if code != ResponseCode::SUCCESS { - let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("Invalid error message: {err:?}"), - ) - })?; - let error_str = String::from_utf8_lossy(&message).to_string(); - trace!(?code, %error_str, "Received error response"); - return Ok(Response::error(code, message)); - } - - // Success responses contain the actual data match protocol.as_ref() { - STATUS_PROTOCOL_V1 => { - let status = Status::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - Ok(Response::success(ResponsePayload::Status(status))) - } - BLOCKS_BY_ROOT_PROTOCOL_V1 => { - // First chunk (guaranteed SUCCESS if we reach here) - let first_block = - SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - - let mut blocks = vec![first_block]; - - // Read remaining chunks until EOF - loop { - let mut result_byte = 0_u8; - match io.read_exact(std::slice::from_mut(&mut result_byte)).await { - Ok(()) => {} - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, - Err(e) => return Err(e), - } - - let chunk_code = ResponseCode::from(result_byte); - let chunk_payload = decode_payload(io).await?; - - if chunk_code != ResponseCode::SUCCESS { - // Non-success codes (RESOURCE_UNAVAILABLE) are skipped - block not available - let error_message = ErrorMessage::from_ssz_bytes(&chunk_payload) - .map(|msg| String::from_utf8_lossy(&msg).to_string()) - .unwrap_or_else(|_| "".to_string()); - debug!(?chunk_code, %error_message, "Skipping block chunk with non-success code"); - continue; - } - - let block = SignedBlockWithAttestation::from_ssz_bytes(&chunk_payload) - .map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - blocks.push(block); - } - - Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) - } + STATUS_PROTOCOL_V1 => decode_status_response(io).await, + BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -212,3 +133,70 @@ impl libp2p::request_response::Codec for Codec { } } } + +/// Decode Status response (single-chunk protocol). +/// Errors are fatal and returned as error responses. +async fn decode_status_response(io: &mut T) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + let mut result_byte = 0_u8; + io.read_exact(std::slice::from_mut(&mut result_byte)) + .await?; + + let code = ResponseCode::from(result_byte); + let payload = decode_payload(io).await?; + + if code != ResponseCode::SUCCESS { + let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid error message: {err:?}"), + ) + })?; + let error_str = String::from_utf8_lossy(&message).to_string(); + trace!(?code, %error_str, "Received error response"); + return Ok(Response::error(code, message)); + } + + let status = Status::from_ssz_bytes(&payload) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?; + Ok(Response::success(ResponsePayload::Status(status))) +} + +/// Decode BlocksByRoot response (multi-chunk protocol). +/// Error chunks are non-fatal and skipped. EOF at any point (even before first chunk) +/// returns successfully with all blocks collected so far. +async fn decode_blocks_by_root_response(io: &mut T) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + let mut blocks = Vec::new(); + + loop { + // Read chunk result code + let mut result_byte = 0_u8; + match io.read_exact(std::slice::from_mut(&mut result_byte)).await { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e), + } + + let code = ResponseCode::from(result_byte); + let payload = decode_payload(io).await?; + + if code != ResponseCode::SUCCESS { + let error_message = ErrorMessage::from_ssz_bytes(&payload) + .map(|msg| String::from_utf8_lossy(&msg).to_string()) + .unwrap_or_else(|_| "".to_string()); + debug!(?code, %error_message, "Skipping block chunk with non-success code"); + continue; + } + + let block = SignedBlockWithAttestation::from_ssz_bytes(&payload) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?; + blocks.push(block); + } + + Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) +} From e0f7af356e58f43b44c9fa10a36b41df7330cc81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 3 Feb 2026 18:42:57 -0300 Subject: [PATCH 07/13] chore: change allow to expect --- crates/net/p2p/src/req_resp/messages.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index 889d6cf..3826f5b 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -115,7 +115,8 @@ pub type ErrorMessage = ssz_types::VariableList; /// Helper to create an ErrorMessage from a string. /// Debug builds panic if message exceeds 256 bytes (programming error). /// Release builds truncate to 256 bytes. -#[allow(dead_code)] +#[expect(dead_code)] +// TODO: map errors to req/resp error messages pub fn error_message(msg: impl AsRef) -> ErrorMessage { let bytes = msg.as_ref().as_bytes(); debug_assert!( From 99bb20308bd17d825baf799630290e4692744b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:04:13 -0300 Subject: [PATCH 08/13] docs: improve function docs --- crates/net/p2p/src/req_resp/codec.rs | 49 +++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 9f997d1..4d2a3af 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -134,8 +134,27 @@ impl libp2p::request_response::Codec for Codec { } } -/// Decode Status response (single-chunk protocol). -/// Errors are fatal and returned as error responses. +/// Decodes a Status protocol response from a single-chunk response stream. +/// +/// Reads the response code byte and payload, returning either a success response +/// with the peer's Status or an error response with the error code and message. +/// Unlike multi-chunk protocols, any error code from the peer is treated as a +/// valid response rather than a connection failure. +/// +/// # Returns +/// +/// Returns `Ok(Response::Success)` containing the peer's `Status` if the response +/// code is `SUCCESS`. +/// +/// Returns `Ok(Response::Error)` containing the error code and message if the peer +/// returned a non-success response code. +/// +/// # Errors +/// +/// Returns `Err` if: +/// - I/O error occurs while reading the response code or payload +/// - Peer's error message cannot be SSZ-decoded (InvalidData) +/// - Peer's Status payload cannot be SSZ-decoded (InvalidData) async fn decode_status_response(io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, @@ -164,9 +183,29 @@ where Ok(Response::success(ResponsePayload::Status(status))) } -/// Decode BlocksByRoot response (multi-chunk protocol). -/// Error chunks are non-fatal and skipped. EOF at any point (even before first chunk) -/// returns successfully with all blocks collected so far. +/// Decodes a BlocksByRoot protocol response from a multi-chunk response stream. +/// +/// Reads chunks until EOF, collecting successfully decoded blocks. Each chunk has +/// its own response code - chunks with error codes are logged and skipped rather +/// than terminating the stream. This allows partial success when some requested +/// blocks are unavailable. The stream ends naturally at EOF (peer closes after +/// sending all available blocks). +/// +/// # Returns +/// +/// Always returns `Ok(Response::Success)` containing a vector of successfully +/// decoded blocks. The vector may be empty if no SUCCESS chunks were received +/// before EOF (either no chunks sent, or all chunks had non-SUCCESS codes) +/// +/// # Errors +/// +/// Returns `Err` if: +/// - I/O error occurs while reading response codes or payloads (except `UnexpectedEof` +/// which signals normal stream termination) +/// - Block payload cannot be SSZ-decoded into `SignedBlockWithAttestation` (InvalidData) +/// +/// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this +/// function to return `Err` - they are logged and skipped. async fn decode_blocks_by_root_response(io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, From 81e87eeea1f6a4ecb08ad94944f40c9652ece248 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:31:35 -0300 Subject: [PATCH 09/13] refactor: use inspect_err --- crates/blockchain/src/lib.rs | 14 ++++++-------- crates/blockchain/src/store.rs | 24 ++++++++++++------------ crates/net/p2p/src/lib.rs | 4 ++-- crates/net/p2p/src/req_resp/handlers.rs | 13 +++++++------ 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 90a98c6..15e7c4c 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -323,11 +323,10 @@ impl BlockChainServer { fn request_missing_block(&mut self, block_root: H256) { // Send request to P2P layer (deduplication handled by P2P module) - if let Err(err) = self.p2p_tx.send(P2PMessage::FetchBlock(block_root)) { - error!(%block_root, %err, "Failed to send FetchBlock message to P2P"); - } else { - info!(%block_root, "Requested missing block from network"); - } + self.p2p_tx + .send(P2PMessage::FetchBlock(block_root)) + .inspect(|_| info!(%block_root, "Requested missing block from network")) + .inspect_err(|err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P")); } fn process_pending_children(&mut self, parent_root: H256) { @@ -347,9 +346,8 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { - if let Err(err) = store::on_gossip_attestation(&mut self.store, attestation) { - warn!(%err, "Failed to process gossiped attestation"); - } + store::on_gossip_attestation(&mut self.store, attestation) + .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index f244050..8f377c9 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -392,12 +392,12 @@ pub fn on_block( data: att.data.clone(), }; // TODO: validate attestations before processing - if let Err(err) = on_attestation(store, attestation, true) { - warn!(%slot, %validator_id, %err, "Invalid attestation in block"); - metrics::inc_attestations_invalid("block"); - } else { - metrics::inc_attestations_valid("block"); - } + on_attestation(store, attestation, true) + .inspect(|_| metrics::inc_attestations_valid("block")) + .inspect_err(|err| { + warn!(%slot, %validator_id, %err, "Invalid attestation in block"); + metrics::inc_attestations_invalid("block"); + }); } } @@ -424,12 +424,12 @@ pub fn on_block( // Process proposer attestation (enters "new" stage, not "known") // TODO: validate attestations before processing - if let Err(err) = on_attestation(store, proposer_attestation, false) { - metrics::inc_attestations_invalid("block"); - warn!(%slot, %err, "Invalid proposer attestation in block"); - } else { - metrics::inc_attestations_valid("gossip"); - } + on_attestation(store, proposer_attestation, false) + .inspect(|_| metrics::inc_attestations_valid("gossip")) + .inspect_err(|err| { + warn!(%slot, %err, "Invalid proposer attestation in block"); + metrics::inc_attestations_invalid("block"); + }); info!(%slot, %block_root, %state_root, "Processed new block"); Ok(()) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 20d64a3..e6cf174 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -411,11 +411,11 @@ async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { info!(%peer_id, "Redialing disconnected bootnode"); // NOTE: this dial does some checks and adds a pending outbound connection attempt. // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. - if let Err(e) = server.swarm.dial(addr.clone()) { + server.swarm.dial(addr.clone()).inspect_err(|e| { warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); // Schedule another redial attempt schedule_peer_redial(server.retry_tx.clone(), peer_id); - } + }); } } diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 9402292..a06e074 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -121,12 +121,13 @@ async fn handle_blocks_by_root_request( let found = blocks.len(); info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); - if let Err(err) = server.swarm.behaviour_mut().req_resp.send_response( - channel, - Response::success(ResponsePayload::BlocksByRoot(blocks)), - ) { - warn!(%peer, ?err, "Failed to send BlocksByRoot response"); - } + let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); + server + .swarm + .behaviour_mut() + .req_resp + .send_response(channel, response) + .inspect_err(|err| warn!(%peer, ?err, "Failed to send BlocksByRoot response")); } async fn handle_blocks_by_root_response( From eefc507d56ac7057a86347394aabfaf69bbcddae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:40:01 -0300 Subject: [PATCH 10/13] refactor: use if let Err instead of match --- crates/net/p2p/src/req_resp/codec.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 4d2a3af..a1b0021 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -215,10 +215,11 @@ where loop { // Read chunk result code let mut result_byte = 0_u8; - match io.read_exact(std::slice::from_mut(&mut result_byte)).await { - Ok(()) => {} - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, - Err(e) => return Err(e), + if let Err(e) = io.read_exact(std::slice::from_mut(&mut result_byte)).await { + if e.kind() == io::ErrorKind::UnexpectedEof { + break; + } + return Err(e); } let code = ResponseCode::from(result_byte); From 030d9e4448fea8a64d3ea0ca63ea646fb71f4804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:40:10 -0300 Subject: [PATCH 11/13] docs: update CLAUDE.md with inspect usage preference --- CLAUDE.md | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 20951cb..98285e1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -112,6 +112,51 @@ let entries = vec![(key, value)]; batch.put_batch(Table::X, entries).expect("msg"); ``` +### Error Handling Patterns + +**Use `inspect` and `inspect_err` for side-effect-only error handling:** +```rust +// ✅ GOOD: Use inspect_err when only logging or performing side effects on error +result + .inspect_err(|err| warn!(%err, "Operation failed")); + +// Extract complex expressions to variables for cleaner formatting +let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); +server.swarm.behaviour_mut().req_resp.send_response(channel, response) + .inspect_err(|err| warn!(%peer, ?err, "Failed to send response")); + +// ✅ GOOD: Use inspect + inspect_err when both branches need side effects +operation() + .inspect(|_| metrics::inc_success()) + .inspect_err(|_| metrics::inc_failed()); + +// ❌ AVOID: Using if let Err when only performing side effects +if let Err(err) = result { + warn!(%err, "Operation failed"); +} + +// ❌ AVOID: Using if/else for both success and error side effects +if let Err(err) = operation() { + metrics::inc_failed(); +} else { + metrics::inc_success(); +} +``` + +**When NOT to use `inspect_err`:** +```rust +// Use if let Err or match when: +// 1. Early return needed +if let Err(err) = operation() { + error!(%err, "Fatal error"); + return false; +} + +// 2. Error needs transformation (use map_err + ?) +let result = operation() + .map_err(|err| CustomError::from(err))?; +``` + ### Metrics (RAII Pattern) ```rust // Timing guard automatically observes duration on drop From 0e310ad278637aba44455d074a78d6ecd80e1c34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:52:49 -0300 Subject: [PATCH 12/13] fix: ignore inspected errors --- crates/blockchain/src/lib.rs | 9 ++++++--- crates/blockchain/src/store.rs | 4 ++-- crates/net/p2p/src/lib.rs | 2 +- crates/net/p2p/src/req_resp/handlers.rs | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 15e7c4c..8f58b39 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -323,10 +323,13 @@ impl BlockChainServer { fn request_missing_block(&mut self, block_root: H256) { // Send request to P2P layer (deduplication handled by P2P module) - self.p2p_tx + let _ = self + .p2p_tx .send(P2PMessage::FetchBlock(block_root)) .inspect(|_| info!(%block_root, "Requested missing block from network")) - .inspect_err(|err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P")); + .inspect_err( + |err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P"), + ); } fn process_pending_children(&mut self, parent_root: H256) { @@ -346,7 +349,7 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { - store::on_gossip_attestation(&mut self.store, attestation) + let _ = store::on_gossip_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 8f377c9..c41af16 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -392,7 +392,7 @@ pub fn on_block( data: att.data.clone(), }; // TODO: validate attestations before processing - on_attestation(store, attestation, true) + let _ = on_attestation(store, attestation, true) .inspect(|_| metrics::inc_attestations_valid("block")) .inspect_err(|err| { warn!(%slot, %validator_id, %err, "Invalid attestation in block"); @@ -424,7 +424,7 @@ pub fn on_block( // Process proposer attestation (enters "new" stage, not "known") // TODO: validate attestations before processing - on_attestation(store, proposer_attestation, false) + let _ = on_attestation(store, proposer_attestation, false) .inspect(|_| metrics::inc_attestations_valid("gossip")) .inspect_err(|err| { warn!(%slot, %err, "Invalid proposer attestation in block"); diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index e6cf174..fe3cdde 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -411,7 +411,7 @@ async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { info!(%peer_id, "Redialing disconnected bootnode"); // NOTE: this dial does some checks and adds a pending outbound connection attempt. // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. - server.swarm.dial(addr.clone()).inspect_err(|e| { + let _ = server.swarm.dial(addr.clone()).inspect_err(|e| { warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); // Schedule another redial attempt schedule_peer_redial(server.retry_tx.clone(), peer_id); diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index a06e074..962a9fe 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -122,7 +122,7 @@ async fn handle_blocks_by_root_request( info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); - server + let _ = server .swarm .behaviour_mut() .req_resp From 56dc837f8b12c8fbd41fcd9d82b803a05b5b611d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:54:39 -0300 Subject: [PATCH 13/13] ci: rerun