
Commit 848b99f

Merge pull request #304 from marklogic/feature/17142-stream-gzip-files
MLE-17142 Can now stream gzip files on import
2 parents 59dc9e8 + e243894

File tree

8 files changed: +124 -32 lines changed

docs/reading-data/reading-files/generic-file-support.md

Lines changed: 3 additions & 1 deletion

@@ -46,7 +46,7 @@ The connector also supports the following
 
 ## Reading and writing large binary files
 
-The 2.3.2 connector introduces a fix for reading and writing large binary files to MarkLogic, allowing for the contents
+The 2.4.0 connector introduces support for reading and writing large binary files to MarkLogic, allowing for the contents
 of each file to be streamed from its source to MarkLogic. This avoids an issue where the Spark environment runs out
 of memory while trying to fit the contents of a file into an in-memory row.
 
@@ -62,6 +62,8 @@ Files read from the MarkLogic Spark connector with the above option can then be
 with the same option above being passed to the writer. The connector will then stream the contents of each file to
 MarkLogic, submitting one request to MarkLogic per document.
 
+The `spark.marklogic.streamFiles` option can also be used when reading GZIP, ZIP, and archive files.
+
 ## Reading any file
 
 If you wish to read files without any special handling provided by the connector, you can use the
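The `spark.marklogic.streamFiles` option documented above is exercised end to end by the new `streamThreeGZIPFiles` test later in this commit. As a quick orientation, here is a minimal sketch of the user-facing flow, assuming the connector's `marklogic` format name and placeholder connection details; the compression and client-URI option keys are my reading of the `Options` constants used in the test, not part of this diff:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class StreamGzipSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Read gzip files as streaming references rather than in-memory content.
        Dataset<Row> files = spark.read()
            .format("marklogic") // assumed format name
            .option("spark.marklogic.read.files.compression", "gzip") // assumed key behind Options.READ_FILES_COMPRESSION
            .option("spark.marklogic.streamFiles", true) // the option documented above
            .load("path/to/gzip-files");

        // Write each file to MarkLogic; the connector streams one request per document.
        files.write()
            .format("marklogic")
            .option("spark.marklogic.streamFiles", true)
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // placeholder connection
            .mode(SaveMode.Append)
            .save();
    }
}
```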

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ public boolean isZip() {
         return "zip".equalsIgnoreCase(getStringOption(Options.READ_FILES_COMPRESSION));
     }
 
-    boolean isGzip() {
+    public boolean isGzip() {
         return "gzip".equalsIgnoreCase(getStringOption(Options.READ_FILES_COMPRESSION));
     }
 

src/main/java/com/marklogic/spark/reader/file/FilePartition.java

Lines changed: 5 additions & 0 deletions

@@ -5,6 +5,7 @@
 
 import org.apache.spark.sql.connector.read.InputPartition;
 
+import java.util.Arrays;
 import java.util.List;
 
 public class FilePartition implements InputPartition {
@@ -13,6 +14,10 @@ public class FilePartition implements InputPartition {
 
     private final List<String> paths;
 
+    public FilePartition(String path) {
+        this.paths = Arrays.asList(path);
+    }
+
     public FilePartition(List<String> paths) {
         this.paths = paths;
     }

src/main/java/com/marklogic/spark/reader/file/GzipFileReader.java

Lines changed: 44 additions & 19 deletions

@@ -3,6 +3,7 @@
  */
 package com.marklogic.spark.reader.file;
 
+import com.marklogic.client.io.InputStreamHandle;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Util;
 import org.apache.commons.io.IOUtils;
@@ -19,17 +20,26 @@
  * Expects to read a single gzipped file and return a single row. May expand the scope of this later to expect multiple
  * files and to thus return multiple rows.
  */
-class GzipFileReader implements PartitionReader<InternalRow> {
+public class GzipFileReader implements PartitionReader<InternalRow> {
 
     private final FilePartition filePartition;
     private final FileContext fileContext;
+    private final StreamingMode streamingMode;
 
     private int nextFilePathIndex;
     private InternalRow rowToReturn;
 
-    GzipFileReader(FilePartition filePartition, FileContext fileContext) {
+    // Only set if streaming during the writer phase.
+    private InputStreamHandle streamingContentHandle;
+
+    public GzipFileReader(FilePartition filePartition, FileContext fileContext) {
+        this(filePartition, fileContext, fileContext.isStreamingFiles() ? StreamingMode.STREAM_DURING_READER_PHASE : null);
+    }
+
+    public GzipFileReader(FilePartition filePartition, FileContext fileContext, StreamingMode streamingMode) {
         this.filePartition = filePartition;
         this.fileContext = fileContext;
+        this.streamingMode = streamingMode;
     }
 
     @Override
@@ -40,32 +50,47 @@ public boolean next() {
 
         String currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
         nextFilePathIndex++;
-        InputStream gzipInputStream = null;
-        try {
-            gzipInputStream = fileContext.openFile(currentFilePath);
-            byte[] content = extractGZIPContents(currentFilePath, gzipInputStream);
-            String uri = makeURI(currentFilePath);
-            this.rowToReturn = new GenericInternalRow(new Object[]{
-                UTF8String.fromString(uri), ByteArray.concat(content),
-                null, null, null, null, null, null
-            });
-            return true;
-        } catch (RuntimeException ex) {
-            if (fileContext.isReadAbortOnFailure()) {
-                throw ex;
+        String uri = makeURI(currentFilePath);
+
+        Object contentValue;
+        if (StreamingMode.STREAM_DURING_READER_PHASE.equals(streamingMode)) {
+            contentValue = FileUtil.serializeFileContext(fileContext, currentFilePath);
+            uri = currentFilePath;
+        } else if (StreamingMode.STREAM_DURING_WRITER_PHASE.equals(streamingMode)) {
+            streamingContentHandle = new InputStreamHandle(fileContext.openFile(currentFilePath));
+            contentValue = null;
+        } else {
+            InputStream gzipInputStream = null;
+            try {
+                gzipInputStream = fileContext.openFile(currentFilePath);
+                byte[] content = extractGZIPContents(currentFilePath, gzipInputStream);
+                contentValue = ByteArray.concat(content);
+            } catch (RuntimeException ex) {
+                if (fileContext.isReadAbortOnFailure()) {
+                    throw ex;
+                }
+                Util.MAIN_LOGGER.warn("Unable to read file at {}; cause: {}", currentFilePath, ex.getMessage());
+                return next();
+            } finally {
+                IOUtils.closeQuietly(gzipInputStream);
             }
-            Util.MAIN_LOGGER.warn("Unable to read file at {}; cause: {}", currentFilePath, ex.getMessage());
-            return next();
-        } finally {
-            IOUtils.closeQuietly(gzipInputStream);
         }
+
+        this.rowToReturn = new GenericInternalRow(new Object[]{
+            UTF8String.fromString(uri), contentValue, null, null, null, null, null, null
+        });
+        return true;
     }
 
     @Override
     public InternalRow get() {
         return rowToReturn;
     }
 
+    public InputStreamHandle getStreamingContentHandle() {
+        return streamingContentHandle;
+    }
+
     @Override
     public void close() {
         // Nothing to close.
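Both constructors above reference a `StreamingMode` type that is not itself touched by this commit. From the two constants used in the diff, it is presumably a plain enum along these lines (a sketch inferred from usage, not the connector's actual source):

```java
// Inferred sketch of StreamingMode; only the two constants below appear in
// this commit, so the real definition may carry more than this.
public enum StreamingMode {
    STREAM_DURING_READER_PHASE,  // reader phase: put a serialized FileContext in the content column
    STREAM_DURING_WRITER_PHASE   // writer phase: open the file into an InputStreamHandle on demand
}
```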

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 20 additions & 6 deletions

@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Stream;
@@ -54,7 +53,7 @@ public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
             return Stream.of(new DocBuilder.DocumentInputs(uri, null, null, metadata)).iterator();
         }
 
-        return this.isStreamingFromFiles ? readContentFromFile(uri, row) : readContentFromRow(uri, row);
+        return this.isStreamingFromFiles ? streamContentFromFile(uri, row) : readContentFromRow(uri, row);
     }
 
     @Override
@@ -94,7 +93,7 @@ private JsonNode deserializeContentToJson(String initialUri, BytesHandle content
      * In a scenario where the user wants to stream a file into MarkLogic, the content column will contain a serialized
      * instance of {@code FileContext}, which is used to stream the file into a {@code InputStreamHandle}.
      */
-    private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath, InternalRow row) {
+    private Iterator<DocBuilder.DocumentInputs> streamContentFromFile(String filePath, InternalRow row) {
         byte[] bytes = row.getBinary(1);
         FileContext fileContext;
         try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
@@ -107,9 +106,13 @@ private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath,
             return buildIteratorForArchiveFile(filePath, fileContext);
         } else if (fileContext.isZip()) {
             return buildIteratorForZipFile(filePath, fileContext);
+        } else if (fileContext.isGzip()) {
+            return buildIteratorForGzipFile(filePath, fileContext);
         }
+        return buildIteratorForGenericFile(row, filePath, fileContext);
+    }
 
-    // If it's not an archive or normal zip file, we just have generic files that the user wants to stream.
+    private Iterator<DocBuilder.DocumentInputs> buildIteratorForGenericFile(InternalRow row, String filePath, FileContext fileContext) {
         final String decodedPath = fileContext.decodeFilePath(filePath);
         InputStreamHandle streamHandle = new InputStreamHandle(fileContext.openFile(decodedPath));
         if (this.documentFormat != null) {
@@ -120,16 +123,27 @@ private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath,
     }
 
     private Iterator<DocBuilder.DocumentInputs> buildIteratorForArchiveFile(String filePath, FileContext fileContext) {
-        FilePartition filePartition = new FilePartition(Arrays.asList(filePath));
+        FilePartition filePartition = new FilePartition(filePath);
         ArchiveFileReader archiveFileReader = new ArchiveFileReader(
             filePartition, fileContext, StreamingMode.STREAM_DURING_WRITER_PHASE
         );
         return new ArchiveFileIterator(archiveFileReader, this.documentFormat);
     }
 
     private Iterator<DocBuilder.DocumentInputs> buildIteratorForZipFile(String filePath, FileContext fileContext) {
-        FilePartition filePartition = new FilePartition(Arrays.asList(filePath));
+        FilePartition filePartition = new FilePartition(filePath);
         ZipFileReader reader = new ZipFileReader(filePartition, fileContext, StreamingMode.STREAM_DURING_WRITER_PHASE);
         return new ZipFileIterator(reader, this.documentFormat);
     }
+
+    private Iterator<DocBuilder.DocumentInputs> buildIteratorForGzipFile(String filePath, FileContext fileContext) {
+        GzipFileReader reader = new GzipFileReader(new FilePartition(filePath), fileContext, StreamingMode.STREAM_DURING_WRITER_PHASE);
+        reader.next();
+        String uri = reader.get().getString(0);
+        InputStreamHandle contentHandle = reader.getStreamingContentHandle();
+        if (this.documentFormat != null) {
+            contentHandle.withFormat(this.documentFormat);
+        }
+        return Stream.of(new DocBuilder.DocumentInputs(uri, contentHandle, null, null)).iterator();
+    }
 }
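Together with `GzipFileReader`, this shows the handshake for streamed files: the reader phase stores a Java-serialized `FileContext` in the row's content column, and `streamContentFromFile` deserializes it in the writer phase. A minimal round-trip sketch, assuming `FileUtil.serializeFileContext` (not shown in this commit) uses standard `ObjectOutputStream` serialization; the real method also receives the file path, which this sketch omits:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class FileContextRoundTripSketch {

    // Reader phase: what presumably lands in column 1 of the streamed row.
    static byte[] serialize(Serializable fileContext) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(fileContext);
        }
        return baos.toByteArray();
    }

    // Writer phase: mirrors the ObjectInputStream usage in streamContentFromFile above.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```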

src/main/java/com/marklogic/spark/writer/file/DocumentFileWriterFactory.java

Lines changed: 5 additions & 4 deletions

@@ -32,13 +32,14 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
         if (this.schema.equals(TripleRowSchema.SCHEMA)) {
             return new RdfFileWriter(properties, hadoopConfiguration, partitionId);
         }
+
         String compression = this.properties.get(Options.WRITE_FILES_COMPRESSION);
         if (compression != null && compression.length() > 0) {
-            if ("zip".equalsIgnoreCase(compression)) {
-                return new ZipFileWriter(properties, hadoopConfiguration, partitionId);
-            }
-            return new GzipFileWriter(properties, hadoopConfiguration);
+            return "zip".equalsIgnoreCase(compression) ?
+                new ZipFileWriter(properties, hadoopConfiguration, partitionId) :
+                new GzipFileWriter(properties, hadoopConfiguration);
         }
+
         return new DocumentFileWriter(properties, hadoopConfiguration);
     }
 }

src/test/java/com/marklogic/spark/reader/file/ReadGzipFilesTest.java

Lines changed: 46 additions & 1 deletion

@@ -3,14 +3,19 @@
  */
 package com.marklogic.spark.reader.file;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.marklogic.junit5.XmlNode;
 import com.marklogic.spark.AbstractIntegrationTest;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -30,7 +35,7 @@ void readThreeGZIPFiles() {
 
         verifyRow(rows.get(0), "/src/test/resources/gzip-files/hello.xml", "<hello>world</hello>\n");
         verifyRow(rows.get(1), "/src/test/resources/gzip-files/level1/hello.txt", "hello world\n");
-        verifyRow(rows.get(2), "/src/test/resources/gzip-files/level1/level2/hello.json", "{\"hello\":\"world\"}\n");
+        verifyRow(rows.get(2), "/src/test/resources/gzip-files/level1/level2/hello world.json", "{\"hello\":\"world\"}\n");
     }
 
     @Test
@@ -73,6 +78,46 @@ void dontAbortOnFailure() {
             "error for the non-gzipped mixed-files.zip file being logged as a warning but not causing a failure.");
     }
 
+    @Test
+    void streamThreeGZIPFiles() throws Exception {
+        Dataset<Row> dataset = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.READ_FILES_COMPRESSION, "gzip")
+            .option("recursiveFileLookup", "true")
+            .option(Options.STREAM_FILES, true)
+            .load("src/test/resources/gzip-files");
+
+        List<Row> rows = dataset.collectAsList();
+        assertEquals(3, rows.size());
+        for (Row row : rows) {
+            assertFalse(row.isNullAt(0), "The URI column should be populated.");
+            byte[] content = (byte[]) row.get(1);
+            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content))) {
+                FileContext fileContext = (FileContext) ois.readObject();
+                assertNotNull(fileContext);
+            }
+        }
+
+        // Write the streaming files to MarkLogic.
+        dataset.write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
+            .option(Options.WRITE_COLLECTIONS, "streamed-files")
+            .option(Options.WRITE_URI_REPLACE, ".*gzip-files,'/gzip-files'")
+            .mode(SaveMode.Append)
+            .save();
+
+        assertCollectionSize("streamed-files", 3);
+        XmlNode doc = readXmlDocument("/gzip-files/hello.xml");
+        doc.assertElementValue("/hello", "world");
+
+        // Because each streamed file has to be sent via a PUT request, and the PUT endpoint does not allow spaces -
+        // see MLE-17088 - the URI will be encoded.
+        JsonNode node = readJsonDocument("/gzip-files/level1/level2/hello%20world.json");
+        assertEquals("world", node.get("hello").asText());
+    }
+
     private void verifyRow(Row row, String expectedUriSuffix, String expectedContent) {
         String uri = row.getString(0);
         assertTrue(uri.endsWith(expectedUriSuffix), "Unexpected URI: " + uri);
