Skip to content

Commit 7227441

Browse files
committed
Added logging of progress
Works for writing documents (the main use case) and reprocessing data (also a good use case). Not doing anything for exporting yet because in most cases in Flux, we don't have control over the data source used for writing.
1 parent aa15c15 commit 7227441

File tree

6 files changed

+74
-8
lines changed

6 files changed

+74
-8
lines changed

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private void parseConnectionString(String value, Map<String, String> connectionP
121121
}
122122
}
123123

124-
protected final long getNumericOption(String optionName, long defaultValue, long minimumValue) {
124+
public final long getNumericOption(String optionName, long defaultValue, long minimumValue) {
125125
try {
126126
long value = this.getProperties().containsKey(optionName) ?
127127
Long.parseLong(this.getProperties().get(optionName)) :

src/main/java/com/marklogic/spark/Options.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ public abstract class Options {
9090
public static final String WRITE_TOTAL_THREAD_COUNT = "spark.marklogic.write.totalThreadCount";
9191
public static final String WRITE_ABORT_ON_FAILURE = "spark.marklogic.write.abortOnFailure";
9292

93+
// For logging progress when writing documents or processing with custom code.
94+
public static final String WRITE_LOG_PROGRESS = "spark.marklogic.write.logProgress";
95+
9396
// For writing via custom code.
9497
public static final String WRITE_INVOKE = "spark.marklogic.write.invoke";
9598
public static final String WRITE_JAVASCRIPT = "spark.marklogic.write.javascript";

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,18 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Optional;
39+
import java.util.concurrent.atomic.AtomicInteger;
3940
import java.util.stream.Stream;
4041

4142
public class WriteContext extends ContextSupport {
4243

4344
static final long serialVersionUID = 1;
45+
private static final AtomicInteger progressTracker = new AtomicInteger();
4446

4547
private final StructType schema;
4648
private final boolean usingFileSchema;
49+
private final int batchSize;
50+
private final int logProgress;
4751

4852
private int fileSchemaContentPosition;
4953
private int fileSchemaPathPosition;
@@ -54,6 +58,8 @@ public class WriteContext extends ContextSupport {
5458
public WriteContext(StructType schema, Map<String, String> properties) {
5559
super(properties);
5660
this.schema = schema;
61+
this.batchSize = (int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1);
62+
this.logProgress = (int) getNumericOption(Options.WRITE_LOG_PROGRESS, 10000, 0);
5763

5864
// We support the Spark binaryFile schema - https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html -
5965
// so that reader can be reused for loading files as-is.
@@ -90,7 +96,7 @@ int getThreadCountPerPartition() {
9096
WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
9197
final int threadCount = getTotalThreadCount() > 0 ?
9298
getThreadCountPerPartition() : getThreadCount();
93-
final int batchSize = (int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1);
99+
94100
Util.MAIN_LOGGER.info("Creating new batcher with thread count of {} and batch size of {}.", threadCount, batchSize);
95101
WriteBatcher writeBatcher = dataMovementManager
96102
.newWriteBatcher()
@@ -253,9 +259,21 @@ private void logBatchOnSuccess(WriteBatch batch) {
253259
docCount--;
254260
}
255261
}
262+
if (this.logProgress > 0) {
263+
logProgressIfNecessary(docCount);
264+
}
256265
logger.debug("Wrote batch; length: {}; job batch number: {}", docCount, batch.getJobBatchNumber());
257266
}
258267

268+
private void logProgressIfNecessary(int docCount) {
269+
int sum = progressTracker.addAndGet(docCount);
270+
int lowerBound = sum / (this.logProgress);
271+
int upperBound = (lowerBound * this.logProgress) + this.batchSize;
272+
if (sum >= lowerBound && sum < upperBound) {
273+
Util.MAIN_LOGGER.info("Documents written: {}", sum);
274+
}
275+
}
276+
259277
boolean isUsingFileSchema() {
260278
return this.usingFileSchema;
261279
}

src/main/java/com/marklogic/spark/writer/customcode/CustomCodeWriter.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,19 @@
2424
import java.io.StringWriter;
2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.stream.Collectors;
2829

2930
class CustomCodeWriter implements DataWriter<InternalRow> {
3031

3132
private static final Logger logger = LoggerFactory.getLogger(CustomCodeWriter.class);
33+
private static final AtomicInteger progressTracker = new AtomicInteger();
3234

3335
private final DatabaseClient databaseClient;
3436
private final CustomCodeContext customCodeContext;
3537

3638
private final int batchSize;
39+
private final int logProgress;
3740
private final List<String> currentBatch = new ArrayList<>();
3841
private final String externalVariableDelimiter;
3942
private ObjectMapper objectMapper;
@@ -46,8 +49,8 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
4649
this.customCodeContext = customCodeContext;
4750
this.databaseClient = customCodeContext.connectToMarkLogic();
4851

49-
this.batchSize = customCodeContext.optionExists(Options.WRITE_BATCH_SIZE) ?
50-
Integer.parseInt(customCodeContext.getProperties().get(Options.WRITE_BATCH_SIZE)) : 1;
52+
this.batchSize = (int) customCodeContext.getNumericOption(Options.WRITE_BATCH_SIZE, 1, 1);
53+
this.logProgress = (int) customCodeContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 10000, 0);
5154

5255
this.externalVariableDelimiter = customCodeContext.optionExists(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) ?
5356
customCodeContext.getProperties().get(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) : ",";
@@ -64,7 +67,6 @@ public void write(InternalRow row) {
6467
row.getString(0);
6568

6669
this.currentBatch.add(rowValue);
67-
6870
if (this.currentBatch.size() >= this.batchSize) {
6971
flush();
7072
}
@@ -104,8 +106,8 @@ private void flush() {
104106
return;
105107
}
106108

107-
if (logger.isDebugEnabled()) {
108-
logger.debug("Calling custom code in MarkLogic");
109+
if (logger.isTraceEnabled()) {
110+
logger.trace("Calling custom code in MarkLogic");
109111
}
110112
final int itemCount = currentBatch.size();
111113
ServerEvaluationCall call = customCodeContext.buildCall(
@@ -164,6 +166,9 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
164166
try {
165167
call.evalAs(String.class);
166168
this.successItemCount += itemCount;
169+
if (this.logProgress > 0) {
170+
logProgressIfNecessary(itemCount);
171+
}
167172
} catch (RuntimeException ex) {
168173
if (customCodeContext.isAbortOnFailure()) {
169174
throw ex;
@@ -172,4 +177,13 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
172177
Util.MAIN_LOGGER.error(String.format("Unable to process row; cause: %s", ex.getMessage()));
173178
}
174179
}
180+
181+
private void logProgressIfNecessary(int itemCount) {
182+
int sum = progressTracker.addAndGet(itemCount);
183+
int lowerBound = sum / (this.logProgress);
184+
int upperBound = (lowerBound * this.logProgress) + this.batchSize;
185+
if (sum >= lowerBound && sum < upperBound) {
186+
Util.MAIN_LOGGER.info("Items processed: {}", sum);
187+
}
188+
}
175189
}

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ void defaultBatchSizeAndThreadCount() {
3838
verifyTwoHundredDocsWereWritten();
3939
}
4040

41+
@Test
42+
void logProgressTest() {
43+
newWriter(2)
44+
// Including these options here to ensure they don't cause any issues, though we're not yet able to
45+
// assert on the info-level log entries that they add.
46+
.option(Options.WRITE_BATCH_SIZE, 8)
47+
.option(Options.WRITE_LOG_PROGRESS, 50)
48+
.save();
49+
50+
verifyTwoHundredDocsWereWritten();
51+
}
52+
4153
@Test
4254
void batchSizeGreaterThanNumberOfRowsToWrite() {
4355
newWriter()
@@ -128,7 +140,7 @@ void invalidThreadCount() {
128140
@Test
129141
void invalidBatchSize() {
130142
DataFrameWriter writer = newWriter().option(Options.WRITE_BATCH_SIZE, 0);
131-
ConnectorException ex = assertThrowsConnectorException(() -> writer.save());
143+
ConnectorException ex = assertThrows(ConnectorException.class, () -> writer.save());
132144
assertEquals("The value of 'spark.marklogic.write.batchSize' must be 1 or greater.", ex.getMessage(),
133145
"Note that batchSize is very different for writing than it is for reading. For writing, it specifies the " +
134146
"exact number of documents to send to MarkLogic in each call. For reading, it used to determine how " +

src/test/java/com/marklogic/spark/writer/customcode/ProcessWithCustomCodeTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,25 @@
1616

1717
class ProcessWithCustomCodeTest extends AbstractWriteTest {
1818

19+
@Test
20+
void logProgressTest() {
21+
newSparkSession().read().format(CONNECTOR_IDENTIFIER)
22+
.option(Options.CLIENT_URI, makeClientUri())
23+
.option(Options.READ_XQUERY, "for $i in 1 to 100 return $i")
24+
.load()
25+
.write().format(CONNECTOR_IDENTIFIER)
26+
.option(Options.CLIENT_URI, makeClientUri())
27+
// With "uneven" numbers like this, the user will still see 5 progress entries, but the counts won't be even -
28+
// they'll be 24, 40, 64, 80, and 100.
29+
.option(Options.WRITE_BATCH_SIZE, 8)
30+
.option(Options.WRITE_LOG_PROGRESS, 20)
31+
.option(Options.WRITE_JAVASCRIPT, "var URI; console.log('Nothing to do here.')")
32+
.mode(SaveMode.Append)
33+
.save();
34+
35+
assertTrue(true, "No assertion needed, this test is only for manual inspection of the progress log entries.");
36+
}
37+
1938
@Test
2039
void invokeJavaScript() {
2140
newWriterWithDefaultConfig("three-uris.csv", 2)

0 commit comments

Comments
 (0)