Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/skills/test-pr-devnet/scripts/check-status.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ echo "Error counts:"
for node in zeam_0 ream_0 qlean_0 ethlambda_0; do
if docker ps --format "{{.Names}}" | grep -q "^$node$"; then
COUNT=$(docker logs "$node" 2>&1 | grep -c "ERROR" || echo "0")
COUNT=$(echo "$COUNT" | tr -d '\n' | xargs)
if [[ "$COUNT" -eq 0 ]]; then
echo -e " $node: ${GREEN}$COUNT${NC}"
else
Expand Down
3 changes: 2 additions & 1 deletion .claude/skills/test-pr-devnet/scripts/test-branch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -euo pipefail
# Usage: ./test-branch.sh [branch-name] [--with-sync-test]

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ETHLAMBDA_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)"
ETHLAMBDA_ROOT="$(cd "$SCRIPT_DIR/../../../.." && pwd)"
LEAN_QUICKSTART="${LEAN_QUICKSTART:-/Users/mega/lean_consensus/lean-quickstart}"

# Colors
Expand Down Expand Up @@ -210,6 +210,7 @@ echo "Blocks published: $BLOCKS_PUBLISHED"

# Count errors
ERROR_COUNT=$(docker logs ethlambda_0 2>&1 | grep -c "ERROR" || echo "0")
ERROR_COUNT=$(echo "$ERROR_COUNT" | tr -d '\n' | xargs)
if [[ "$ERROR_COUNT" -eq 0 ]]; then
echo -e "Errors: ${GREEN}$ERROR_COUNT${NC}"
else
Expand Down
45 changes: 45 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,51 @@ let entries = vec![(key, value)];
batch.put_batch(Table::X, entries).expect("msg");
```

### Error Handling Patterns

**Use `inspect` and `inspect_err` for side-effect-only error handling:**
```rust
// ✅ GOOD: Use inspect_err when only logging or performing side effects on error
result
.inspect_err(|err| warn!(%err, "Operation failed"));

// Extract complex expressions to variables for cleaner formatting
let response = Response::success(ResponsePayload::BlocksByRoot(blocks));
server.swarm.behaviour_mut().req_resp.send_response(channel, response)
.inspect_err(|err| warn!(%peer, ?err, "Failed to send response"));

// ✅ GOOD: Use inspect + inspect_err when both branches need side effects
operation()
.inspect(|_| metrics::inc_success())
.inspect_err(|_| metrics::inc_failed());

// ❌ AVOID: Using if let Err when only performing side effects
if let Err(err) = result {
warn!(%err, "Operation failed");
}

// ❌ AVOID: Using if/else for both success and error side effects
if let Err(err) = operation() {
metrics::inc_failed();
} else {
metrics::inc_success();
}
```

**When NOT to use `inspect_err`:**
```rust
// Use if let Err or match when:
// 1. Early return needed
if let Err(err) = operation() {
error!(%err, "Fatal error");
return false;
}

// 2. Error needs transformation (use map_err + ?)
let result = operation()
.map_err(|err| CustomError::from(err))?;
```

### Metrics (RAII Pattern)
```rust
// Timing guard automatically observes duration on drop
Expand Down
17 changes: 9 additions & 8 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,13 @@ impl BlockChainServer {

fn request_missing_block(&mut self, block_root: H256) {
// Send request to P2P layer (deduplication handled by P2P module)
if let Err(err) = self.p2p_tx.send(P2PMessage::FetchBlock(block_root)) {
error!(%block_root, %err, "Failed to send FetchBlock message to P2P");
} else {
info!(%block_root, "Requested missing block from network");
}
let _ = self
.p2p_tx
.send(P2PMessage::FetchBlock(block_root))
.inspect(|_| info!(%block_root, "Requested missing block from network"))
.inspect_err(
|err| error!(%block_root, %err, "Failed to send FetchBlock message to P2P"),
);
}

fn process_pending_children(&mut self, parent_root: H256) {
Expand All @@ -347,9 +349,8 @@ impl BlockChainServer {
}

fn on_gossip_attestation(&mut self, attestation: SignedAttestation) {
if let Err(err) = store::on_gossip_attestation(&mut self.store, attestation) {
warn!(%err, "Failed to process gossiped attestation");
}
let _ = store::on_gossip_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}
}

Expand Down
24 changes: 12 additions & 12 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ pub fn on_block(
data: att.data.clone(),
};
// TODO: validate attestations before processing
if let Err(err) = on_attestation(store, attestation, true) {
warn!(%slot, %validator_id, %err, "Invalid attestation in block");
metrics::inc_attestations_invalid("block");
} else {
metrics::inc_attestations_valid("block");
}
let _ = on_attestation(store, attestation, true)
.inspect(|_| metrics::inc_attestations_valid("block"))
.inspect_err(|err| {
warn!(%slot, %validator_id, %err, "Invalid attestation in block");
metrics::inc_attestations_invalid("block");
});
}
}

Expand All @@ -424,12 +424,12 @@ pub fn on_block(

// Process proposer attestation (enters "new" stage, not "known")
// TODO: validate attestations before processing
if let Err(err) = on_attestation(store, proposer_attestation, false) {
metrics::inc_attestations_invalid("block");
warn!(%slot, %err, "Invalid proposer attestation in block");
} else {
metrics::inc_attestations_valid("gossip");
}
let _ = on_attestation(store, proposer_attestation, false)
.inspect(|_| metrics::inc_attestations_valid("gossip"))
.inspect_err(|err| {
warn!(%slot, %err, "Invalid proposer attestation in block");
metrics::inc_attestations_invalid("block");
});

info!(%slot, %block_root, %state_root, "Processed new block");
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,11 @@ async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) {
info!(%peer_id, "Redialing disconnected bootnode");
// NOTE: this dial does some checks and adds a pending outbound connection attempt.
// It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event.
if let Err(e) = server.swarm.dial(addr.clone()) {
let _ = server.swarm.dial(addr.clone()).inspect_err(|e| {
warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
// Schedule another redial attempt
schedule_peer_redial(server.retry_tx.clone(), peer_id);
}
});
}
}

Expand Down
173 changes: 128 additions & 45 deletions crates/net/p2p/src/req_resp/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;

use ethlambda_types::primitives::ssz::{Decode, Encode};
use libp2p::futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::trace;
use tracing::{debug, trace};

use super::{
encoding::{decode_payload, write_payload},
Expand Down Expand Up @@ -62,42 +62,9 @@ impl libp2p::request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
let mut result_byte = 0_u8;
io.read_exact(std::slice::from_mut(&mut result_byte))
.await?;

let code = ResponseCode::from(result_byte);

let payload = decode_payload(io).await?;

// For non-success responses, the payload contains an SSZ-encoded error message
if code != ResponseCode::SUCCESS {
let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid error message: {err:?}"),
)
})?;
let error_str = String::from_utf8_lossy(&message).to_string();
trace!(?code, %error_str, "Received error response");
return Ok(Response::error(code, message));
}

// Success responses contain the actual data
match protocol.as_ref() {
STATUS_PROTOCOL_V1 => {
let status = Status::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}"))
})?;
Ok(Response::success(ResponsePayload::Status(status)))
}
BLOCKS_BY_ROOT_PROTOCOL_V1 => {
let block =
SignedBlockWithAttestation::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}"))
})?;
Ok(Response::success(ResponsePayload::BlocksByRoot(block)))
}
STATUS_PROTOCOL_V1 => decode_status_response(io).await,
BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await,
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand Down Expand Up @@ -135,15 +102,24 @@ impl libp2p::request_response::Codec for Codec {
{
match resp {
Response::Success { payload } => {
// Send success code (0)
io.write_all(&[ResponseCode::SUCCESS.into()]).await?;

let encoded = match &payload {
ResponsePayload::Status(status) => status.as_ssz_bytes(),
ResponsePayload::BlocksByRoot(block) => block.as_ssz_bytes(),
};

write_payload(io, &encoded).await
match &payload {
ResponsePayload::Status(status) => {
// Send success code (0)
io.write_all(&[ResponseCode::SUCCESS.into()]).await?;
let encoded = status.as_ssz_bytes();
write_payload(io, &encoded).await
}
ResponsePayload::BlocksByRoot(blocks) => {
// Write each block as separate chunk
for block in blocks {
io.write_all(&[ResponseCode::SUCCESS.into()]).await?;
let encoded = block.as_ssz_bytes();
write_payload(io, &encoded).await?;
}
// Empty response if no blocks found (stream just ends)
Ok(())
}
}
}
Response::Error { code, message } => {
// Send error code
Expand All @@ -157,3 +133,110 @@ impl libp2p::request_response::Codec for Codec {
}
}
}

/// Decodes a Status protocol response from a single-chunk response stream.
///
/// Reads the response code byte and payload, returning either a success response
/// with the peer's Status or an error response with the error code and message.
/// Unlike multi-chunk protocols, any error code from the peer is treated as a
/// valid response rather than a connection failure.
///
/// # Returns
///
/// Returns `Ok(Response::Success)` containing the peer's `Status` if the response
/// code is `SUCCESS`.
///
/// Returns `Ok(Response::Error)` containing the error code and message if the peer
/// returned a non-success response code.
///
/// # Errors
///
/// Returns `Err` if:
/// - I/O error occurs while reading the response code or payload
/// - Peer's error message cannot be SSZ-decoded (InvalidData)
/// - Peer's Status payload cannot be SSZ-decoded (InvalidData)
async fn decode_status_response<T>(io: &mut T) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
let mut result_byte = 0_u8;
io.read_exact(std::slice::from_mut(&mut result_byte))
.await?;

let code = ResponseCode::from(result_byte);
let payload = decode_payload(io).await?;

if code != ResponseCode::SUCCESS {
let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid error message: {err:?}"),
)
})?;
let error_str = String::from_utf8_lossy(&message).to_string();
trace!(?code, %error_str, "Received error response");
return Ok(Response::error(code, message));
}

let status = Status::from_ssz_bytes(&payload)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?;
Ok(Response::success(ResponsePayload::Status(status)))
}

/// Decodes a BlocksByRoot protocol response from a multi-chunk response stream.
///
/// Reads chunks until EOF, collecting successfully decoded blocks. Each chunk has
/// its own response code - chunks with error codes are logged and skipped rather
/// than terminating the stream. This allows partial success when some requested
/// blocks are unavailable. The stream ends naturally at EOF (peer closes after
/// sending all available blocks).
///
/// # Returns
///
/// Always returns `Ok(Response::Success)` containing a vector of successfully
/// decoded blocks. The vector may be empty if no SUCCESS chunks were received
/// before EOF (either no chunks sent, or all chunks had non-SUCCESS codes)
///
/// # Errors
///
/// Returns `Err` if:
/// - I/O error occurs while reading response codes or payloads (except `UnexpectedEof`
/// which signals normal stream termination)
/// - Block payload cannot be SSZ-decoded into `SignedBlockWithAttestation` (InvalidData)
///
/// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this
/// function to return `Err` - they are logged and skipped.
async fn decode_blocks_by_root_response<T>(io: &mut T) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
let mut blocks = Vec::new();

loop {
// Read chunk result code
let mut result_byte = 0_u8;
if let Err(e) = io.read_exact(std::slice::from_mut(&mut result_byte)).await {
if e.kind() == io::ErrorKind::UnexpectedEof {
break;
}
return Err(e);
}

let code = ResponseCode::from(result_byte);
let payload = decode_payload(io).await?;

if code != ResponseCode::SUCCESS {
let error_message = ErrorMessage::from_ssz_bytes(&payload)
.map(|msg| String::from_utf8_lossy(&msg).to_string())
.unwrap_or_else(|_| "<invalid error message>".to_string());
debug!(?code, %error_message, "Skipping block chunk with non-success code");
continue;
}

let block = SignedBlockWithAttestation::from_ssz_bytes(&payload)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}")))?;
blocks.push(block);
}

Ok(Response::success(ResponsePayload::BlocksByRoot(blocks)))
}
Loading