Conversation
We were already testing this in the cli tests, so not a huge win but I mostly run integration-tests so well.
| pub struct MultipartStreamRequest(pub(super) super::ExecutionRequest); | ||
|
|
||
| impl MultipartStreamRequest { | ||
| pub async fn collect<B>(self) -> B |
There was a problem hiding this comment.
⚡ Suggestion
The collect method in both MultipartStreamRequest and SseStreamRequest is identical. Consider refactoring this method into a shared utility function to avoid code duplication.
| use gateway_core::StreamingFormat; | ||
| use headers::HeaderMapExt; | ||
|
|
||
| pub struct MultipartStreamRequest(pub(super) super::ExecutionRequest); |
There was a problem hiding this comment.
⚡ Suggestion
Consider using a more descriptive name for the MultipartStreamRequest and SseStreamRequest structs to clearly indicate their purpose and usage.
| where | ||
| B: Default + Extend<serde_json::Value>, | ||
| { | ||
| self.await.stream.collect().await |
There was a problem hiding this comment.
🐛 Bug
In both the MultipartStreamRequest::collect and SseStreamRequest::collect methods, the line self.await.stream.collect().await is incorrect because self is not a future and cannot be awaited. The correct approach would be to access the stream field of the ExecutionRequest and then collect it.
| self.await.stream.collect().await | |
| self.0.stream.collect().await |
| let request = BatchRequest::Single(self.0.request.into_engine_request()); | ||
| Box::pin(async move { | ||
| let response = self.0.engine.execute(headers, request).await; | ||
| let stream = multipart_stream::parse(response.body.into_stream().map_ok(Into::into), "-") |
There was a problem hiding this comment.
🐛 Bug
The use of unwrap in serde_json::from_slice(&result.unwrap().body).unwrap() and serde_json::from_slice(msg.data()).unwrap() can lead to panics if the result is an error or if the data is not valid JSON. It is important to handle these cases more gracefully to improve robustness and prevent potential panics. Consider handling the Err case properly to ensure the application can handle unexpected data without crashing.
Suggestion:
| let stream = multipart_stream::parse(response.body.into_stream().map_ok(Into::into), "-") | |
| Ok(async_sse::Event::Message(msg)) => serde_json::from_slice(msg.data()).unwrap_or_else(|_| serde_json::Value::String("Invalid JSON".into())) |
devslovecoffee
left a comment
There was a problem hiding this comment.
PR Review Summary
This pull request has been reviewed. Please check the comments and suggestions provided.
| Box::pin(async move { | ||
| let response = self.0.engine.execute(headers, request).await; | ||
| let stream = multipart_stream::parse(response.body.into_stream().map_ok(Into::into), "-") | ||
| .map(|result| serde_json::from_slice(&result.unwrap().body).unwrap()); |
There was a problem hiding this comment.
⚡ Suggestion
The unwrap() calls in the MultipartStreamRequest and SseStreamRequest implementations can lead to panics if the Result is an Err. Consider handling the error more gracefully, perhaps by using ? to propagate the error or by providing a default value.
| let stream = async_sse::decode(stream.into_async_read()) | ||
| .into_stream() | ||
| .try_take_while(|event| { | ||
| let take = if let async_sse::Event::Message(msg) = event { |
There was a problem hiding this comment.
⚡ Suggestion
The use of string literals like "complete" and "Got retry?" in the SseStreamRequest implementation can be considered as excessive use of literals. Consider defining these as constants to improve maintainability and readability.
| let mut headers = http::HeaderMap::new(); | ||
|
|
||
| for (key, value) in &self.headers { | ||
| let key = HeaderName::from_str(key).unwrap(); |
There was a problem hiding this comment.
⚡ Suggestion
Consider handling potential errors when using unwrap() to avoid panics. Specifically, unwrap() is used on HeaderName::from_str, HeaderValue::from_str, Box::pin, and serde_json::to_string_pretty. In each of these cases, if the operation fails, it could lead to a panic. Handling these errors more gracefully would improve the robustness of the code.
✨
Description by Cal
PR Description
This PR introduces a new feature for SSE client subscription testing. It includes configuration changes, dependency updates, and new test cases for SSE and multipart streaming in a federated GraphQL environment.
Key Issues
None
Files Changed
File: /.callstack.yml
Added configuration for Callstack.ai PR review with modules for description, bug hunting, code suggestions, performance, and security.File: /.github/workflows/callstack-reviewer.yml
Created a GitHub Actions workflow for Callstack.ai PR review with inputs for configuration and commit SHAs.File: /Cargo.lock
Updated dependencies to include 'async-sse' and 'bytes'.File: /Cargo.toml
Added 'async-sse' and 'multipart-stream' as dependencies.File: /engine/crates/gateway-core/Cargo.toml
Updated dependencies to use workspace versions for 'async-sse', 'async-trait', and 'multipart-stream'.File: /engine/crates/integration-tests/Cargo.toml
Updated dependencies to use workspace versions for 'async-sse', 'bytes', and 'multipart-stream'.File: /engine/crates/integration-tests/docker-compose.yml
Modified Docker Compose file to add a new service 'sse-subgraph' and changed restart policies for existing services.File: /engine/crates/integration-tests/src/federation/mod.rs
Refactored federation module to move request handling to a separate file.File: /engine/crates/integration-tests/src/federation/request.rs
Introduced a new module for handling GraphQL request execution and response processing.File: /engine/crates/integration-tests/src/federation/request/stream.rs
Added new module for handling multipart and SSE streaming requests.File: /engine/crates/integration-tests/tests/federation/subscriptions/mod.rs
Created a new module for subscription tests, including multipart and SSE tests.File: /engine/crates/integration-tests/tests/federation/subscriptions/sse.rs
Added test cases for SSE subscriptions in a federated GraphQL environment.Please include a summary of the change and which issue is fixed.
Please also include relevant motivation and context.
Type of change