Skip to content

Commit 0c4b1aa

Browse files
authored
Merge pull request #165 from marklogic/feature/12357-metrics
MLE-12357 Improved logging of success/failure counts
2 parents 9c00ab4 + b6d2ed9 commit 0c4b1aa

File tree

6 files changed

+85
-34
lines changed

6 files changed

+85
-34
lines changed

src/main/java/com/marklogic/spark/writer/CommitMessage.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,34 @@
44

55
public class CommitMessage implements WriterCommitMessage {
66

7-
private final String action;
8-
private final int docCount;
7+
private final int successItemCount;
8+
private final int failedItemCount;
99
private final int partitionId;
1010
private final long taskId;
1111
private final long epochId;
1212

13-
public CommitMessage(String action, int docCount, int partitionId, long taskId, long epochId) {
14-
this.action = action;
15-
this.docCount = docCount;
13+
public CommitMessage(int successItemCount, int failedItemCount, int partitionId, long taskId, long epochId) {
14+
this.successItemCount = successItemCount;
15+
this.failedItemCount = failedItemCount;
1616
this.partitionId = partitionId;
1717
this.taskId = taskId;
1818
this.epochId = epochId;
1919
}
2020

21-
public String getAction() {
22-
return action;
21+
public int getSuccessItemCount() {
22+
return successItemCount;
2323
}
2424

25-
public int getDocCount() {
26-
return docCount;
25+
public int getFailedItemCount() {
26+
return failedItemCount;
2727
}
2828

2929
@Override
3030
public String toString() {
3131
return epochId != 0L ?
32-
String.format("[partitionId: %d; taskId: %d; epochId: %d]; docCount: %d", partitionId, taskId, epochId, docCount) :
33-
String.format("[partitionId: %d; taskId: %d]; docCount: %d", partitionId, taskId, docCount);
32+
String.format("[partitionId: %d; taskId: %d; epochId: %d]; docCount: %d; failedItemCount: %d",
33+
partitionId, taskId, epochId, successItemCount, failedItemCount) :
34+
String.format("[partitionId: %d; taskId: %d]; docCount: %d; failedItemCount: %d",
35+
partitionId, taskId, successItemCount, failedItemCount);
3436
}
3537
}

src/main/java/com/marklogic/spark/writer/MarkLogicWrite.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,18 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
import java.util.Arrays;
32+
import java.util.function.Consumer;
3233

33-
class MarkLogicWrite implements BatchWrite, StreamingWrite {
34+
public class MarkLogicWrite implements BatchWrite, StreamingWrite {
3435

3536
private static final Logger logger = LoggerFactory.getLogger("com.marklogic.spark");
3637

3738
private WriteContext writeContext;
3839

40+
// Used solely for testing. Will never be populated in a real world scenario.
41+
public static Consumer<Integer> successCountConsumer;
42+
public static Consumer<Integer> failureCountConsumer;
43+
3944
MarkLogicWrite(WriteContext writeContext) {
4045
this.writeContext = writeContext;
4146
}
@@ -57,13 +62,24 @@ public void commit(WriterCommitMessage[] messages) {
5762
if (logger.isDebugEnabled()) {
5863
logger.debug("Commit messages received: {}", Arrays.asList(messages));
5964
}
60-
final String action = ((CommitMessage) messages[0]).getAction();
65+
int successCount = 0;
66+
int failureCount = 0;
67+
for (WriterCommitMessage message : messages) {
68+
CommitMessage msg = (CommitMessage)message;
69+
successCount += msg.getSuccessItemCount();
70+
failureCount += msg.getFailedItemCount();
71+
}
72+
if (successCountConsumer != null) {
73+
successCountConsumer.accept(successCount);
74+
}
75+
if (failureCountConsumer != null) {
76+
failureCountConsumer.accept(failureCount);
77+
}
6178
if (Util.MAIN_LOGGER.isInfoEnabled()) {
62-
int count = 0;
63-
for (WriterCommitMessage message : messages) {
64-
count += ((CommitMessage) message).getDocCount();
65-
}
66-
Util.MAIN_LOGGER.info("{} {} documents", action, count);
79+
Util.MAIN_LOGGER.info("Success count: {}", successCount);
80+
}
81+
if (failureCount > 0) {
82+
Util.MAIN_LOGGER.error("Failure count: {}", failureCount);
6783
}
6884
}
6985
}

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.io.IOException;
30+
import java.util.concurrent.atomic.AtomicInteger;
3031
import java.util.concurrent.atomic.AtomicReference;
3132
import java.util.function.Function;
3233

@@ -51,7 +52,9 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {
5152

5253
private final Function<InternalRow, DocBuilder.DocumentInputs> rowToDocumentFunction;
5354

54-
private int docCount;
55+
// Updated as batches are processed.
56+
private AtomicInteger successItemCount = new AtomicInteger(0);
57+
private AtomicInteger failedItemCount = new AtomicInteger(0);
5558

5659
WriteBatcherDataWriter(WriteContext writeContext, int partitionId, long taskId, long epochId) {
5760
this.writeContext = writeContext;
@@ -63,12 +66,7 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {
6366
this.databaseClient = writeContext.connectToMarkLogic();
6467
this.dataMovementManager = this.databaseClient.newDataMovementManager();
6568
this.writeBatcher = writeContext.newWriteBatcher(this.dataMovementManager);
66-
if (writeContext.isAbortOnFailure()) {
67-
this.writeBatcher.onBatchFailure((batch, failure) ->
68-
// Logging not needed here, as WriteBatcherImpl already logs this at the warning level.
69-
this.writeFailure.compareAndSet(null, failure)
70-
);
71-
}
69+
addBatchListeners();
7270
this.dataMovementManager.startJob(this.writeBatcher);
7371

7472
if (writeContext.isUsingFileSchema()) {
@@ -85,16 +83,15 @@ public void write(InternalRow row) throws IOException {
8583
throwWriteFailureIfExists();
8684
DocBuilder.DocumentInputs inputs = rowToDocumentFunction.apply(row);
8785
this.writeBatcher.add(this.docBuilder.build(inputs));
88-
this.docCount++;
8986
}
9087

9188
@Override
9289
public WriterCommitMessage commit() throws IOException {
93-
CommitMessage message = new CommitMessage("Wrote", docCount, partitionId, taskId, epochId);
90+
this.writeBatcher.flushAndWait();
91+
CommitMessage message = new CommitMessage(successItemCount.get(), failedItemCount.get(), partitionId, taskId, epochId);
9492
if (logger.isDebugEnabled()) {
9593
logger.debug("Committing {}", message);
9694
}
97-
this.writeBatcher.flushAndWait();
9895
throwWriteFailureIfExists();
9996
return message;
10097
}
@@ -113,6 +110,20 @@ public void close() {
113110
stopJobAndRelease();
114111
}
115112

113+
private void addBatchListeners() {
114+
this.writeBatcher.onBatchSuccess(batch -> this.successItemCount.getAndAdd(batch.getItems().length));
115+
if (writeContext.isAbortOnFailure()) {
116+
// Logging not needed here, as WriteBatcherImpl already logs this at the warning level.
117+
this.writeBatcher.onBatchFailure((batch, failure) ->
118+
this.writeFailure.compareAndSet(null, failure)
119+
);
120+
} else {
121+
this.writeBatcher.onBatchFailure((batch, failure) ->
122+
this.failedItemCount.getAndAdd(batch.getItems().length)
123+
);
124+
}
125+
}
126+
116127
private synchronized void throwWriteFailureIfExists() throws IOException {
117128
if (writeFailure.get() != null) {
118129
// Only including the message seems sufficient here, as Spark is logging the stacktrace. And the user

src/main/java/com/marklogic/spark/writer/customcode/CustomCodeWriter.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
4242
private final int partitionId;
4343
private final long taskId;
4444
private final long epochId;
45-
private int itemCount;
45+
46+
// Updated after each call to MarkLogic.
47+
private int successItemCount;
48+
private int failedItemCount;
4649

4750
CustomCodeWriter(CustomCodeContext customCodeContext, int partitionId, long taskId, long epochId) {
4851
this.customCodeContext = customCodeContext;
@@ -68,7 +71,6 @@ public void write(InternalRow row) {
6871
String rowValue = customCodeContext.isCustomSchema() ?
6972
convertRowToJSONString(row) :
7073
row.getString(0);
71-
itemCount++;
7274

7375
this.currentBatch.add(rowValue);
7476

@@ -80,7 +82,7 @@ public void write(InternalRow row) {
8082
@Override
8183
public WriterCommitMessage commit() {
8284
flush();
83-
CommitMessage message = new CommitMessage("Processed", itemCount, partitionId, taskId, epochId);
85+
CommitMessage message = new CommitMessage(successItemCount, failedItemCount, partitionId, taskId, epochId);
8486
if (logger.isDebugEnabled()) {
8587
logger.debug("Committing {}", message);
8688
}
@@ -114,13 +116,14 @@ private void flush() {
114116
if (logger.isDebugEnabled()) {
115117
logger.debug("Calling custom code in MarkLogic");
116118
}
119+
final int itemCount = currentBatch.size();
117120
ServerEvaluationCall call = customCodeContext.buildCall(
118121
this.databaseClient,
119122
new CustomCodeContext.CallOptions(Options.WRITE_INVOKE, Options.WRITE_JAVASCRIPT, Options.WRITE_XQUERY)
120123
);
121124
call.addVariable(determineExternalVariableName(), makeVariableValue());
122125
currentBatch.clear();
123-
executeCall(call);
126+
executeCall(call, itemCount);
124127
}
125128

126129
private String determineExternalVariableName() {
@@ -165,13 +168,15 @@ private String convertRowToJSONString(InternalRow row) {
165168
return jsonObjectWriter.toString();
166169
}
167170

168-
private void executeCall(ServerEvaluationCall call) {
171+
private void executeCall(ServerEvaluationCall call, int itemCount) {
169172
try {
170173
call.evalAs(String.class);
174+
this.successItemCount += itemCount;
171175
} catch (RuntimeException ex) {
172176
if (customCodeContext.isAbortOnFailure()) {
173177
throw ex;
174178
}
179+
this.failedItemCount += itemCount;
175180
Util.MAIN_LOGGER.error(String.format("Unable to process row; cause: %s", ex.getMessage()));
176181
}
177182
}

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525

2626
import java.io.IOException;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728

2829
import static org.junit.jupiter.api.Assertions.*;
2930

@@ -177,6 +178,11 @@ void invalidPermissionsConfig() {
177178

178179
@Test
179180
void dontAbortOnFailure() {
181+
AtomicInteger successCount = new AtomicInteger();
182+
AtomicInteger failureCount = new AtomicInteger();
183+
MarkLogicWrite.successCountConsumer = count -> successCount.set(count);
184+
MarkLogicWrite.failureCountConsumer = count -> failureCount.set(count);
185+
180186
newWriterWithDefaultConfig("temporal-data-with-invalid-rows.csv", 1)
181187
.option(Options.WRITE_TEMPORAL_COLLECTION, TEMPORAL_COLLECTION)
182188
// Force each row in the CSV to be written in its own batch, ensuring that the one row that should succeed
@@ -186,6 +192,8 @@ void dontAbortOnFailure() {
186192
.save();
187193

188194
assertCollectionSize("9 of the batches should have failed, with the 10th batch succeeding", COLLECTION, 1);
195+
assertEquals(9, failureCount.get());
196+
assertEquals(1, successCount.get());
189197
}
190198

191199
private void verifyFailureIsDueToLackOfPermission(SparkException ex) {

src/test/java/com/marklogic/spark/writer/customcode/ProcessWithCustomCodeTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import com.marklogic.junit5.XmlNode;
55
import com.marklogic.spark.Options;
66
import com.marklogic.spark.writer.AbstractWriteTest;
7+
import com.marklogic.spark.writer.MarkLogicWrite;
78
import org.apache.spark.SparkException;
89
import org.apache.spark.sql.SaveMode;
910
import org.junit.jupiter.api.Test;
1011

12+
import java.util.concurrent.atomic.AtomicInteger;
1113
import java.util.stream.Stream;
1214

1315
import static org.junit.jupiter.api.Assertions.*;
@@ -121,15 +123,22 @@ void abortOnFailure() {
121123
"Unexpected error message: " + ex.getMessage());
122124
}
123125

124-
@SuppressWarnings("java:S2699") // The absence of an assertion is fine for this test.
125126
@Test
126127
void dontAbortOnFailure() {
128+
AtomicInteger successCount = new AtomicInteger();
129+
AtomicInteger failureCount = new AtomicInteger();
130+
MarkLogicWrite.successCountConsumer = count -> successCount.set(count);
131+
MarkLogicWrite.failureCountConsumer = count -> failureCount.set(count);
132+
127133
// The lack of an error here indicates that the job did not abort. The connector is expected to have logged
128134
// each error instead.
129135
newWriterWithDefaultConfig("three-uris.csv", 2)
130136
.option(Options.WRITE_JAVASCRIPT, "var URI; throw Error('Boom!');")
131137
.option(Options.WRITE_ABORT_ON_FAILURE, "false")
132138
.save();
139+
140+
assertEquals(3, failureCount.get());
141+
assertEquals(0, successCount.get());
133142
}
134143

135144
private void verifyThreeJsonDocumentsWereWritten() {

0 commit comments

Comments (0)