Commit 8e82315

Merge pull request #289 from marklogic/feature/streaming-put
MLE-17041 Making PUT call when streaming
2 parents: 673de1a + 20379a9 · commit 8e82315
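
For context, here is a minimal sketch of how a caller opts into this streaming path with the connector, pieced together from the test added in this commit. The format name "marklogic", the host/port, the credentials, and the file paths are placeholders and assumptions, not values taken from this commit.

import com.marklogic.spark.Options;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class StreamFilesSketch {

    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
            .master("local[*]")
            .getOrCreate();

        session.read().format("marklogic") // assumption: the connector's short format name
            // Defer reading each file's contents until the writer phase.
            .option(Options.STREAM_FILES, "true")
            .load("path/to/large-files")
            .write().format("marklogic")
            .option(Options.STREAM_FILES, "true")
            // assumption: CLIENT_URI accepts the user:password@host:port form used by the tests
            .option(Options.CLIENT_URI, "spark-user:password@localhost:8003")
            .option(Options.WRITE_PERMISSIONS, "rest-reader,read,rest-writer,update")
            .mode(SaveMode.Append)
            .save();

        session.stop();
    }
}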

File tree

2 files changed: +66 −10 lines changed


src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 39 additions & 3 deletions
@@ -7,9 +7,11 @@
 import com.marklogic.client.datamovement.DataMovementManager;
 import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.GenericDocumentManager;
 import com.marklogic.client.impl.HandleAccessor;
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.marker.AbstractWriteHandle;
+import com.marklogic.client.io.marker.GenericWriteHandle;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.Util;
@@ -55,6 +57,10 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {

     private final RowConverter rowConverter;

+    private final boolean isStreamingFiles;
+    // Only initialized if streaming files.
+    private final GenericDocumentManager documentManager;
+
     // Updated as batches are processed.
     private final AtomicInteger successItemCount = new AtomicInteger(0);
     private final AtomicInteger failedItemCount = new AtomicInteger(0);
@@ -65,6 +71,8 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {
         this.docBuilder = this.writeContext.newDocBuilder();
         this.databaseClient = writeContext.connectToMarkLogic();
         this.rowConverter = determineRowConverter();
+        this.isStreamingFiles = "true".equals(writeContext.getStringOption(Options.STREAM_FILES));
+        this.documentManager = this.isStreamingFiles ? databaseClient.newDocumentManager() : null;

         if (writeContext.isAbortOnFailure()) {
             this.batchRetrier = null;
@@ -86,7 +94,12 @@ public void write(InternalRow row) {
         throwWriteFailureIfExists();
         Optional<DocBuilder.DocumentInputs> document = rowConverter.convertRow(row);
         if (document.isPresent()) {
-            this.writeBatcher.add(this.docBuilder.build(document.get()));
+            DocumentWriteOperation writeOp = this.docBuilder.build(document.get());
+            if (this.isStreamingFiles) {
+                writeDocumentViaPutOperation(writeOp);
+            } else {
+                this.writeBatcher.add(writeOp);
+            }
         }
     }

@@ -183,8 +196,7 @@ private BatchRetrier makeBatchRetrier() {
            writeContext.getStringOption(Options.WRITE_TEMPORAL_COLLECTION),
            successfulBatch -> successItemCount.getAndAdd(successfulBatch.size()),
            (failedDoc, failure) -> {
-                Util.MAIN_LOGGER.error("Unable to write document with URI: {}; cause: {}", failedDoc.getUri(), failure.getMessage());
-                failedItemCount.incrementAndGet();
+                captureFailure(failure.getMessage(), failedDoc.getUri());
                if (this.archiveWriter != null) {
                    writeFailedDocumentToArchive(failedDoc);
                }
@@ -234,4 +246,28 @@ private void closeArchiveWriter() {
            archiveWriter.close();
        }
    }
+
+    /**
+     * A user typically chooses to stream a document due to its size. A PUT call to v1/documents can handle a document
+     * of any size. But a POST call seems to have a limitation due to the multipart nature of the request - the body
+     * part appears to be read into memory, which can cause the server to run out of memory. So for streaming, a PUT
+     * call is made, which means we don't use the WriteBatcher.
+     *
+     * @param writeOp
+     */
+    private void writeDocumentViaPutOperation(DocumentWriteOperation writeOp) {
+        final String uri = writeOp.getUri();
+        try {
+            this.documentManager.write(uri, writeOp.getMetadata(), (GenericWriteHandle) writeOp.getContent());
+            this.successItemCount.incrementAndGet();
+        } catch (RuntimeException ex) {
+            captureFailure(ex.getMessage(), uri);
+            this.writeFailure.compareAndSet(null, ex);
+        }
+    }
+
+    private void captureFailure(String message, String documentUri) {
+        Util.MAIN_LOGGER.error("Unable to write document with URI: {}; cause: {}", documentUri, message);
+        failedItemCount.incrementAndGet();
+    }
 }
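
To illustrate the comment in writeDocumentViaPutOperation in isolation, here is a minimal sketch of the same Java Client call pattern: a GenericDocumentManager writing a single document whose content is streamed from disk via an InputStreamHandle. The host, port, credentials, URI, collection, and file path are placeholders, not values from this commit.

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.document.GenericDocumentManager;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.InputStreamHandle;

import java.io.FileInputStream;
import java.io.FileNotFoundException;

public class PutSingleDocumentSketch {

    public static void main(String[] args) throws FileNotFoundException {
        DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8003,
            new DatabaseClientFactory.DigestAuthContext("spark-user", "password"));

        GenericDocumentManager documentManager = client.newDocumentManager();

        DocumentMetadataHandle metadata = new DocumentMetadataHandle().withCollections("streamed-files");
        metadata.getPermissions().add("rest-writer",
            DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);

        // InputStreamHandle implements GenericWriteHandle, so the content is read from the
        // stream as it is sent; per the commit, this path results in a PUT to v1/documents
        // rather than a multipart POST that buffers the body in memory.
        InputStreamHandle content = new InputStreamHandle(new FileInputStream("path/to/large-file.bin"));

        documentManager.write("/streamed/large-file.bin", metadata, content);
        client.release();
    }
}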

src/test/java/com/marklogic/spark/reader/file/ReadGenericFilesStreamingTest.java

Lines changed: 27 additions & 7 deletions
@@ -4,24 +4,26 @@
 package com.marklogic.spark.reader.file;

 import com.marklogic.spark.AbstractIntegrationTest;
+import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;

 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;

-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.*;

+/**
+ * In this context, "streaming" != Spark Structured Streaming, but rather avoiding reading the contents of a file
+ * into memory by postponing reading of the file until the writer phase, where it can then be streamed from disk into
+ * MarkLogic.
+ */
 class ReadGenericFilesStreamingTest extends AbstractIntegrationTest {

-    /**
-     * In this context, "streaming" != Spark Structured Streaming, but rather avoiding reading the contents of a file
-     * into memory by postponing reading of the file until the writer phase, where it can then be streamed from disk into
-     * MarkLogic.
-     */
     @Test
     void stream() throws Exception {
         Dataset<Row> dataset = newSparkSession().read().format(CONNECTOR_IDENTIFIER)
@@ -41,6 +43,24 @@ void stream() throws Exception {
            "of tests.", "streamed-files", 4);
     }

+    @Test
+    void handleFailureWhileStreaming() {
+        DataFrameWriter writer = newSparkSession()
+            .read().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .load("src/test/resources/mixed-files/hello.json")
+            .write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_PERMISSIONS, "not-an-actual-role,read")
+            .mode(SaveMode.Append);
+
+        ConnectorException ex = assertThrowsConnectorException(() -> writer.save());
+        assertTrue(ex.getMessage().contains("SEC-ROLEDNE: xdmp:role(\"not-an-actual-role\")"),
+            "This verifies that when the connector uses GenericDocumentManager to PUT a single document, any error " +
+                "is still wrapped in a ConnectorException. Actual error message: " + ex.getMessage());
+    }
+
     private void verifyEachRowHasFileContextAsItsContent(Dataset<Row> dataset) throws Exception {
         for (Row row : dataset.collectAsList()) {
             byte[] content = (byte[]) row.get(1);
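
The trailing context above only shows the start of verifyEachRowHasFileContextAsItsContent. As a rough, hedged sketch of the kind of check its first lines suggest (the concrete class serialized into the content column is internal to the connector and not shown in this diff):

import org.apache.spark.sql.Row;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

class StreamedRowContentSketch {

    // When streaming, the content column appears to hold a Java-serialized object (a "file context")
    // rather than the file's bytes; deserializing it is how the test helper begins its verification.
    static Object deserializeContentColumn(Row row) throws IOException, ClassNotFoundException {
        byte[] content = (byte[]) row.get(1);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(content))) {
            return in.readObject();
        }
    }
}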
