fix: stream multipart uploads to avoid OOM on large files #418

Acelogic wants to merge 3 commits into googleworkspace:main
Conversation
Replace the buffered file read + body copy with a streaming multipart/related body that reads the file in 64 KB chunks. Previously, uploading a file required ~4x the file size in RAM (tokio::fs::read allocates a Vec, then build_multipart_body copies it into a second growing Vec); a 5 GB upload would request ~20 GB of contiguous memory, crashing the process. The new build_multipart_stream function yields preamble, file chunks, and postamble through a futures_util::stream::unfold state machine, keeping memory usage constant regardless of file size. Content-Length is computed from file metadata so Google APIs still receive the correct header. Closes googleworkspace#244
🦋 Changeset detected — latest commit: 813dd28. The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package.
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical out-of-memory issue that occurred when uploading large files by refactoring the upload mechanism to use a streaming approach. This change significantly reduces memory consumption and improves the robustness of the application when dealing with substantial data transfers, ensuring a more stable and efficient user experience.

Highlights
Changelog
Activity
Code Review
This pull request is a great improvement, fixing a critical out-of-memory issue by switching to streaming uploads for large files. The implementation is mostly solid, and the addition of tests for the new streaming logic is commendable. I've identified a critical security vulnerability related to path traversal and a high-severity issue regarding the stream implementation's efficiency and maintainability. Addressing these will make the solution more robust and secure.
```rust
let file_size = tokio::fs::metadata(upload_path)
    .await
    .map_err(|e| {
        GwsError::Validation(format!(
            "Failed to read upload file '{}': {}",
            upload_path, e
        ))
    })?
    .len();
```
The upload_path is used to access the filesystem without prior validation. This could lead to a path traversal vulnerability, allowing an attacker to read arbitrary files by providing a path like ../../etc/passwd. Given that this tool might be used by automated agents (as hinted at in src/validate.rs), this poses a significant security risk.
Before accessing the file system with tokio::fs::metadata, you should validate upload_path to ensure it's a relative path that resolves to a location within the current working directory. You can implement a new validation function similar to validate_safe_dir_path in src/validate.rs for this purpose.
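A minimal sketch of the kind of check the review is asking for — `validate_safe_upload_path` is a hypothetical name for illustration, and the repository's actual `validate_safe_dir_path` in src/validate.rs may differ:

```rust
use std::path::{Component, Path};

/// Hypothetical helper (not in this PR): reject upload paths that could
/// escape the current working directory before touching the filesystem.
fn validate_safe_upload_path(path: &str) -> Result<(), String> {
    let p = Path::new(path);
    // Refuse absolute paths like /etc/passwd outright.
    if p.is_absolute() {
        return Err(format!("absolute paths are not allowed: {path}"));
    }
    // Refuse any `..` component, which could traverse out of the CWD.
    if p.components().any(|c| matches!(c, Component::ParentDir)) {
        return Err(format!("path may not contain '..': {path}"));
    }
    Ok(())
}
```

A fuller implementation might also canonicalize the path and verify the result is still under the working directory, which additionally catches symlink-based escapes.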
src/executor.rs (Outdated)
```rust
// State machine for the streaming body: preamble -> file chunks -> postamble
enum State {
    Preamble {
        preamble: Vec<u8>,
        file_path: String,
        postamble: Vec<u8>,
    },
    Streaming {
        file: tokio::fs::File,
        postamble: Vec<u8>,
    },
    Done,
}

let initial = State::Preamble {
    preamble: preamble.into_bytes(),
    file_path: file_path.to_owned(),
    postamble: postamble.into_bytes(),
};

let stream = futures_util::stream::unfold(initial, |state| async move {
    match state {
        State::Preamble {
            preamble,
            file_path,
            postamble,
        } => match tokio::fs::File::open(&file_path).await {
            Ok(file) => Some((Ok(preamble), State::Streaming { file, postamble })),
            Err(e) => Some((Err(e), State::Done)),
        },
        State::Streaming {
            mut file,
            postamble,
        } => {
            let mut buf = vec![0u8; 64 * 1024];
            match file.read(&mut buf).await {
                Ok(0) => Some((Ok(postamble), State::Done)),
                Ok(n) => {
                    buf.truncate(n);
                    Some((Ok(buf), State::Streaming { file, postamble }))
                }
                Err(e) => Some((Err(e), State::Done)),
            }
        }
        State::Done => None,
    }
});
```
The current implementation of the streaming body using futures_util::stream::unfold and a manual state machine is quite complex and has a performance drawback: it re-allocates a 64KB buffer for every chunk read from the file. For large files, this results in many allocations, impacting performance.
A more idiomatic and efficient approach in the Tokio ecosystem is to use tokio_util::io::ReaderStream and chain it with streams for the preamble and postamble. This would simplify the code, improve readability, and enhance performance by avoiding repeated buffer allocations. This may require adding tokio-util with the io feature if it's not already a dependency.
Here is a suggested replacement for the state machine and unfold logic:

```rust
let stream = {
    use futures_util::stream::{self, TryStreamExt};
    use tokio_util::io::ReaderStream;

    let file_path = file_path.to_owned();
    let preamble_bytes: bytes::Bytes = preamble.into_bytes().into();
    let postamble_bytes: bytes::Bytes = postamble.into_bytes().into();

    let file_stream = stream::once(async move { tokio::fs::File::open(file_path).await })
        .map_ok(ReaderStream::new)
        .try_flatten_stream();

    stream::once(async { Ok(preamble_bytes) })
        .chain(file_stream)
        .chain(stream::once(async { Ok(postamble_bytes) }))
};
```

Address review feedback: replace the hand-rolled futures_util::stream::unfold state machine with tokio_util::io::ReaderStream chained between preamble and postamble streams. This is more idiomatic, avoids per-chunk buffer re-allocation, and is easier to read.
/gemini review
Code Review
This pull request effectively addresses an out-of-memory issue with large file uploads by switching from a buffered to a streaming approach. The implementation is solid and introduces a new build_multipart_stream function to handle this. I've included one suggestion to further optimize memory usage within the new streaming logic by avoiding an unnecessary data copy for each file chunk.
```rust
// Chain: preamble bytes -> file chunks (via ReaderStream) -> postamble bytes
let file_path = file_path.to_owned();
let preamble_bytes = preamble.into_bytes();
let postamble_bytes = postamble.into_bytes();

let file_stream =
    futures_util::stream::once(async move { tokio::fs::File::open(file_path).await })
        .map_ok(|f| tokio_util::io::ReaderStream::new(f).map_ok(|b| b.to_vec()))
        .try_flatten();

let stream =
    futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) })
        .chain(file_stream)
        .chain(futures_util::stream::once(async {
            Ok::<_, std::io::Error>(postamble_bytes)
        }));
```
While this implementation correctly streams the file, it performs an unnecessary allocation and copy for each chunk of the file by converting Bytes to Vec<u8> with .map_ok(|b| b.to_vec()).

Given that the goal of this change is to optimize memory usage for large files, we can further improve efficiency by avoiding this copy. By using bytes::Bytes (the type ReaderStream already yields) for all parts of the stream, you can achieve a zero-copy implementation for the file chunks. This makes the streaming even more memory-efficient.
Suggested change:

```rust
// Chain: preamble bytes -> file chunks (via ReaderStream) -> postamble bytes
let file_path = file_path.to_owned();
let preamble_bytes = bytes::Bytes::from(preamble.into_bytes());
let postamble_bytes = bytes::Bytes::from(postamble.into_bytes());

let file_stream =
    futures_util::stream::once(async move { tokio::fs::File::open(file_path).await })
        .map_ok(|f| tokio_util::io::ReaderStream::new(f))
        .try_flatten();

let stream =
    futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) })
        .chain(file_stream)
        .chain(futures_util::stream::once(async {
            Ok::<_, std::io::Error>(postamble_bytes)
        }));
```
Remove the per-chunk .to_vec() copy by using bytes::Bytes throughout the stream chain. ReaderStream already yields Bytes, so the preamble and postamble just need to match the type.
/gemini review
Code Review
This pull request introduces a streaming approach for multipart file uploads to fix out-of-memory issues with large files. The implementation is solid and correctly uses streams to avoid buffering the entire file. I've added one high-severity comment regarding error handling during metadata serialization, which could lead to silent failures. Propagating the error would make the implementation more robust.
```rust
fn build_multipart_stream(
    metadata: &Option<Value>,
    file_path: &str,
    file_size: u64,
) -> (reqwest::Body, String, u64) {
    let boundary = format!("gws_boundary_{:016x}", rand::random::<u64>());

    let media_mime = metadata
        .as_ref()
        .and_then(|m| m.get("mimeType"))
        .and_then(|v| v.as_str())
        .unwrap_or("application/octet-stream")
        .to_string();

    let metadata_json = metadata
        .as_ref()
        .map(|m| serde_json::to_string(m).unwrap_or_else(|_| "{}".to_string()))
        .unwrap_or_else(|| "{}".to_string());

    let preamble = format!(
        "--{boundary}\r\n\
         Content-Type: application/json; charset=UTF-8\r\n\r\n\
         {metadata_json}\r\n\
         --{boundary}\r\n\
         Content-Type: {media_mime}\r\n\r\n"
    );
    let postamble = format!("\r\n--{boundary}--\r\n");

    let content_length = preamble.len() as u64 + file_size + postamble.len() as u64;
    let content_type = format!("multipart/related; boundary={boundary}");

    // Chain: preamble bytes -> file chunks (via ReaderStream) -> postamble bytes
    // All parts use bytes::Bytes for zero-copy streaming.
    let file_path = file_path.to_owned();
    let preamble_bytes = bytes::Bytes::from(preamble.into_bytes());
    let postamble_bytes = bytes::Bytes::from(postamble.into_bytes());

    let file_stream =
        futures_util::stream::once(async move { tokio::fs::File::open(file_path).await })
            .map_ok(tokio_util::io::ReaderStream::new)
            .try_flatten();

    let stream =
        futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) })
            .chain(file_stream)
            .chain(futures_util::stream::once(async {
                Ok::<_, std::io::Error>(postamble_bytes)
            }));

    (
        reqwest::Body::wrap_stream(stream),
        content_type,
        content_length,
    )
}
```
The use of unwrap_or_else on lines 798 and 799 can hide a JSON serialization error, causing the upload to proceed with empty metadata ({}). This could lead to silent failures or hard-to-debug server-side errors.
To make this more robust, I recommend changing build_multipart_stream to return a Result and propagating any serialization errors. This aligns with the error handling pattern used elsewhere in the file.
Here's how you could refactor the function:
```rust
fn build_multipart_stream(
    metadata: &Option<Value>,
    file_path: &str,
    file_size: u64,
) -> Result<(reqwest::Body, String, u64), GwsError> {
    let boundary = format!("gws_boundary_{:016x}", rand::random::<u64>());

    let media_mime = metadata
        .as_ref()
        .and_then(|m| m.get("mimeType"))
        .and_then(|v| v.as_str())
        .unwrap_or("application/octet-stream")
        .to_string();

    let metadata_json = match metadata {
        Some(m) => serde_json::to_string(m).map_err(|e| {
            GwsError::Validation(format!("Failed to serialize upload metadata: {e}"))
        })?,
        None => "{}".to_string(),
    };

    let preamble = format!(
        "--{boundary}\r\n\
         Content-Type: application/json; charset=UTF-8\r\n\r\n\
         {metadata_json}\r\n\
         --{boundary}\r\n\
         Content-Type: {media_mime}\r\n\r\n"
    );
    let postamble = format!("\r\n--{boundary}--\r\n");

    let content_length = preamble.len() as u64 + file_size + postamble.len() as u64;
    let content_type = format!("multipart/related; boundary={boundary}");

    // ... (rest of the function is fine)
    let file_path = file_path.to_owned();
    let preamble_bytes = bytes::Bytes::from(preamble.into_bytes());
    let postamble_bytes = bytes::Bytes::from(postamble.into_bytes());

    let file_stream =
        futures_util::stream::once(async move { tokio::fs::File::open(file_path).await })
            .map_ok(tokio_util::io::ReaderStream::new)
            .try_flatten();

    let stream =
        futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) })
            .chain(file_stream)
            .chain(futures_util::stream::once(async {
                Ok::<_, std::io::Error>(postamble_bytes)
            }));

    Ok((
        reqwest::Body::wrap_stream(stream),
        content_type,
        content_length,
    ))
}
```

You would also need to update the call site in build_http_request to use the `?` operator:

```rust
// line 196
let (body, content_type, content_length) =
    build_multipart_stream(&input.body, upload_path, file_size)?;
```
I have signed the Google CLA. Please re-check.
Summary

Fixes #244 — uploading large files via `--upload` causes an out-of-memory crash because the entire file is read into memory twice (once by `tokio::fs::read`, then copied into a second `Vec` by `build_multipart_body`). A 5 GB file requests ~20 GB of contiguous RAM.

This replaces the buffered approach with a streaming `multipart/related` body:

- `build_multipart_stream` yields the body in three phases via `futures_util::stream::unfold`: preamble (boundary + JSON metadata), file chunks (64 KB reads from disk), and postamble (closing boundary)
- `Content-Length` is computed from `tokio::fs::metadata`, so Google APIs still receive the correct header without buffering the file

The old `build_multipart_body` is retained under `#[cfg(test)]` for the existing unit tests.

Test plan

- `cargo clippy -- -D warnings` passes
- `cargo test` — 552/552 pass (2 new tests added)
  - `test_build_multipart_stream_content_length` — verifies declared Content-Length matches preamble + file size + postamble
  - `test_build_multipart_stream_large_file` — 256 KB file (larger than 64 KB chunk size) verifies multi-chunk streaming arithmetic
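The Content-Length arithmetic that the first test verifies can be sketched in isolation. This is a standalone illustration, not the PR's actual test code; `multipart_lengths` is a hypothetical helper that mirrors how `build_multipart_stream` builds the preamble and postamble around the boundary:

```rust
// Hypothetical sketch: the declared Content-Length must equal
// preamble bytes + file bytes + postamble bytes, independent of how
// the file is later chunked by the stream.
fn multipart_lengths(metadata_json: &str, media_mime: &str, boundary: &str, file_size: u64) -> u64 {
    let preamble = format!(
        "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n\
         {metadata_json}\r\n--{boundary}\r\nContent-Type: {media_mime}\r\n\r\n"
    );
    let postamble = format!("\r\n--{boundary}--\r\n");
    preamble.len() as u64 + file_size + postamble.len() as u64
}
```

Because the file bytes enter the total only as `file_size`, growing the file by N bytes must grow the declared length by exactly N — which is what the multi-chunk test checks with a file larger than one chunk.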