
Commit 673de1a

Merge pull request #288 from marklogic/feature/streaming-file-write
MLE-17041 Can now stream when writing generic files
2 parents: f0d58bb + 3370ca7

7 files changed: +101 −15 lines changed

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ public class ContextSupport implements Serializable {
     // client. Those two actions are rarely done, so the cost of synchronization will be negligible.
     private static final Object CLIENT_LOCK = new Object();
 
-    protected ContextSupport(Map<String, String> properties) {
+    public ContextSupport(Map<String, String> properties) {
         this.properties = properties;
         this.configuratorWasAdded = addOkHttpConfiguratorIfNecessary();
     }
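
The constructor is widened from protected to public so that code outside this package can build a MarkLogic connection from the same Spark options map. A minimal sketch of the call site this enables, mirroring the ContentWriter change later in this commit (the properties map is assumed to hold the connector's connection options):

    // Sketch: a class in another package (e.g. the file writer) can now do this
    // when streaming is enabled, to fetch documents at write time.
    GenericDocumentManager documentManager =
        new ContextSupport(properties).connectToMarkLogic().newDocumentManager();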

src/main/java/com/marklogic/spark/Options.java

Lines changed: 4 additions & 0 deletions
@@ -155,6 +155,10 @@ public abstract class Options {
      * into the content column instead of the contents of the file. When used during the writer phase when writing rows
      * conforming to {@code DocumentRowSchema}, the connector will stream the file using the {@code FileContext} to
      * avoid reading its contents into memory.
+     * <p>
+     * Similarly, when used in the reader phase when reading documents from MarkLogic, the value of the 'content' column
+     * in each row will be null. During the writer phase, the connector will retrieve the document corresponding to the
+     * value in the 'uri' column and stream it to file.
      *
      * @since 2.4.0
      */
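
The new javadoc describes a two-phase pipeline: the reader emits rows whose 'content' column is null, and the writer fetches each document by its 'uri' value and streams it to a file. A hedged usage sketch using the option constants documented above (the format identifier and connection string below are placeholders, not values confirmed by this commit):

    // Read documents lazily: content stays in MarkLogic; rows carry only URIs and metadata.
    Dataset<Row> rows = sparkSession.read()
        .format("marklogic") // assumed connector identifier
        .option(Options.CLIENT_URI, "user:password@localhost:8000") // placeholder
        .option(Options.STREAM_FILES, true)
        .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
        .load();

    // Write files: each document is retrieved by URI and streamed to disk.
    rows.write()
        .format("marklogic")
        .option(Options.CLIENT_URI, "user:password@localhost:8000") // placeholder
        .option(Options.STREAM_FILES, true)
        .mode(SaveMode.Append)
        .save("/tmp/output"); // placeholder output directory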

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 3 additions & 0 deletions
@@ -43,6 +43,9 @@ Set<DocumentManager.Metadata> getRequestedMetadata() {
     }
 
     boolean contentWasRequested() {
+        if ("true".equals(getStringOption(Options.STREAM_FILES))) {
+            return false;
+        }
         if (!hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {
             return true;
         }

src/main/java/com/marklogic/spark/reader/document/DocumentScanBuilder.java

Lines changed: 5 additions & 0 deletions
@@ -3,6 +3,8 @@
  */
 package com.marklogic.spark.reader.document;
 
+import com.marklogic.spark.Options;
+import com.marklogic.spark.Util;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
@@ -15,6 +17,9 @@ class DocumentScanBuilder implements ScanBuilder, SupportsPushDownLimit {
 
     DocumentScanBuilder(CaseInsensitiveStringMap options, StructType schema) {
         this.context = new DocumentContext(options, schema);
+        if ("true".equalsIgnoreCase(this.context.getStringOption(Options.STREAM_FILES)) && Util.MAIN_LOGGER.isInfoEnabled()) {
+            Util.MAIN_LOGGER.info("Will defer reading documents from MarkLogic so they can be streamed to files during the writer phase.");
+        }
     }
 
     @Override

src/main/java/com/marklogic/spark/writer/file/ContentWriter.java

Lines changed: 20 additions & 3 deletions
@@ -5,7 +5,10 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.marklogic.client.document.GenericDocumentManager;
+import com.marklogic.client.io.BytesHandle;
 import com.marklogic.spark.ConnectorException;
+import com.marklogic.spark.ContextSupport;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.reader.document.DocumentRowSchema;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,6 +35,9 @@ class ContentWriter {
     private final boolean prettyPrint;
     private final Charset encoding;
 
+    // Only set when streaming.
+    private final GenericDocumentManager documentManager;
+
     ContentWriter(Map<String, String> properties) {
         this.encoding = determineEncoding(properties);
         this.prettyPrint = "true".equalsIgnoreCase(properties.get(Options.WRITE_FILES_PRETTY_PRINT));
@@ -42,19 +48,22 @@ class ContentWriter {
             this.transformer = null;
            this.objectMapper = null;
         }
+
+        this.documentManager = "true".equalsIgnoreCase(properties.get(Options.STREAM_FILES)) ?
+            new ContextSupport(properties).connectToMarkLogic().newDocumentManager() : null;
     }
 
     void writeContent(InternalRow row, OutputStream outputStream) throws IOException {
         if (this.prettyPrint) {
             prettyPrintContent(row, outputStream);
         } else {
-            byte[] bytes = row.getBinary(1);
+            byte[] bytes = getContentBytes(row);
             if (this.encoding != null) {
                 // We know the string from MarkLogic is UTF-8, so we use getBytes to convert it to the user's
                 // specified encoding (as opposed to new String(bytes, encoding)).
                 outputStream.write(new String(bytes).getBytes(this.encoding));
             } else {
-                outputStream.write(row.getBinary(1));
+                outputStream.write(bytes);
             }
         }
     }
@@ -107,7 +116,7 @@ private Transformer newTransformer() {
     }
 
     private void prettyPrintContent(InternalRow row, OutputStream outputStream) throws IOException {
-        final byte[] content = row.getBinary(1);
+        final byte[] content = getContentBytes(row);
         final String format = row.isNullAt(2) ? null : row.getString(2);
         if ("JSON".equalsIgnoreCase(format)) {
             prettyPrintJson(content, outputStream);
@@ -141,4 +150,12 @@ private void prettyPrintXml(byte[] content, OutputStream outputStream) {
             throw new ConnectorException(String.format("Unable to pretty print XML; cause: %s", e.getMessage()), e);
         }
     }
+
+    private byte[] getContentBytes(InternalRow row) {
+        if (this.documentManager != null) {
+            String uri = row.getString(0);
+            return documentManager.read(uri, new BytesHandle()).get();
+        }
+        return row.getBinary(1);
+    }
 }
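
When documentManager is non-null (i.e. streaming is enabled), getContentBytes ignores the row's binary column and performs a per-row read against MarkLogic. A standalone sketch of that deferred read using the MarkLogic Java Client (client construction elided; the URI is a placeholder):

    // Per-row deferred read, as getContentBytes does when streaming.
    GenericDocumentManager mgr = databaseClient.newDocumentManager();
    byte[] content = mgr.read("/author/author1.json", new BytesHandle()).get();
    // The bytes are then written to the file's OutputStream.

Note that BytesHandle still buffers each document as a byte array inside the writer; the saving is that document bodies never travel through Spark rows between the reader and writer phases.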

src/test/java/com/marklogic/spark/writer/file/PrettyPrintFilesTest.java

Lines changed: 25 additions & 0 deletions
@@ -19,6 +19,7 @@
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class PrettyPrintFilesTest extends AbstractIntegrationTest {
 
@@ -108,4 +109,28 @@ void notPrettyPrinted(@TempDir Path tempDir) throws IOException {
         String doc2 = FileUtils.readFileToString(new File(dir, "doc2.json"), "UTF-8");
         assertEquals("{\"hello\":\"world\"}", doc2);
     }
+
+    @Test
+    void notSupportedWhenStreaming(@TempDir Path tempDir) throws Exception {
+        newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.STREAM_FILES, true)
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "pretty-print")
+            .load()
+            .write()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.STREAM_FILES, true)
+            .option(Options.WRITE_FILES_PRETTY_PRINT, "true")
+            .mode(SaveMode.Append)
+            .save(tempDir.toFile().getAbsolutePath());
+
+        File dir = new File(tempDir.toFile(), "pretty-print");
+        String doc1 = FileUtils.readFileToString(new File(dir, "doc1.xml"), "UTF-8");
+        assertTrue(doc1.contains("<root><hello>world</hello></root>"),
+            "pretty-printed is not supported when streaming documents, as pretty-printing requires reading the " +
+                "document into memory, which conflicts with streaming. So the XML doc should be on a single line. " +
+                "Actual doc: " + doc1);
+    }
 }

src/test/java/com/marklogic/spark/writer/file/WriteDocumentFilesTest.java

Lines changed: 43 additions & 11 deletions
@@ -11,6 +11,8 @@
 import com.marklogic.spark.AbstractIntegrationTest;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.TestUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -40,18 +42,33 @@ void writeFifteenAuthorFiles(@TempDir Path tempDir) throws Exception {
             .mode(SaveMode.Append)
             .save(tempDir.toFile().getAbsolutePath());
 
-        for (int i = 1; i <= 15; i++) {
-            File expectedFile = Paths.get(
-                tempDir.toFile().getAbsolutePath(),
-                "author", "author" + i + ".json"
-            ).toFile();
-            assertTrue(expectedFile.exists(), "Expected file at: " + expectedFile);
+        verifyAuthorFilesWereCorrectlyWritten(tempDir);
+    }
 
-            // Verify the JSON is valid.
-            JsonNode doc = objectMapper.readTree(expectedFile);
-            assertTrue(doc.has("CitationID"));
-            assertTrue(doc.has("LastName"));
-        }
+    @Test
+    void streamAuthorDocuments(@TempDir Path tempDir) throws Exception {
+        Dataset<Row> dataset = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.STREAM_FILES, true)
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
+            .load();
+
+        assertEquals(15, dataset.count());
+
+        dataset.collectAsList().forEach(row -> assertTrue(row.isNullAt(1),
+            "When the 'stream files' option is used when reading documents, the 'content' column should be null " +
+                "for each row. When each row is written to a file, the document corresponding to the URI in the " +
+                "'uri' column should be retrieved and streamed to file, thus avoiding ever reading the entire " +
+                "document into memory."));
+
+        dataset.write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.STREAM_FILES, true)
+            .mode(SaveMode.Append)
+            .save(tempDir.toFile().getAbsolutePath());
+
+        verifyAuthorFilesWereCorrectlyWritten(tempDir);
     }
 
     @Test
@@ -119,4 +136,19 @@ void uriHasSpace(@TempDir Path tempDir) {
             "due to a space), the error should be logged and the file should be written with its unaltered " +
             "document URI used for the file path.");
     }
+
+    private void verifyAuthorFilesWereCorrectlyWritten(Path tempDir) throws Exception {
+        for (int i = 1; i <= 15; i++) {
+            File expectedFile = Paths.get(
+                tempDir.toFile().getAbsolutePath(),
+                "author", "author" + i + ".json"
+            ).toFile();
+            assertTrue(expectedFile.exists(), "Expected file at: " + expectedFile);
+
+            // Verify the JSON is valid.
+            JsonNode doc = objectMapper.readTree(expectedFile);
+            assertTrue(doc.has("CitationID"));
+            assertTrue(doc.has("LastName"));
+        }
+    }
 }
