Skip to content

Commit 4cf9c5f

Browse files
authored
fix microsoft OOOM: don't stream into memory (#1392)
I'm fixing a bug here where the implementation of the microsoft adapter was reading an arbitrary stream into memory (using DataChunk class), and instead "just stream directly to the endpoint". It's not _quite_ that simple though: we actually stream twice, for the very reason that the old implementation was streaming into memory: we need to know the filesize. while we're here I'm doing some "leave the campground better" tasks, so here's a more nitty-gritty breakdown of this PR: - microsoft adapter: marking duplicated code Microsoft{Photo,Video}* as needing to rely on newly refactored MicrosoftMedia (so hese kinds of bug fixes are easier to maintain, for example). - DTP: make testability (via DI) easier with for java.net.URL streamers (this has already become a pattern, so I just formalized it and dropped a TODO in the places that are doing this locally; this way, in the future it'll be easier/more obvious what "the pattern" is and how to stop maintaining disparate copies of it) - microsoft adapter: remainder of DataChunk code is now just a POJO, so switching to autovalue and letting the test live in the primary user: the new `StreamChunker` (as a mini "integrated" test). - microsoft adapter: all size-related operations are a `long` now
1 parent 9e92ef7 commit 4cf9c5f

File tree

20 files changed

+518
-254
lines changed

20 files changed

+518
-254
lines changed

extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/media/FlickrMediaImporter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ private static String cleanString(String string) {
318318
return Strings.isNullOrEmpty(string) ? "" : string;
319319
}
320320

321+
// TODO migrate this testability-surface to newly shared JobFileStreamer and RemoteFileStreamer
322+
// of org.datatransferproject.spi.api.transport package.
321323
@VisibleForTesting
322324
class ImageStreamProvider {
323325

extensions/data-transfer/portability-data-transfer-microsoft/build.gradle

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,24 @@ plugins {
2121
dependencies {
2222
compile project(':portability-spi-cloud')
2323
compile project(':portability-spi-transfer')
24+
compile project(':portability-spi-api')
2425

2526
compile "com.squareup.okhttp3:okhttp:${okHttpVersion}"
2627
compile "com.squareup.okhttp3:logging-interceptor:${okHttpVersion}"
2728
compile("com.google.api-client:google-api-client:${googleApiClient}")
2829

30+
compileOnly "com.google.auto.value:auto-value-annotations:${autoValueVersion}"
31+
annotationProcessor "com.google.auto.value:auto-value:${autoValueVersion}"
32+
2933
// REVIEW: We could standardize the version in gradle.propertoes but this would mean all dependent extensions must be revved at the same time
3034
compile 'com.googlecode.ez-vcard:ez-vcard:0.10.3'
3135

36+
testImplementation "org.mockito:mockito-inline:${mockitoInlineVersion}"
37+
testCompile "org.mockito:mockito-core:${mockitoCoreVersion}"
3238
testCompile project(':extensions:auth:portability-auth-harness-microsoft')
3339
testCompile group: 'com.squareup.okhttp', name: 'mockwebserver', version: '2.7.5'
3440
testCompile group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '3.2.0'
3541
testCompile("com.google.http-client:google-http-client-gson:${googleHttpClientVersion}")
36-
testImplementation "org.mockito:mockito-inline:${mockitoVersion}"
37-
3842
}
3943

4044
configurePublication(project)
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,38 @@
11
package org.datatransferproject.transfer.microsoft;
22

3-
import java.io.IOException;
4-
import java.io.InputStream;
5-
import java.util.ArrayList;
6-
import java.util.List;
3+
import com.google.auto.value.AutoValue;
74

8-
/**
9-
This utility class allows us to break up an InputStream into multiple chunks
10-
for part-by-part upload to a service, for example to be consumed in an upload session.
11-
*/
12-
public class DataChunk {
13-
private static final int CHUNK_SIZE = 32000 * 1024; // 32000KiB
5+
/** Describe small buffers of bytes captured from a large java.io Stream. */
6+
@AutoValue
7+
public abstract class DataChunk {
8+
/** Bytes being held in this buffer. */
9+
public abstract byte[] chunk();
1410

15-
private final byte[] data;
16-
private final int size;
17-
private final int rangeStart;
18-
public DataChunk(byte[] data, int size, int rangeStart) {
19-
this.data = data;
20-
this.size = size;
21-
this.rangeStart = rangeStart;
11+
/** Byte count of {@link chunk}. */
12+
public int size() {
13+
return chunk().length;
2214
}
2315

24-
public int getSize() {
25-
return size;
26-
}
16+
/** Index-offset within the original java.io Stream at which {@link chunk} had started. */
17+
public abstract long streamByteOffset();
2718

28-
public byte[] getData() {
29-
return data;
19+
/**
20+
* Index-offset within the original java.io Stream at which the final byte of {@link chunk} lived.
21+
*/
22+
public long finalByteOffset() {
23+
return streamByteOffset() + size() - 1;
3024
}
3125

32-
public int getStart() {
33-
return rangeStart;
26+
public static Builder builder() {
27+
return new org.datatransferproject.transfer.microsoft.AutoValue_DataChunk.Builder();
3428
}
3529

36-
public int getEnd() {
37-
return rangeStart + size - 1;
38-
}
30+
@AutoValue.Builder
31+
public abstract static class Builder {
32+
public abstract Builder setChunk(byte[] value);
3933

40-
public static List<DataChunk> splitData(InputStream inputStream) throws IOException {
41-
ArrayList<DataChunk> chunksToSend = new ArrayList();
42-
byte[] data = new byte[CHUNK_SIZE];
43-
int totalFileSize = 0;
44-
int quantityToSend;
45-
int roomLeft = CHUNK_SIZE;
46-
int offset = 0;
47-
int chunksRead = 0;
34+
public abstract Builder setStreamByteOffset(long value);
4835

49-
// start timing
50-
while ((quantityToSend = inputStream.read(data, offset, roomLeft)) != -1) {
51-
offset += quantityToSend;
52-
roomLeft -= quantityToSend;
53-
if (roomLeft == 0) {
54-
chunksToSend.add(new DataChunk(data, CHUNK_SIZE, chunksRead * CHUNK_SIZE));
55-
chunksRead++;
56-
roomLeft = CHUNK_SIZE;
57-
offset = 0;
58-
totalFileSize += CHUNK_SIZE;
59-
data = new byte[CHUNK_SIZE];
60-
}
61-
}
62-
if (offset != 0) {
63-
chunksToSend.add(new DataChunk(data, offset, chunksRead * CHUNK_SIZE));
64-
totalFileSize += offset;
65-
chunksRead++;
66-
}
67-
return chunksToSend;
36+
public abstract DataChunk build();
6837
}
69-
70-
}
38+
}

extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/MicrosoftTransferExtension.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import okhttp3.OkHttpClient;
1616
import org.datatransferproject.api.launcher.ExtensionContext;
1717
import org.datatransferproject.api.launcher.Monitor;
18+
import org.datatransferproject.spi.api.transport.JobFileStream;
1819
import org.datatransferproject.spi.cloud.storage.AppCredentialStore;
1920
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
2021
import org.datatransferproject.types.common.models.DataVertical;
@@ -120,6 +121,7 @@ public void initialize(ExtensionContext context) {
120121
// Create the MicrosoftCredentialFactory with the given {@link AppCredentials}.
121122
MicrosoftCredentialFactory credentialFactory =
122123
new MicrosoftCredentialFactory(httpTransport, jsonFactory, appCredentials);
124+
JobFileStream jobFileStream = new JobFileStream();
123125

124126
Monitor monitor = context.getMonitor();
125127

@@ -132,9 +134,9 @@ public void initialize(ExtensionContext context) {
132134
new MicrosoftCalendarImporter(BASE_GRAPH_URL, client, mapper, transformerService));
133135
importBuilder.put(
134136
PHOTOS, new MicrosoftPhotosImporter(BASE_GRAPH_URL, client, mapper, jobStore, monitor,
135-
credentialFactory));
137+
credentialFactory, jobFileStream));
136138
importBuilder.put(MEDIA, new MicrosoftMediaImporter(BASE_GRAPH_URL, client, mapper, jobStore, monitor,
137-
credentialFactory));
139+
credentialFactory, jobFileStream));
138140
importerMap = importBuilder.build();
139141

140142
ImmutableMap.Builder<DataVertical, Exporter> exporterBuilder = ImmutableMap.builder();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.datatransferproject.transfer.microsoft;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.Optional;
6+
7+
/**
8+
* Allows tracking reads across a stream.
9+
*
10+
* <p>Does not close the held input stream.
11+
*/
12+
public class StreamChunker {
13+
private final int chunkSizeBytes;
14+
private final InputStream inputStream;
15+
16+
private long streamByteOffset = 0;
17+
18+
public StreamChunker(int chunkSizeBytes, InputStream inputStream) {
19+
this.inputStream = inputStream;
20+
this.chunkSizeBytes = chunkSizeBytes;
21+
}
22+
23+
/**
24+
* Constructs a new DataChunk from just {@code chunkSizeBytes} bytes of the stream.
25+
*
26+
* <p>Returned chunk will be less than or equal to chunkSizeBytes, or absent if no bytes were
27+
* remaining in the stream.
28+
*/
29+
public Optional<DataChunk> nextChunk() throws IOException {
30+
byte[] chunkOfData = inputStream.readNBytes(chunkSizeBytes);
31+
Optional<DataChunk> resp =
32+
chunkOfData.length == 0
33+
? Optional.empty()
34+
: Optional.of(
35+
DataChunk.builder()
36+
.setChunk(chunkOfData)
37+
.setStreamByteOffset(streamByteOffset)
38+
.build());
39+
streamByteOffset += chunkOfData.length;
40+
return resp;
41+
}
42+
}

0 commit comments

Comments
 (0)