diff --git a/.claude/skills/test-pr-devnet/scripts/check-status.sh b/.claude/skills/test-pr-devnet/scripts/check-status.sh index f04cc6b..3d82a69 100755 --- a/.claude/skills/test-pr-devnet/scripts/check-status.sh +++ b/.claude/skills/test-pr-devnet/scripts/check-status.sh @@ -48,6 +48,7 @@ echo "Error counts:" for node in zeam_0 ream_0 qlean_0 ethlambda_0; do if docker ps --format "{{.Names}}" | grep -q "^$node$"; then COUNT=$(docker logs "$node" 2>&1 | grep -c "ERROR" || echo "0") + COUNT=$(echo "$COUNT" | tr -d '\n' | xargs) if [[ "$COUNT" -eq 0 ]]; then echo -e " $node: ${GREEN}$COUNT${NC}" else diff --git a/.claude/skills/test-pr-devnet/scripts/test-branch.sh b/.claude/skills/test-pr-devnet/scripts/test-branch.sh index d768dcf..b151b22 100755 --- a/.claude/skills/test-pr-devnet/scripts/test-branch.sh +++ b/.claude/skills/test-pr-devnet/scripts/test-branch.sh @@ -5,7 +5,7 @@ set -euo pipefail # Usage: ./test-branch.sh [branch-name] [--with-sync-test] SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -ETHLAMBDA_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)" +ETHLAMBDA_ROOT="$(cd "$SCRIPT_DIR/../../../.." && pwd)" LEAN_QUICKSTART="${LEAN_QUICKSTART:-/Users/mega/lean_consensus/lean-quickstart}" # Colors @@ -210,6 +210,7 @@ echo "Blocks published: $BLOCKS_PUBLISHED" # Count errors ERROR_COUNT=$(docker logs ethlambda_0 2>&1 | grep -c "ERROR" || echo "0") +ERROR_COUNT=$(echo "$ERROR_COUNT" | tr -d '\n' | xargs) if [[ "$ERROR_COUNT" -eq 0 ]]; then echo -e "Errors: ${GREEN}$ERROR_COUNT${NC}" else diff --git a/CLAUDE.md b/CLAUDE.md index 20951cb..98285e1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -112,6 +112,51 @@ let entries = vec![(key, value)]; batch.put_batch(Table::X, entries).expect("msg"); ``` +### Error Handling Patterns + +**Use `inspect` and `inspect_err` for side-effect-only error handling:** +```rust +// ✅ GOOD: Use inspect_err when only logging or performing side effects on error +result + .inspect_err(|err| warn!(%err, "Operation failed")); + +// Extract complex expressions to variables for cleaner formatting +let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); +server.swarm.behaviour_mut().req_resp.send_response(channel, response) + .inspect_err(|err| warn!(%peer, ?err, "Failed to send response")); + +// ✅ GOOD: Use inspect + inspect_err when both branches need side effects +operation() + .inspect(|_| metrics::inc_success()) + .inspect_err(|_| metrics::inc_failed()); + +// ❌ AVOID: Using if let Err when only performing side effects +if let Err(err) = result { + warn!(%err, "Operation failed"); +} + +// ❌ AVOID: Using if/else for both success and error side effects +if let Err(err) = operation() { + metrics::inc_failed(); +} else { + metrics::inc_success(); +} +``` + +**When NOT to use `inspect_err`:** +```rust +// Use if let Err or match when: +// 1. Early return needed +if let Err(err) = operation() { + error!(%err, "Fatal error"); + return false; +} + +// 2. Error needs transformation (use map_err + ?) +let result = operation() + .map_err(|err| CustomError::from(err))?; +``` + ### Metrics (RAII Pattern) ```rust // Timing guard automatically observes duration on drop diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 90a98c6..8f58b39 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -323,11 +323,13 @@ impl BlockChainServer { fn request_missing_block(&mut self, block_root: H256) { // Send request to P2P layer (deduplication handled by P2P module) - if let Err(err) = self.p2p_tx.send(P2PMessage::FetchBlock(block_root)) { - error!(%block_root, %err, "Failed to send FetchBlock message to P2P"); - } else { - info!(%block_root, "Requested missing block from network"); - } + let _ = self + .p2p_tx + .send(P2PMessage::FetchBlock(block_root)) + .inspect(|_| info!(%block_root, "Requested missing block from network")) + .inspect_err( + |err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P"), + ); } fn process_pending_children(&mut self, parent_root: H256) { @@ -347,9 +349,8 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { - if let Err(err) = store::on_gossip_attestation(&mut self.store, attestation) { - warn!(%err, "Failed to process gossiped attestation"); - } + let _ = store::on_gossip_attestation(&mut self.store, attestation) + .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index f244050..c41af16 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -392,12 +392,12 @@ pub fn on_block( data: att.data.clone(), }; // TODO: validate attestations before processing - if let Err(err) = on_attestation(store, attestation, true) { - warn!(%slot, %validator_id, %err, "Invalid attestation in block"); - metrics::inc_attestations_invalid("block"); - } else { - metrics::inc_attestations_valid("block"); - } + let _ = on_attestation(store, attestation, true) + .inspect(|_| metrics::inc_attestations_valid("block")) + .inspect_err(|err| { + warn!(%slot, %validator_id, %err, "Invalid attestation in block"); + metrics::inc_attestations_invalid("block"); + }); } } @@ -424,12 +424,12 @@ pub fn on_block( // Process proposer attestation (enters "new" stage, not "known") // TODO: validate attestations before processing - if let Err(err) = on_attestation(store, proposer_attestation, false) { - metrics::inc_attestations_invalid("block"); - warn!(%slot, %err, "Invalid proposer attestation in block"); - } else { - metrics::inc_attestations_valid("gossip"); - } + let _ = on_attestation(store, proposer_attestation, false) + .inspect(|_| metrics::inc_attestations_valid("gossip")) + .inspect_err(|err| { + warn!(%slot, %err, "Invalid proposer attestation in block"); + metrics::inc_attestations_invalid("block"); + }); info!(%slot, %block_root, %state_root, "Processed new block"); Ok(()) diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 20d64a3..fe3cdde 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -411,11 +411,11 @@ async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { info!(%peer_id, "Redialing disconnected bootnode"); // NOTE: this dial does some checks and adds a pending outbound connection attempt. // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. - if let Err(e) = server.swarm.dial(addr.clone()) { + let _ = server.swarm.dial(addr.clone()).inspect_err(|e| { warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); // Schedule another redial attempt schedule_peer_redial(server.retry_tx.clone(), peer_id); - } + }); } } diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index 32335d7..a1b0021 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -2,7 +2,7 @@ use std::io; use ethlambda_types::primitives::ssz::{Decode, Encode}; use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tracing::trace; +use tracing::{debug, trace}; use super::{ encoding::{decode_payload, write_payload}, @@ -62,42 +62,9 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - let mut result_byte = 0_u8; - io.read_exact(std::slice::from_mut(&mut result_byte)) - .await?; - - let code = ResponseCode::from(result_byte); - - let payload = decode_payload(io).await?; - - // For non-success responses, the payload contains an SSZ-encoded error message - if code != ResponseCode::SUCCESS { - let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("Invalid error message: {err:?}"), - ) - })?; - let error_str = String::from_utf8_lossy(&message).to_string(); - trace!(?code, %error_str, "Received error response"); - return Ok(Response::error(code, message)); - } - - // Success responses contain the actual data match protocol.as_ref() { - STATUS_PROTOCOL_V1 => { - let status = Status::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - Ok(Response::success(ResponsePayload::Status(status))) - } - BLOCKS_BY_ROOT_PROTOCOL_V1 => { - let block = - SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| { - io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")) - })?; - Ok(Response::success(ResponsePayload::BlocksByRoot(block))) - } + STATUS_PROTOCOL_V1 => decode_status_response(io).await, + BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -135,15 +102,24 @@ impl libp2p::request_response::Codec for Codec { { match resp { Response::Success { payload } => { - // Send success code (0) - io.write_all(&[ResponseCode::SUCCESS.into()]).await?; - - let encoded = match &payload { - ResponsePayload::Status(status) => status.as_ssz_bytes(), - ResponsePayload::BlocksByRoot(block) => block.as_ssz_bytes(), - }; - - write_payload(io, &encoded).await + match &payload { + ResponsePayload::Status(status) => { + // Send success code (0) + io.write_all(&[ResponseCode::SUCCESS.into()]).await?; + let encoded = status.as_ssz_bytes(); + write_payload(io, &encoded).await + } + ResponsePayload::BlocksByRoot(blocks) => { + // Write each block as separate chunk + for block in blocks { + io.write_all(&[ResponseCode::SUCCESS.into()]).await?; + let encoded = block.as_ssz_bytes(); + write_payload(io, &encoded).await?; + } + // Empty response if no blocks found (stream just ends) + Ok(()) + } + } } Response::Error { code, message } => { // Send error code @@ -157,3 +133,110 @@ impl libp2p::request_response::Codec for Codec { } } } + +/// Decodes a Status protocol response from a single-chunk response stream. +/// +/// Reads the response code byte and payload, returning either a success response +/// with the peer's Status or an error response with the error code and message. +/// Unlike multi-chunk protocols, any error code from the peer is treated as a +/// valid response rather than a connection failure. +/// +/// # Returns +/// +/// Returns `Ok(Response::Success)` containing the peer's `Status` if the response +/// code is `SUCCESS`. +/// +/// Returns `Ok(Response::Error)` containing the error code and message if the peer +/// returned a non-success response code. +/// +/// # Errors +/// +/// Returns `Err` if: +/// - I/O error occurs while reading the response code or payload +/// - Peer's error message cannot be SSZ-decoded (InvalidData) +/// - Peer's Status payload cannot be SSZ-decoded (InvalidData) +async fn decode_status_response(io: &mut T) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + let mut result_byte = 0_u8; + io.read_exact(std::slice::from_mut(&mut result_byte)) + .await?; + + let code = ResponseCode::from(result_byte); + let payload = decode_payload(io).await?; + + if code != ResponseCode::SUCCESS { + let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid error message: {err:?}"), + ) + })?; + let error_str = String::from_utf8_lossy(&message).to_string(); + trace!(?code, %error_str, "Received error response"); + return Ok(Response::error(code, message)); + } + + let status = Status::from_ssz_bytes(&payload) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?; + Ok(Response::success(ResponsePayload::Status(status))) +} + +/// Decodes a BlocksByRoot protocol response from a multi-chunk response stream. +/// +/// Reads chunks until EOF, collecting successfully decoded blocks. Each chunk has +/// its own response code - chunks with error codes are logged and skipped rather +/// than terminating the stream. This allows partial success when some requested +/// blocks are unavailable. The stream ends naturally at EOF (peer closes after +/// sending all available blocks). +/// +/// # Returns +/// +/// Always returns `Ok(Response::Success)` containing a vector of successfully +/// decoded blocks. The vector may be empty if no SUCCESS chunks were received +/// before EOF (either no chunks sent, or all chunks had non-SUCCESS codes) +/// +/// # Errors +/// +/// Returns `Err` if: +/// - I/O error occurs while reading response codes or payloads (except `UnexpectedEof` +/// which signals normal stream termination) +/// - Block payload cannot be SSZ-decoded into `SignedBlockWithAttestation` (InvalidData) +/// +/// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this +/// function to return `Err` - they are logged and skipped. +async fn decode_blocks_by_root_response(io: &mut T) -> io::Result +where + T: AsyncRead + Unpin + Send, +{ + let mut blocks = Vec::new(); + + loop { + // Read chunk result code + let mut result_byte = 0_u8; + if let Err(e) = io.read_exact(std::slice::from_mut(&mut result_byte)).await { + if e.kind() == io::ErrorKind::UnexpectedEof { + break; + } + return Err(e); + } + + let code = ResponseCode::from(result_byte); + let payload = decode_payload(io).await?; + + if code != ResponseCode::SUCCESS { + let error_message = ErrorMessage::from_ssz_bytes(&payload) + .map(|msg| String::from_utf8_lossy(&msg).to_string()) + .unwrap_or_else(|_| "".to_string()); + debug!(?code, %error_message, "Skipping block chunk with non-success code"); + continue; + } + + let block = SignedBlockWithAttestation::from_ssz_bytes(&payload) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?; + blocks.push(block); + } + + Ok(Response::success(ResponsePayload::BlocksByRoot(blocks))) +} diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index ae6de50..962a9fe 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -8,8 +8,7 @@ use ethlambda_types::block::SignedBlockWithAttestation; use ethlambda_types::primitives::ssz::TreeHash; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponseCode, - ResponsePayload, Status, error_message, + BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, Status, }; use crate::{ BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, @@ -32,13 +31,16 @@ pub async fn handle_req_resp_message( handle_blocks_by_root_request(server, request, channel, peer).await; } }, - request_response::Message::Response { response, .. } => match response { + request_response::Message::Response { + request_id, + response, + } => match response { Response::Success { payload } => match payload { ResponsePayload::Status(status) => { handle_status_response(status, peer).await; } - ResponsePayload::BlocksByRoot(block) => { - handle_blocks_by_root_response(server, block, peer).await; + ResponsePayload::BlocksByRoot(blocks) => { + handle_blocks_by_root_response(server, blocks, peer, request_id).await; } }, Response::Error { code, message } => { @@ -108,58 +110,59 @@ async fn handle_blocks_by_root_request( let num_roots = request.roots.len(); info!(%peer, num_roots, "Received BlocksByRoot request"); - // TODO: Support multiple blocks per request (currently only handles first root) - // The protocol supports up to 1024 roots, but our response type only holds one block. - let Some(root) = request.roots.first() else { - debug!(%peer, "BlocksByRoot request with no roots"); - return; - }; - - match server.store.get_signed_block(root) { - Some(signed_block) => { - let slot = signed_block.message.block.slot; - info!(%peer, %root, %slot, "Responding to BlocksByRoot request"); - - if let Err(err) = server.swarm.behaviour_mut().req_resp.send_response( - channel, - Response::success(ResponsePayload::BlocksByRoot(signed_block)), - ) { - warn!(%peer, %root, ?err, "Failed to send BlocksByRoot response"); - } - } - None => { - debug!(%peer, %root, "Block not found for BlocksByRoot request"); - - if let Err(err) = server.swarm.behaviour_mut().req_resp.send_response( - channel, - Response::error( - ResponseCode::RESOURCE_UNAVAILABLE, - error_message("Block not found"), - ), - ) { - warn!(%peer, %root, ?err, "Failed to send RESOURCE_UNAVAILABLE response"); - } + let mut blocks = Vec::new(); + for root in request.roots.iter() { + if let Some(signed_block) = server.store.get_signed_block(root) { + blocks.push(signed_block); } + // Missing blocks are silently skipped (per spec) } + + let found = blocks.len(); + info!(%peer, num_roots, found, "Responding to BlocksByRoot request"); + + let response = Response::success(ResponsePayload::BlocksByRoot(blocks)); + let _ = server + .swarm + .behaviour_mut() + .req_resp + .send_response(channel, response) + .inspect_err(|err| warn!(%peer, ?err, "Failed to send BlocksByRoot response")); } async fn handle_blocks_by_root_response( server: &mut P2PServer, - block: SignedBlockWithAttestation, + blocks: Vec, peer: PeerId, + request_id: request_response::OutboundRequestId, ) { - let slot = block.message.block.slot; - let root = block.message.block.tree_hash_root(); + info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); - info!(%peer, %slot, %root, "Received BlocksByRoot response"); + // Look up which root was requested for this specific request + let Some(requested_root) = server.request_id_map.remove(&request_id) else { + warn!(%peer, ?request_id, "Received response for unknown request_id"); + return; + }; - // Clean up tracking (success!) - if server.pending_requests.remove(&root).is_some() { - info!(%root, "Block fetch succeeded"); - server.request_id_map.retain(|_, r| *r != root); - } + for block in blocks { + let root = block.message.block.tree_hash_root(); - server.blockchain.notify_new_block(block).await; + // Validate that this block matches what we requested + if root != requested_root { + warn!( + %peer, + received_root = %ethlambda_types::ShortRoot(&root.0), + expected_root = %ethlambda_types::ShortRoot(&requested_root.0), + "Received block with mismatched root, ignoring" + ); + continue; + } + + // Clean up tracking for this root + server.pending_requests.remove(&root); + + server.blockchain.notify_new_block(block).await; + } } /// Build a Status message from the current Store state. diff --git a/crates/net/p2p/src/req_resp/messages.rs b/crates/net/p2p/src/req_resp/messages.rs index 81ef9d6..3826f5b 100644 --- a/crates/net/p2p/src/req_resp/messages.rs +++ b/crates/net/p2p/src/req_resp/messages.rs @@ -94,9 +94,7 @@ impl std::fmt::Debug for ResponseCode { #[allow(clippy::large_enum_variant)] pub enum ResponsePayload { Status(Status), - - // TODO: here we assume there's a single block per request - BlocksByRoot(SignedBlockWithAttestation), + BlocksByRoot(Vec), } #[derive(Debug, Clone, Encode, Decode)] @@ -117,6 +115,8 @@ pub type ErrorMessage = ssz_types::VariableList; /// Helper to create an ErrorMessage from a string. /// Debug builds panic if message exceeds 256 bytes (programming error). /// Release builds truncate to 256 bytes. +#[expect(dead_code)] +// TODO: map errors to req/resp error messages pub fn error_message(msg: impl AsRef) -> ErrorMessage { let bytes = msg.as_ref().as_bytes(); debug_assert!( diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index fedd572..6b574da 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -8,5 +8,5 @@ pub use encoding::MAX_COMPRESSED_PAYLOAD_SIZE; pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; pub use messages::{ BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, RequestedBlockRoots, Response, - ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status, error_message, + ResponsePayload, STATUS_PROTOCOL_V1, Status, };