Commit f0d58bb

Merge pull request #286 from marklogic/feature/streaming-file-read
MLE-17041 Can now stream when reading generic files
2 parents (945fac2 + 1cb97b0), commit f0d58bb

File tree: 5 files changed, +159 −18 lines
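Before the per-file diffs, a minimal usage sketch of the new option. Only the spark.marklogic.files.stream key is confirmed by this commit; the "marklogic" format name, the spark.marklogic.client.uri connection option, and the spark.marklogic.write.collections key (the assumed value of Options.WRITE_COLLECTIONS) are assumptions based on the connector's usual conventions, not shown in this diff.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class StreamGenericFilesExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Reader phase: with streaming enabled, each row's content column holds a
        // serialized FileContext rather than the bytes of the file.
        Dataset<Row> files = spark.read()
            .format("marklogic") // assumed format name for the MarkLogic Spark connector
            .option("spark.marklogic.files.stream", "true")
            .load("path/to/large-files");

        // Writer phase: the same option tells the connector to open and stream each
        // file while writing it, so its contents are never fully held in memory.
        files.write()
            .format("marklogic")
            .option("spark.marklogic.client.uri", "user:password@localhost:8003") // assumed connection option
            .option("spark.marklogic.files.stream", "true")
            .option("spark.marklogic.write.collections", "streamed-files") // assumed key for Options.WRITE_COLLECTIONS
            .mode(SaveMode.Append)
            .save();

        spark.stop();
    }
}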

src/main/java/com/marklogic/spark/MarkLogicFileTable.java
3 additions & 0 deletions

@@ -42,6 +42,9 @@ class MarkLogicFileTable extends FileTable {
 
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+        if ("true".equalsIgnoreCase(options.get(Options.STREAM_FILES)) && Util.MAIN_LOGGER.isInfoEnabled()) {
+            Util.MAIN_LOGGER.info("Will defer reading of file contents so they can be streamed during the writer phase.");
+        }
         return new FileScanBuilder(options.asCaseSensitiveMap(), super.fileIndex());
     }

src/main/java/com/marklogic/spark/Options.java
10 additions & 0 deletions

@@ -150,6 +150,16 @@ public abstract class Options {
     public static final String WRITE_RDF_FILES_FORMAT = "spark.marklogic.write.files.rdf.format";
     public static final String WRITE_RDF_FILES_GRAPH = "spark.marklogic.write.files.rdf.graph";
 
+    /**
+     * When used in the reader phase while reading generic files, the connector will put a serialized {@code FileContext}
+     * into the content column instead of the contents of the file. When used during the writer phase when writing rows
+     * conforming to {@code DocumentRowSchema}, the connector will stream the file using the {@code FileContext} to
+     * avoid reading its contents into memory.
+     *
+     * @since 2.4.0
+     */
+    public static final String STREAM_FILES = "spark.marklogic.files.stream";
+
     private Options() {
     }
 }
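The Javadoc above describes a round trip built on plain Java serialization: the reader phase serializes the FileContext into the content column, and the writer phase deserializes it back before streaming. A small self-contained sketch of that mechanism, using a generic Serializable value as a stand-in for FileContext (which is not part of this diff):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationRoundTripSketch {

    // Mirrors GenericFileReader.serializeFileContext: object -> byte[] for the content column.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(value);
        }
        return baos.toByteArray();
    }

    // Mirrors DocumentRowConverter.readContentFromFile: byte[] from the content column -> object.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] columnValue = toBytes("stand-in for a FileContext");
        System.out.println(fromBytes(columnValue)); // prints the round-tripped value
    }
}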

src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java
26 additions & 8 deletions

@@ -4,15 +4,18 @@
 package com.marklogic.spark.reader.file;
 
 import com.marklogic.spark.ConnectorException;
+import com.marklogic.spark.Options;
 import com.marklogic.spark.Util;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.unsafe.types.ByteArray;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectOutputStream;
 
 /**
  * "Generic" = read each file as-is with no special processing.
@@ -21,13 +24,15 @@ class GenericFileReader implements PartitionReader<InternalRow> {
 
     private final FilePartition filePartition;
     private final FileContext fileContext;
+    private final boolean isStreaming;
 
     private InternalRow nextRowToReturn;
     private int filePathIndex;
 
     GenericFileReader(FilePartition filePartition, FileContext fileContext) {
         this.filePartition = filePartition;
         this.fileContext = fileContext;
+        this.isStreaming = "true".equalsIgnoreCase(fileContext.getStringOption(Options.STREAM_FILES));
     }
 
     @Override
@@ -39,14 +44,13 @@ public boolean next() {
         final String path = filePartition.getPaths().get(filePathIndex);
         filePathIndex++;
         try {
-            try (InputStream inputStream = fileContext.openFile(path)) {
-                byte[] content = fileContext.readBytes(inputStream);
-                nextRowToReturn = new GenericInternalRow(new Object[]{
-                    UTF8String.fromString(path),
-                    ByteArray.concat(content),
-                    null, null, null, null, null, null
-                });
-            }
+            byte[] content = this.isStreaming ? serializeFileContext() : readFileIntoByteArray(path);
+
+            nextRowToReturn = new GenericInternalRow(new Object[]{
+                UTF8String.fromString(path),
+                ByteArray.concat(content),
+                null, null, null, null, null, null
+            });
         } catch (Exception ex) {
             String message = String.format("Unable to read file at %s; cause: %s", path, ex.getMessage());
             if (fileContext.isReadAbortOnFailure()) {
@@ -67,4 +71,18 @@ public InternalRow get() {
     public void close() throws IOException {
         // Nothing to close.
     }
+
+    private byte[] readFileIntoByteArray(String path) throws IOException {
+        try (InputStream inputStream = fileContext.openFile(path)) {
+            return fileContext.readBytes(inputStream);
+        }
+    }
+
+    private byte[] serializeFileContext() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(fileContext);
+        }
+        return baos.toByteArray();
+    }
 }
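serializeFileContext relies on FileContext being serializable with standard Java serialization (the test at the end of this commit notes that it carries Hadoop's SerializableConfiguration). The class itself is not part of this diff; below is a rough, hypothetical skeleton listing only the members that the connector code shown here calls on it, with placeholder bodies.

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

// Hypothetical skeleton for illustration only; the real FileContext is not included in this commit.
public class FileContextSketch implements Serializable {

    // Used by GenericFileReader to check Options.STREAM_FILES.
    public String getStringOption(String optionName) {
        throw new UnsupportedOperationException("placeholder");
    }

    // Used in both phases to open the file at the given path.
    public InputStream openFile(String path) {
        throw new UnsupportedOperationException("placeholder");
    }

    // Used only when not streaming, to read the entire file into memory.
    public byte[] readBytes(InputStream inputStream) throws IOException {
        throw new UnsupportedOperationException("placeholder");
    }

    // Governs whether a failed read aborts the job or is merely logged.
    public boolean isReadAbortOnFailure() {
        throw new UnsupportedOperationException("placeholder");
    }
}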

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java
64 additions & 10 deletions

@@ -9,11 +9,17 @@
 import com.marklogic.client.io.BytesHandle;
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.Format;
+import com.marklogic.client.io.InputStreamHandle;
+import com.marklogic.client.io.marker.AbstractWriteHandle;
+import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.reader.document.DocumentRowSchema;
+import com.marklogic.spark.reader.file.FileContext;
 import org.apache.spark.sql.catalyst.InternalRow;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -26,11 +32,13 @@ class DocumentRowConverter implements RowConverter {
     private final ObjectMapper objectMapper;
     private final String uriTemplate;
     private final Format documentFormat;
+    private final boolean isStreamingFromFiles;
 
     DocumentRowConverter(WriteContext writeContext) {
         this.uriTemplate = writeContext.getStringOption(Options.WRITE_URI_TEMPLATE);
         this.documentFormat = writeContext.getDocumentFormat();
         this.objectMapper = new ObjectMapper();
+        this.isStreamingFromFiles = writeContext.hasOption(Options.STREAM_FILES);
     }
 
     @Override
@@ -43,25 +51,34 @@ public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
             return Optional.of(new DocBuilder.DocumentInputs(uri, null, null, metadata));
         }
 
-        final BytesHandle content = new BytesHandle(row.getBinary(1));
-        if (this.documentFormat != null) {
-            content.withFormat(this.documentFormat);
-        }
+        Content content = this.isStreamingFromFiles ?
+            readContentFromFile(uri, row) :
+            readContentFromRow(uri, row);
 
-        JsonNode uriTemplateValues = null;
-        if (this.uriTemplate != null && this.uriTemplate.trim().length() > 0) {
-            String format = row.isNullAt(2) ? null : row.getString(2);
-            uriTemplateValues = deserializeContentToJson(uri, content, format);
-        }
         DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-        return Optional.of(new DocBuilder.DocumentInputs(uri, content, uriTemplateValues, metadata));
+        return Optional.of(new DocBuilder.DocumentInputs(
+            uri, content.contentHandle, content.uriTemplateValues, metadata)
+        );
     }
 
     @Override
     public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
         return new ArrayList<>();
    }
 
+    private Content readContentFromRow(String uri, InternalRow row) {
+        BytesHandle bytesHandle = new BytesHandle(row.getBinary(1));
+        if (this.documentFormat != null) {
+            bytesHandle.withFormat(this.documentFormat);
+        }
+        JsonNode uriTemplateValues = null;
+        if (this.uriTemplate != null && this.uriTemplate.trim().length() > 0) {
+            String format = row.isNullAt(2) ? null : row.getString(2);
+            uriTemplateValues = deserializeContentToJson(uri, bytesHandle, format);
+        }
+        return new Content(bytesHandle, uriTemplateValues);
+    }
+
     private JsonNode deserializeContentToJson(String initialUri, BytesHandle contentHandle, String format) {
         try {
             return objectMapper.readTree(contentHandle.get());
@@ -75,4 +92,41 @@ private JsonNode deserializeContentToJson(String initialUri, BytesHandle contentHandle, String format) {
             return values;
         }
     }
+
+    /**
+     * In a scenario where the user wants to stream a file into MarkLogic, the content column will contain a serialized
+     * instance of {@code FileContext}, which is used to stream the file into a {@code InputStreamHandle}.
+     */
+    private Content readContentFromFile(String uri, InternalRow row) {
+        byte[] bytes = row.getBinary(1);
+        try {
+            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+            FileContext fileContext = (FileContext) ois.readObject();
+            InputStreamHandle streamHandle = new InputStreamHandle(fileContext.openFile(uri));
+            if (this.documentFormat != null) {
+                streamHandle.withFormat(this.documentFormat);
+            }
+            return new Content(streamHandle, null);
+        } catch (Exception e) {
+            throw new ConnectorException(String.format("Unable to read from file %s; cause: %s", uri, e.getMessage()));
+        }
+    }
+
+    private static class Content {
+        private final AbstractWriteHandle contentHandle;
+        private final JsonNode uriTemplateValues;
+
+        public Content(AbstractWriteHandle contentHandle, JsonNode uriTemplateValues) {
+            this.contentHandle = contentHandle;
+            this.uriTemplateValues = uriTemplateValues;
+        }
+
+        AbstractWriteHandle getContentHandle() {
+            return contentHandle;
+        }
+
+        JsonNode getUriTemplateValues() {
+            return uriTemplateValues;
+        }
+    }
 }
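readContentFromFile swaps the BytesHandle for an InputStreamHandle so the MarkLogic Java Client can stream the document body rather than hold it in memory. A simplified, standalone illustration of that capability using the Java Client directly; the host, port, credentials, and file path are placeholders, and the connector's actual write path goes through its own batching code rather than a GenericDocumentManager.

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.document.GenericDocumentManager;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.InputStreamHandle;

import java.io.FileInputStream;
import java.io.InputStream;

public class StreamSingleFileExample {

    public static void main(String[] args) throws Exception {
        // Placeholder connection details for illustration only.
        DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8003,
            new DatabaseClientFactory.DigestAuthContext("example-user", "example-password"));
        GenericDocumentManager documentManager = client.newDocumentManager();

        try (InputStream input = new FileInputStream("path/to/large-file.bin")) {
            // InputStreamHandle lets the client stream the content instead of requiring
            // the whole file as a byte array, which is the property DocumentRowConverter
            // relies on when STREAM_FILES is enabled.
            InputStreamHandle handle = new InputStreamHandle(input).withFormat(Format.BINARY);
            documentManager.write("/streamed/large-file.bin", handle);
        } finally {
            client.release();
        }
    }
}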
ReadGenericFilesStreamingTest.java (new file)
56 additions & 0 deletions

@@ -0,0 +1,56 @@
+/*
+ * Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark.reader.file;
+
+import com.marklogic.spark.AbstractIntegrationTest;
+import com.marklogic.spark.Options;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class ReadGenericFilesStreamingTest extends AbstractIntegrationTest {
+
+    /**
+     * In this context, "streaming" != Spark Structured Streaming, but rather avoiding reading the contents of a file
+     * into memory by postponing reading of the file until the writer phase, where it can then be streamed from disk into
+     * MarkLogic.
+     */
+    @Test
+    void stream() throws Exception {
+        Dataset<Row> dataset = newSparkSession().read().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .load("src/test/resources/mixed-files");
+
+        assertEquals(4, dataset.count());
+        verifyEachRowHasFileContextAsItsContent(dataset);
+
+        defaultWrite(dataset.write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .option(Options.WRITE_COLLECTIONS, "streamed-files")
+            .option(Options.WRITE_URI_REPLACE, ".*/mixed-files,''"));
+
+        assertCollectionSize("This verifies that enabling streaming does not break any functionality. We don't " +
+            "have a test for a file large enough to warrant streaming as that would drastically slow down the suite " +
+            "of tests.", "streamed-files", 4);
+    }
+
+    private void verifyEachRowHasFileContextAsItsContent(Dataset<Row> dataset) throws Exception {
+        for (Row row : dataset.collectAsList()) {
+            byte[] content = (byte[]) row.get(1);
+            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content))) {
+                FileContext fileContext = (FileContext) ois.readObject();
+                assertNotNull(fileContext, "To enable streaming of files, the content column should not " +
+                    "contain the contents of the file, which forces reading the entire file into memory. " +
+                    "Instead, the associated FileContext - containing the Hadoop SerializableConfiguration class - " +
+                    "should be serialized so that it can be used to read the file during the writer phase.");
+            }
+        }
+    }
+}
