Skip to content

Commit cdbc06e

Browse files
Multipart uploads for large objects
1 parent 5b1a8d3 commit cdbc06e

File tree

4 files changed

+102
-23
lines changed

4 files changed

+102
-23
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ opt-level = 0
1515
[dependencies]
1616
base64 = "0.13.0"
1717
bytes = "0.4.9"
18+
bytes_1 = { version = "1", package = "bytes" }
1819
chrono = { version = "0.4", features = ["serde"] }
1920
crates-index = "0.18"
2021
crossbeam-utils = "0.8"

src/report/archives.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -122,8 +122,8 @@ fn write_all_archive<DB: ReadResults, W: ReportWriter>(
122122
} else {
123123
std::thread::sleep(std::time::Duration::from_secs(2));
124124
warn!(
125-
"retry ({}/{}) writing logs-archives/all.tar.gz ({} bytes)",
126-
i, RETRIES, len,
125+
"retry ({}/{}) writing logs-archives/all.tar.gz ({} bytes) (error: {:?})",
126+
i, RETRIES, len, e,
127127
);
128128
continue;
129129
}

src/report/s3.rs

Lines changed: 98 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -81,28 +81,105 @@ impl ReportWriter for S3Writer {
8181
mime: &Mime,
8282
encoding_type: EncodingType,
8383
) -> Fallible<()> {
84-
let mut request = self
85-
.client
86-
.put_object()
87-
.body(aws_smithy_http::byte_stream::ByteStream::from(s))
88-
.acl(aws_sdk_s3::model::ObjectCannedAcl::PublicRead)
89-
.key(format!(
90-
"{}/{}",
91-
self.prefix,
92-
path.as_ref().to_str().unwrap()
93-
))
94-
.content_type(mime.to_string())
95-
.bucket(self.bucket.clone());
96-
match encoding_type {
97-
EncodingType::Plain => {}
98-
EncodingType::Gzip => {
99-
request = request.content_encoding("gzip");
84+
// At least 50 MB, then use a multipart upload...
85+
if s.len() >= 50 * 1024 * 1024 {
86+
let mut request = self
87+
.client
88+
.create_multipart_upload()
89+
.acl(aws_sdk_s3::model::ObjectCannedAcl::PublicRead)
90+
.key(format!(
91+
"{}/{}",
92+
self.prefix,
93+
path.as_ref().to_str().unwrap()
94+
))
95+
.content_type(mime.to_string())
96+
.bucket(self.bucket.clone());
97+
match encoding_type {
98+
EncodingType::Plain => {}
99+
EncodingType::Gzip => {
100+
request = request.content_encoding("gzip");
101+
}
100102
}
101-
}
102-
match self.runtime.block_on(request.send()) {
103-
Ok(_) => Ok(()),
104-
Err(e) => {
105-
failure::bail!("Failed to upload to {:?}: {:?}", path.as_ref(), e);
103+
let upload = match self.runtime.block_on(request.send()) {
104+
Ok(u) => u,
105+
Err(e) => {
106+
failure::bail!("Failed to upload to {:?}: {:?}", path.as_ref(), e);
107+
}
108+
};
109+
110+
let chunk_size = 20 * 1024 * 1024;
111+
let bytes = bytes_1::Bytes::from(s);
112+
let mut part = 1;
113+
let mut start = 0;
114+
let mut parts = aws_sdk_s3::model::CompletedMultipartUpload::builder();
115+
while start < bytes.len() {
116+
let chunk = bytes.slice(start..std::cmp::min(start + chunk_size, bytes.len()));
117+
118+
let request = self
119+
.client
120+
.upload_part()
121+
.part_number(part)
122+
.body(chunk.into())
123+
.upload_id(upload.upload_id().unwrap())
124+
.key(upload.key().unwrap())
125+
.bucket(self.bucket.clone());
126+
match self.runtime.block_on(request.send()) {
127+
Ok(p) => {
128+
parts = parts.parts(
129+
aws_sdk_s3::model::CompletedPart::builder()
130+
.e_tag(p.e_tag.clone().unwrap())
131+
.part_number(part)
132+
.build(),
133+
)
134+
}
135+
Err(e) => {
136+
failure::bail!("Failed to upload to {:?}: {:?}", path.as_ref(), e);
137+
}
138+
};
139+
140+
start += chunk_size;
141+
part += 1;
142+
}
143+
144+
let request = self
145+
.client
146+
.complete_multipart_upload()
147+
.multipart_upload(parts.build())
148+
.upload_id(upload.upload_id().unwrap())
149+
.key(upload.key().unwrap())
150+
.bucket(self.bucket.clone());
151+
match self.runtime.block_on(request.send()) {
152+
Ok(_) => (),
153+
Err(e) => {
154+
failure::bail!("Failed to upload to {:?}: {:?}", path.as_ref(), e);
155+
}
156+
};
157+
158+
Ok(())
159+
} else {
160+
let mut request = self
161+
.client
162+
.put_object()
163+
.body(aws_smithy_http::byte_stream::ByteStream::from(s))
164+
.acl(aws_sdk_s3::model::ObjectCannedAcl::PublicRead)
165+
.key(format!(
166+
"{}/{}",
167+
self.prefix,
168+
path.as_ref().to_str().unwrap()
169+
))
170+
.content_type(mime.to_string())
171+
.bucket(self.bucket.clone());
172+
match encoding_type {
173+
EncodingType::Plain => {}
174+
EncodingType::Gzip => {
175+
request = request.content_encoding("gzip");
176+
}
177+
}
178+
match self.runtime.block_on(request.send()) {
179+
Ok(_) => Ok(()),
180+
Err(e) => {
181+
failure::bail!("Failed to upload to {:?}: {:?}", path.as_ref(), e);
182+
}
106183
}
107184
}
108185
}

0 commit comments

Comments
 (0)