[Claude Code]
Problem
Large JSON-RPC responses (e.g., `eth_getLogs` returning thousands of entries) currently materialize the entire serialized result as a `Box<RawValue>` — a complete in-memory string — before any bytes are sent to the client. For a 500MB response, that's 500MB of heap.
Proposal
Add streaming serialization — writing the JSON-RPC response incrementally to the HTTP body via chunked transfer encoding. The response is still a complete, spec-compliant JSON-RPC 2.0 response. The chunks are a transport-level concern, invisible at the protocol level.
User-facing change: `route_streaming()` instead of `route()` for methods that may produce large responses. Same handler signatures; the framework handles everything else.
```rust
use ajj::{Router, StreamArray};

async fn get_logs(
    params: GetLogsParams,
    state: DbHandle,
) -> Result<StreamArray<impl Iterator<Item = Log>>, MyError> {
    let logs = state.query_logs(&params)?;
    Ok(StreamArray(logs.into_iter()))
}

let router = Router::<DbHandle>::new()
    .route("eth_blockNumber", get_block_number) // normal
    .route_streaming("eth_getLogs", get_logs)   // streaming
    .with_state(db);
```

Over HTTP, `eth_getLogs` writes the response incrementally via chunked transfer encoding. Over WebSocket/IPC, it materializes normally (WS requires complete messages per the JSON-RPC spec).
Design
Core idea
- Add `write_to(&mut dyn Write)` to `RpcSend` — write-based serialization with a default impl that round-trips through `into_raw_value`. The blanket impl for `Serialize` types overrides this to use `serde_json::to_writer` (no intermediate `String`).
- Add `StreamArray<I>` helper — wraps an iterator, serializes elements incrementally to the writer.
- Add `StreamingHandler<T, S>` trait — parallel to `Handler`, returns a deferred (unserialized) payload instead of `Box<RawValue>`.
- Extend `Method<S>` and `Router` — `route_streaming()` registration, streaming-aware dispatch.
- Axum handler — detects streaming methods on single requests, writes the JSON-RPC envelope + payload incrementally to a chunked HTTP body via `DuplexStream` + `spawn_blocking` + `SyncIoBridge`.
- Non-HTTP transports — streaming methods fall back to materialization automatically.
Why a separate StreamingHandler trait
The existing `Handler<T, S>` trait constrains `Future::Output = Option<Box<RawValue>>`. Serialization happens inside the handler's future (in `impl_handler_call!(@finish)`, which calls `Response::build_response()`), so by the time the transport sees the result, it is fully materialized. To defer serialization, we need a different future output type, which requires a separate trait.
The user's handler closures are identical — same signatures, same return types. Only the registration call changes (`route_streaming` vs `route`).
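The crux, reduced to the two future output types — a simplified sketch, not the real trait definitions (the trait names here are illustrative; the actual `StreamingHandler` bounds appear in Phase 2, and `StreamingOutput` is defined in Phase 1):

```rust
use std::future::Future;
use serde_json::value::RawValue;

// Existing Handler: the future resolves to an already-serialized response,
// so the transport only ever sees a complete string.
trait HandlerShape {
    type Future: Future<Output = Option<Box<RawValue>>> + Send;
}

// StreamingHandler: the future resolves to a deferred payload that is
// serialized later, directly into the transport's writer.
trait StreamingHandlerShape {
    type Future: Future<Output = Option<StreamingOutput>> + Send;
}
```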
What does NOT change
- The `Handler<T, S>` trait
- The `Route`/`RouteFuture`/`BatchFuture` types
- All existing handler blanket impls
- All existing transport code paths
- The `ResponsePayload`/`ErrorPayload` types
- `Cargo.toml` dependencies (`tokio-util` with `["io", "rt"]` is already non-optional)
Implementation plan
Phase 1: Core types (no behavior change)
`src/primitives.rs` — `write_to` on `RpcSend` + `StreamArray<I>`
Add a provided method to `RpcSend`:

```rust
fn write_to(self, writer: &mut dyn std::io::Write) -> serde_json::Result<()>
where
    Self: Sized,
{
    let rv = self.into_raw_value()?;
    writer.write_all(rv.get().as_bytes()).map_err(serde_json::Error::io)
}
```

Override it in the blanket impl to use `serde_json::to_writer`. Add `StreamArray<I>` with a custom `write_to` that iterates and serializes one element at a time, roughly as sketched below.
`src/types/resp/streaming.rs` (new) — `DeferredPayload` + `StreamingOutput`
```rust
pub(crate) trait DeferredPayload: Send {
    fn write_response(self: Box<Self>, id: &RawValue, writer: &mut dyn Write) -> serde_json::Result<()>;
    fn materialize(self: Box<Self>, id: &RawValue) -> Box<RawValue>;
}

pub(crate) enum StreamingOutput {
    Deferred { id: Box<RawValue>, payload: Box<dyn DeferredPayload> },
    Materialized(Box<RawValue>),
}
```

`DeferredPayload` impl for `ResponsePayload<T, E>`:

- `write_response`: writes `{"jsonrpc":"2.0","id":ID,"result":`, then calls `T::write_to(writer)`, then `}` (see the sketch below). For errors, serializes the error payload normally.
- `materialize`: delegates to the existing `Response::build_response`.
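For concreteness, the success path of `write_response` might look like the following sketch, where the hypothetical `write_result` closure stands in for the payload's `write_to` (e.g. `StreamArray::write_to`):

```rust
use std::io::Write;

use serde_json::value::RawValue;

// Opens the JSON-RPC envelope, streams the result, then closes it.
fn write_success_envelope(
    id: &RawValue,
    write_result: impl FnOnce(&mut dyn Write) -> serde_json::Result<()>,
    writer: &mut dyn Write,
) -> serde_json::Result<()> {
    // Envelope prefix, up to and including the "result" key.
    write!(writer, r#"{{"jsonrpc":"2.0","id":{},"result":"#, id.get())
        .map_err(serde_json::Error::io)?;
    // The payload serializes itself incrementally into the writer.
    write_result(writer)?;
    // Close the envelope.
    writer.write_all(b"}").map_err(serde_json::Error::io)
}
```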
Phase 2: Streaming handler infrastructure
`src/macros.rs` — `impl_streaming_handler_call!`
New macro parallel to `impl_handler_call!`. Identical structure, but `@finish` wraps the `ResponsePayload<T, E>` in a `Box<dyn DeferredPayload>` and returns `Option<StreamingOutput>` instead of calling `Response::build_response()`.
`src/routes/streaming.rs` (new) — `StreamingHandler` trait + type erasure
```rust
pub(crate) trait StreamingHandler<T, S>: Clone + Send + Sync + Sized + 'static {
    type Future: Future<Output = Option<StreamingOutput>> + Send + 'static;
    fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future;
}
```

8 blanket impls (4 argument patterns × 2 return types):

| Arguments | `Result` | `ResponsePayload` |
|---|---|---|
| `(P)` | ✓ | ✓ |
| `(HandlerCtx, P)` | ✓ | ✓ |
| `(P, S)` | ✓ | ✓ |
| `(HandlerCtx, P, S)` | ✓ | ✓ |
Type erasure follows the existing `erased.rs` pattern: `ErasedIntoStreamingRoute<S>`, `BoxedIntoStreamingRoute<S>`, `MakeErasedStreamingHandler<H, S>`.
`StreamingRoute` is NOT a tower `Service` — there is no middleware benefit for an HTTP-only streaming path. It is a simple cloneable boxed async fn.
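Concretely, "cloneable boxed async fn" could take a shape like this (an assumed definition, not the final one; `HandlerArgs` is the existing ajj type and `StreamingOutput` is defined in Phase 1):

```rust
use std::{future::Future, pin::Pin, sync::Arc};

type StreamingFuture = Pin<Box<dyn Future<Output = Option<StreamingOutput>> + Send>>;

// A cloneable, type-erased streaming handler: an Arc'd closure with no
// tower Service plumbing around it.
#[derive(Clone)]
pub(crate) struct StreamingRoute(Arc<dyn Fn(HandlerArgs) -> StreamingFuture + Send + Sync>);
```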
`src/routes/method.rs` — extend `Method<S>`
```rust
pub(crate) enum Method<S> {
    Needs(BoxedIntoRoute<S>),
    Ready(Route),
    StreamingNeeds(BoxedIntoStreamingRoute<S>),
    StreamingReady(StreamingRoute),
}
```

- `is_streaming()`, a `with_state()` update, `call_streaming_with_state()`
- The existing `call_with_state()` for streaming variants materializes the deferred output (ensures WS/IPC work automatically); see the sketch below.
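That fallback is the crux of non-HTTP compatibility. Using the `StreamingOutput` definition from Phase 1, it reduces to something like this (a sketch; the surrounding dispatch wiring is assumed):

```rust
// Non-HTTP transports take the regular call_with_state path, so a streaming
// method's deferred output is materialized into a complete Box<RawValue>.
fn materialize_output(output: StreamingOutput) -> Box<RawValue> {
    match output {
        StreamingOutput::Deferred { id, payload } => payload.materialize(&id),
        StreamingOutput::Materialized(rv) => rv,
    }
}
```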
Phase 3: Router integration
`src/router.rs` — `route_streaming()` + streaming dispatch
```rust
pub fn route_streaming<H, T>(self, method: impl Into<Cow<'static, str>>, handler: H) -> Self
where
    H: StreamingHandler<T, S>,
    T: Send + 'static,
    S: Clone + Send + Sync + 'static,
```

- `is_streaming_method()` and `call_streaming_with_state()` on `RouterInner`
- Update `nest()`, `merge()`, and `with_state()` to handle the new `Method` variants
Phase 4: Axum HTTP streaming
`src/axum.rs` — streaming response path
After parsing `InboundData`:

- Peek: `req.single()` + iterate to check the method name + `is_streaming_method()`. `InboundData::iter()` borrows, so `req` remains available for fallback.
- If streaming:
  - Re-parse the single request, create `HandlerArgs`, call `call_streaming_with_state`
  - For `StreamingOutput::Deferred { id, payload }`: `tokio::io::duplex(65536)` creates a bounded pipe; in `spawn_blocking`, `SyncIoBridge` wraps the write half and runs `payload.write_response(&id, &mut bridge)`; `ReaderStream` wraps the read half → `Body::from_stream()` → chunked HTTP response (see the sketch below)
  - For `StreamingOutput::Materialized(rv)`: the existing `Box<str>` response path
- If not streaming: the existing `call_batch_with_state` path unchanged
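A sketch of that wiring under the assumptions above (axum-0.7-style `Body::from_stream`; the helper name `stream_deferred` is hypothetical):

```rust
use axum::{body::Body, response::Response};
use serde_json::value::RawValue;
use tokio_util::io::{ReaderStream, SyncIoBridge};

// Turns a deferred payload into a chunked HTTP response.
fn stream_deferred(id: Box<RawValue>, payload: Box<dyn DeferredPayload>) -> Response {
    // Bounded in-memory pipe: the write side backpressures at 64KB.
    let (writer, reader) = tokio::io::duplex(65536);

    // Serialization is synchronous (std::io::Write), so run it on a blocking
    // thread; SyncIoBridge adapts the async write half to std::io::Write.
    tokio::task::spawn_blocking(move || {
        let mut bridge = SyncIoBridge::new(writer);
        // On error the writer drops, truncating the body mid-stream
        // (see Notes below).
        let _ = payload.write_response(&id, &mut bridge);
    });

    // Stream the read half out as the HTTP body (chunked transfer encoding).
    Response::builder()
        .header("content-type", "application/json")
        .body(Body::from_stream(ReaderStream::new(reader)))
        .unwrap()
}
```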
Phase 5: Tests and documentation
- Unit tests: `StreamArray::write_to`, `StreamArray::into_raw_value`, `DeferredPayload::write_response` vs `materialize` equivalence (sketched below)
- Integration test (axum feature): a streaming handler end-to-end over HTTP
- Rustdoc for `StreamArray`, `route_streaming()`, `RpcSend::write_to`
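The equivalence test could take this shape (a sketch; `example_payload` is a hypothetical fixture returning a `ResponsePayload<T, E>`):

```rust
#[test]
fn write_response_matches_materialize() {
    let id = RawValue::from_string("1".to_string()).unwrap();
    let payload = example_payload(); // hypothetical fixture

    // Stream one copy into a buffer...
    let mut streamed = Vec::new();
    let boxed: Box<dyn DeferredPayload> = Box::new(payload.clone());
    boxed.write_response(&id, &mut streamed).unwrap();

    // ...materialize the other; the bytes must match exactly.
    let boxed: Box<dyn DeferredPayload> = Box::new(payload);
    let materialized = boxed.materialize(&id);
    assert_eq!(std::str::from_utf8(&streamed).unwrap(), materialized.get());
}
```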
Notes
- Batch requests: streaming methods in batch requests materialize (batch responses require a complete JSON array). Only single HTTP requests get streaming.
- Error mid-stream: if `write_response` fails, the `DuplexStream` writer drops and the HTTP response terminates with incomplete JSON (no final zero-length chunk). The client detects this as a failed transfer — the same behavior as any server crash mid-response.
- Backpressure: the `DuplexStream` buffer (64KB) backpressures the serializer via `SyncIoBridge`. If the client reads slowly, the `spawn_blocking` thread blocks. This is expected and acceptable.
- OTEL metrics: the @sent byte count for streaming responses is deferred/omitted initially. It can be added via a `CountingWriter` wrapper in a follow-up (sketched below).
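Such a `CountingWriter` would be a thin `std::io::Write` wrapper (a sketch of the assumed follow-up):

```rust
use std::io::{self, Write};

// Wraps any writer and tallies bytes actually accepted, so the streaming
// path can report a sent-byte count after write_response completes.
struct CountingWriter<W> {
    inner: W,
    bytes: u64,
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
```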