Commit ddbd59d

DEVEXP-475 User can now force job to abort if write fails
1 parent 11ef5cc commit ddbd59d

7 files changed: +47 -14 lines changed

docs/configuration.md

Lines changed: 2 additions & 1 deletion

@@ -77,11 +77,12 @@ information on how data is written to MarkLogic.
 | Option | Description |
 | --- |-----------------------------------------------------------------------------------|
+| spark.marklogic.write.abortOnFailure | Whether the Spark job should abort if a batch fails to be written; defaults to `true`. |
 | spark.marklogic.write.batchSize | The number of documents written in a call to MarkLogic; defaults to 100. |
-| spark.marklogic.write.threadCount | The number of threads used within each partition to send documents to MarkLogic; defaults to 4. |
 | spark.marklogic.write.collections | Comma-delimited string of collection names to add to each document |
 | spark.marklogic.write.permissions | Comma-delimited string of role names and capabilities to add to each document - e.g. role1,read,role2,update,role3,execute |
 | spark.marklogic.write.temporalCollection | Name of a temporal collection to assign each document to |
+| spark.marklogic.write.threadCount | The number of threads used within each partition to send documents to MarkLogic; defaults to 4. |
 | spark.marklogic.write.transform | Name of a REST transform to apply to each document |
 | spark.marklogic.write.transformParams | Comma-delimited string of transform parameter names and values - e.g. param1,value1,param2,value2 |
 | spark.marklogic.write.transformParamsDelimiter | Delimiter to use instead of a comma for the `transformParams` option |
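
For readers unfamiliar with how these options are passed, a minimal sketch of a Spark write using the new option follows. This is illustrative only: the connection option values and the `people` Dataset are assumptions, not part of this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class WriteOptionsExample {

    // Sketch only: the host and port below are placeholders for a real MarkLogic app server.
    public static void write(Dataset<Row> people) {
        people.write()
            .format("com.marklogic.spark")
            .option("spark.marklogic.client.host", "localhost")
            .option("spark.marklogic.client.port", "8003")
            .option("spark.marklogic.write.batchSize", "100")
            .option("spark.marklogic.write.threadCount", "4")
            // The option added by this commit; omit it to keep the default of true.
            .option("spark.marklogic.write.abortOnFailure", "false")
            .mode(SaveMode.Append)
            .save();
    }
}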

docs/writing.md

Lines changed: 4 additions & 4 deletions

@@ -128,10 +128,10 @@ and during the writing of data to MarkLogic.
 For the first kind of error, the error will be immediately returned to the user and no data will have been written.
 Such errors are often due to misconfiguration of the connector options and should be fixable.
 
-For the second kind of error, the error will eventually be returned to the user, usually within seconds of it
-occurring. The slight delay is due to the asynchronous nature of data being written by the connector. The error will
-be logged by the connector and the write operation will be aborted. Any batches of documents that were written
-successfully prior to the error occurring will still exist in the database.
+For the second kind of error, the connector defaults to logging the error and asking Spark to abort the entire write
+operation. Any batches of documents that were written successfully prior to the error occurring will still exist in the
+database. To configure the connector to only log the error and continue writing batches of documents to MarkLogic, set
+the `spark.marklogic.write.abortOnFailure` option to a value of `false`.
 
 Similar to errors with reading data, the connector will strive to provide meaningful context when an error occurs to
 assist with debugging the cause of the error. Any errors that cannot be fixed via changes to the options passed to the
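
To make the two behaviors concrete, here is a hedged sketch from the caller's side; the structure is illustrative and assumes the connector's `com.marklogic.spark` format name.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class AbortBehaviorExample {

    // Default behavior: abortOnFailure is true, so the first failed batch aborts
    // the job and surfaces as an exception (typically a SparkException) from save().
    static void writeWithDefaultAbort(Dataset<Row> rows) {
        try {
            rows.write().format("com.marklogic.spark").mode(SaveMode.Append).save();
        } catch (Exception ex) {
            // Batches written before the failure still exist in the database.
            System.err.println("Write aborted: " + ex.getMessage());
        }
    }

    // Opt-out behavior: failed batches are only logged, and save() completes,
    // keeping every batch that could be written.
    static void writeAndContinueOnFailure(Dataset<Row> rows) {
        rows.write()
            .format("com.marklogic.spark")
            .option("spark.marklogic.write.abortOnFailure", "false")
            .mode(SaveMode.Append)
            .save();
    }
}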

src/main/java/com/marklogic/spark/Options.java

Lines changed: 1 addition & 0 deletions

@@ -25,6 +25,7 @@ public interface Options {
 
     String WRITE_BATCH_SIZE = "spark.marklogic.write.batchSize";
     String WRITE_THREAD_COUNT = "spark.marklogic.write.threadCount";
+    String WRITE_ABORT_ON_FAILURE = "spark.marklogic.write.abortOnFailure";
 
     String WRITE_COLLECTIONS = "spark.marklogic.write.collections";
     String WRITE_PERMISSIONS = "spark.marklogic.write.permissions";

src/main/java/com/marklogic/spark/writer/MarkLogicDataWriter.java

Lines changed: 6 additions & 4 deletions

@@ -62,10 +62,12 @@ class MarkLogicDataWriter implements DataWriter<InternalRow> {
         this.databaseClient = writeContext.connectToMarkLogic();
         this.dataMovementManager = this.databaseClient.newDataMovementManager();
         this.writeBatcher = writeContext.newWriteBatcher(this.dataMovementManager);
-        this.writeBatcher.onBatchFailure((batch, failure) -> {
-            // Logging not needed here, as WriteBatcherImpl already logs this at the warning level.
-            this.writeFailure.compareAndSet(null, failure);
-        });
+        if (writeContext.isAbortOnFailure()) {
+            this.writeBatcher.onBatchFailure((batch, failure) -> {
+                // Logging not needed here, as WriteBatcherImpl already logs this at the warning level.
+                this.writeFailure.compareAndSet(null, failure);
+            });
+        }
         this.dataMovementManager.startJob(this.writeBatcher);
     }
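
The `compareAndSet(null, failure)` call above is a first-failure-wins pattern: batch callbacks can run on multiple batcher threads, and only the first failure is recorded while later ones leave the reference unchanged. A self-contained sketch of the pattern (the class and method names here are illustrative, not from the commit):

import java.util.concurrent.atomic.AtomicReference;

public class FirstFailureWins {

    private final AtomicReference<Throwable> writeFailure = new AtomicReference<>();

    // May be called concurrently by batch callbacks; only the first failure is kept.
    void recordFailure(Throwable failure) {
        writeFailure.compareAndSet(null, failure);
    }

    // Called later, e.g. when Spark asks the writer to commit, to abort if any batch failed.
    void throwIfAnyBatchFailed() {
        Throwable failure = writeFailure.get();
        if (failure != null) {
            throw new RuntimeException("Batch failed: " + failure.getMessage(), failure);
        }
    }
}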

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 6 additions & 2 deletions

@@ -42,7 +42,7 @@ public StructType getSchema() {
         return schema;
     }
 
-    public WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
+    WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
         WriteBatcher writeBatcher = dataMovementManager
             .newWriteBatcher()
             .withBatchSize((int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1))
@@ -62,7 +62,7 @@ public WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
         return writeBatcher;
     }
 
-    public DocBuilder newDocBuilder() {
+    DocBuilder newDocBuilder() {
         DocBuilderFactory factory = new DocBuilderFactory()
             .withCollections(getProperties().get(Options.WRITE_COLLECTIONS))
             .withPermissions(getProperties().get(Options.WRITE_PERMISSIONS));
@@ -85,6 +85,10 @@ public DocBuilder newDocBuilder() {
         return factory.newDocBuilder();
     }
 
+    boolean isAbortOnFailure() {
+        return !"false".equalsIgnoreCase(getProperties().get(Options.WRITE_ABORT_ON_FAILURE));
+    }
+
     private void configureRestTransform(WriteBatcher writeBatcher) {
         String transformName = getProperties().get(Options.WRITE_TRANSFORM_NAME);
         if (transformName != null && transformName.trim().length() > 0) {
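
Note how `isAbortOnFailure()` implements the default: any value other than a case-insensitive `false`, including a missing option (a null lookup), yields `true`. A quick illustration of that parsing, with a plain map standing in for the connector's properties:

import java.util.HashMap;
import java.util.Map;

public class DefaultTrueParsing {

    static boolean isAbortOnFailure(Map<String, String> properties) {
        // equalsIgnoreCase(null) is false, so an absent option defaults to true.
        return !"false".equalsIgnoreCase(properties.get("spark.marklogic.write.abortOnFailure"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(isAbortOnFailure(props)); // true - option not set
        props.put("spark.marklogic.write.abortOnFailure", "FALSE");
        System.out.println(isAbortOnFailure(props)); // false - case-insensitive match
    }
}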

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 17 additions & 3 deletions

@@ -30,6 +30,8 @@
 
 public class WriteRowsTest extends AbstractWriteTest {
 
+    private final static String TEMPORAL_COLLECTION = "temporal-collection";
+
     @Test
     void defaultBatchSizeAndThreadCount() {
         newWriter().save();
@@ -58,15 +60,14 @@ void twoPartitions() {
 
     @Test
     void temporalTest() {
-        String temporalCollection = "temporal-collection";
         newWriterWithDefaultConfig("temporal-data.csv", 1)
-            .option(Options.WRITE_TEMPORAL_COLLECTION, temporalCollection)
+            .option(Options.WRITE_TEMPORAL_COLLECTION, TEMPORAL_COLLECTION)
             .save();
 
         String uri = getUrisInCollection(COLLECTION, 1).get(0);
         // Temporal doc is written to the temporal collection; "latest" since it's the latest version for that URI;
         // and to a collection matching the URI of the document.
-        assertInCollections(uri, COLLECTION, temporalCollection, "latest", uri);
+        assertInCollections(uri, COLLECTION, TEMPORAL_COLLECTION, "latest", uri);
     }
 
     @Test
@@ -176,6 +177,19 @@ void invalidPermissionsConfig() {
             "string: rest-reader,read,rest-writer", ex.getCause().getMessage());
     }
 
+    @Test
+    void dontAbortOnFailure() {
+        newWriterWithDefaultConfig("temporal-data-with-invalid-rows.csv", 1)
+            .option(Options.WRITE_TEMPORAL_COLLECTION, TEMPORAL_COLLECTION)
+            // Force each row in the CSV to be written in its own batch, ensuring that the one row that should succeed
+            // will in fact succeed (if it's stuck in a batch with other bad rows, it'll fail too).
+            .option(Options.WRITE_BATCH_SIZE, 1)
+            .option(Options.WRITE_ABORT_ON_FAILURE, false)
+            .save();
+
+        assertCollectionSize("9 of the batches should have failed, with the 10th batch succeeding", COLLECTION, 1);
+    }
+
     private void verifyFailureIsDueToLackOfPermission(SparkException ex) {
         assertTrue(ex.getCause() instanceof IOException, "Unexpected cause: " + ex.getCause().getClass());
         assertTrue(ex.getCause().getMessage().contains("Server Message: You do not have permission to this method and URL"),
temporal-data-with-invalid-rows.csv

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+id,content,systemStart,systemEnd,validStart,validEnd,columnWithOnlyWhitespace
+1,this should fail to write,,,,," "
+2,this should fail to write,,,,," "
+3,this should fail to write,,,,," "
+4,this should fail to write,,,,," "
+5,this should fail to write,,,,," "
+6,this should fail to write,,,,," "
+7,this should fail to write,,,,," "
+8,this should fail to write,,,,," "
+9,this should fail to write,,,,," "
+10,hello world,2014-04-03T11:00:00,2014-04-03T16:00:00,2014-04-03T11:00:00,2014-04-03T16:00:00," "
