Skip to content

Commit d22ebab

Browse files
committed
Fixed progress logging bug
Realized that in a Spark environment, the static counter used for counting written docs wasn't getting reset. Did some refactoring to avoid duplicated code, and also to set up for potentially logging progress on reads. Also defaulting the progress interval to 0 instead of 10000, as the old default doesn't seem as useful in a Spark environment; Flux will default it to 10000 instead.
1 parent ea631ad commit d22ebab

File tree

8 files changed

+94
-45
lines changed

8 files changed

+94
-45
lines changed

CONTRIBUTING.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,20 @@ df2.head()
156156
json.loads(df2.head()['content'])
157157
```
158158

159+
For a quick test of writing documents, use the following:
160+
161+
```
162+
163+
spark.read.option("header", True).csv("src/test/resources/data.csv")\
164+
.repartition(2)\
165+
.write.format("marklogic")\
166+
.option("spark.marklogic.client.uri", "spark-test-user:spark@localhost:8000")\
167+
.option("spark.marklogic.write.permissions", "spark-user-role,read,spark-user-role,update")\
168+
.option("spark.marklogic.write.logProgress", 50)\
169+
.option("spark.marklogic.write.batchSize", 10)\
170+
.mode("append")\
171+
.save()
172+
```
159173

160174
# Testing against a local Spark cluster
161175

src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
9393
return new MarkLogicTable(schema, properties);
9494
}
9595

96+
WriteProgressLogger.progressCounter.set(0);
9697
return new MarkLogicTable(new WriteContext(schema, properties));
9798
}
9899

src/main/java/com/marklogic/spark/Options.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public abstract class Options {
9090
public static final String WRITE_THREAD_COUNT_PER_PARTITION = "spark.marklogic.write.threadCountPerPartition";
9191
public static final String WRITE_ABORT_ON_FAILURE = "spark.marklogic.write.abortOnFailure";
9292

93-
// For logging progress when writing documents or processing with custom code.
93+
// For logging progress when writing documents or processing with custom code. Defines the interval at which
94+
// progress should be logged - e.g. a value of 10,000 will result in a message being logged on every 10,000 items
95+
// being written/processed.
9496
public static final String WRITE_LOG_PROGRESS = "spark.marklogic.write.logProgress";
9597

9698
// For writing via custom code.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark;
5+
6+
import java.io.Serializable;
7+
8+
public abstract class ProgressLogger implements Serializable {
9+
10+
static final long serialVersionUID = 1;
11+
12+
private final long progressInterval;
13+
private final int batchSize;
14+
private final String message;
15+
16+
protected ProgressLogger(long progressInterval, int batchSize, String message) {
17+
this.progressInterval = progressInterval;
18+
this.batchSize = batchSize;
19+
this.message = message;
20+
}
21+
22+
protected abstract long getNewSum(long itemCount);
23+
24+
public void logProgressIfNecessary(long itemCount) {
25+
if (this.progressInterval > 0) {
26+
long sum = getNewSum(itemCount);
27+
if (sum >= progressInterval) {
28+
long lowerBound = sum / (this.progressInterval);
29+
long upperBound = (lowerBound * this.progressInterval) + this.batchSize;
30+
if (sum >= lowerBound && sum < upperBound) {
31+
Util.MAIN_LOGGER.info(message, sum);
32+
}
33+
}
34+
}
35+
}
36+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
 * Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
 */
package com.marklogic.spark;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Logs progress for document writes. The counter is static so the running total spans all
 * partition writers in the same JVM; callers (see DefaultSource) reset it at the start of a write
 * job so totals do not accumulate across jobs in a long-lived Spark environment.
 */
public class WriteProgressLogger extends ProgressLogger {

    // Shared across all instances in the JVM; reset externally before each write job.
    public static final AtomicLong progressCounter = new AtomicLong(0);

    public WriteProgressLogger(long progressInterval, int batchSize, String message) {
        super(progressInterval, batchSize, message);
    }

    @Override
    protected long getNewSum(long itemCount) {
        return progressCounter.addAndGet(itemCount);
    }
}

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
import com.marklogic.client.document.ServerTransform;
2525
import com.marklogic.client.impl.GenericDocumentImpl;
2626
import com.marklogic.client.io.Format;
27-
import com.marklogic.spark.ConnectorException;
28-
import com.marklogic.spark.ContextSupport;
29-
import com.marklogic.spark.Options;
30-
import com.marklogic.spark.Util;
27+
import com.marklogic.spark.*;
3128
import com.marklogic.spark.reader.document.DocumentRowSchema;
3229
import com.marklogic.spark.reader.file.TripleRowSchema;
3330
import org.apache.spark.sql.types.StructType;
@@ -36,18 +33,16 @@
3633
import java.util.List;
3734
import java.util.Map;
3835
import java.util.Optional;
39-
import java.util.concurrent.atomic.AtomicInteger;
4036
import java.util.stream.Stream;
4137

4238
public class WriteContext extends ContextSupport {
4339

4440
static final long serialVersionUID = 1;
45-
private static final AtomicInteger progressTracker = new AtomicInteger();
4641

4742
private final StructType schema;
4843
private final boolean usingFileSchema;
4944
private final int batchSize;
50-
private final int logProgress;
45+
private final ProgressLogger progressLogger;
5146

5247
private int fileSchemaContentPosition;
5348
private int fileSchemaPathPosition;
@@ -59,7 +54,8 @@ public WriteContext(StructType schema, Map<String, String> properties) {
5954
super(properties);
6055
this.schema = schema;
6156
this.batchSize = (int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1);
62-
this.logProgress = (int) getNumericOption(Options.WRITE_LOG_PROGRESS, 10000, 0);
57+
this.progressLogger = new WriteProgressLogger(getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0),
58+
batchSize, "Documents written: {}");
6359

6460
// We support the Spark binaryFile schema - https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html -
6561
// so that reader can be reused for loading files as-is.
@@ -274,22 +270,9 @@ private void logBatchOnSuccess(WriteBatch batch) {
274270
docCount--;
275271
}
276272
}
277-
if (this.logProgress > 0) {
278-
logProgressIfNecessary(docCount);
279-
}
280-
if (logger.isDebugEnabled()) {
281-
logger.debug("Wrote batch; length: {}; job batch number: {}", docCount, batch.getJobBatchNumber());
282-
}
283-
}
284-
285-
private void logProgressIfNecessary(int docCount) {
286-
int sum = progressTracker.addAndGet(docCount);
287-
if (sum >= logProgress) {
288-
int lowerBound = sum / (this.logProgress);
289-
int upperBound = (lowerBound * this.logProgress) + this.batchSize;
290-
if (sum >= lowerBound && sum < upperBound) {
291-
Util.MAIN_LOGGER.info("Documents written: {}", sum);
292-
}
273+
progressLogger.logProgressIfNecessary(docCount);
274+
if (logger.isTraceEnabled()) {
275+
logger.trace("Wrote batch; length: {}; job batch number: {}", docCount, batch.getJobBatchNumber());
293276
}
294277
}
295278

src/main/java/com/marklogic/spark/writer/customcode/CustomCodeWriter.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99
import com.marklogic.client.io.JacksonHandle;
1010
import com.marklogic.client.io.StringHandle;
1111
import com.marklogic.client.io.marker.AbstractWriteHandle;
12-
import com.marklogic.spark.ConnectorException;
13-
import com.marklogic.spark.JsonRowSerializer;
14-
import com.marklogic.spark.Options;
15-
import com.marklogic.spark.Util;
12+
import com.marklogic.spark.*;
1613
import com.marklogic.spark.reader.customcode.CustomCodeContext;
1714
import com.marklogic.spark.writer.CommitMessage;
1815
import org.apache.spark.sql.catalyst.InternalRow;
@@ -23,19 +20,18 @@
2320

2421
import java.util.ArrayList;
2522
import java.util.List;
26-
import java.util.concurrent.atomic.AtomicInteger;
2723
import java.util.stream.Collectors;
2824

2925
class CustomCodeWriter implements DataWriter<InternalRow> {
3026

3127
private static final Logger logger = LoggerFactory.getLogger(CustomCodeWriter.class);
32-
private static final AtomicInteger progressTracker = new AtomicInteger();
3328

3429
private final DatabaseClient databaseClient;
3530
private final CustomCodeContext customCodeContext;
3631
private final JsonRowSerializer jsonRowSerializer;
3732
private final int batchSize;
38-
private final int logProgress;
33+
private final ProgressLogger progressLogger;
34+
3935
private final List<String> currentBatch = new ArrayList<>();
4036
private final String externalVariableDelimiter;
4137
private ObjectMapper objectMapper;
@@ -50,7 +46,8 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
5046
this.jsonRowSerializer = new JsonRowSerializer(customCodeContext.getSchema(), customCodeContext.getProperties());
5147

5248
this.batchSize = (int) customCodeContext.getNumericOption(Options.WRITE_BATCH_SIZE, 1, 1);
53-
this.logProgress = (int) customCodeContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 10000, 0);
49+
this.progressLogger = new WriteProgressLogger(customCodeContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0),
50+
this.batchSize, "Items processed: {}");
5451

5552
this.externalVariableDelimiter = customCodeContext.optionExists(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) ?
5653
customCodeContext.getProperties().get(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) : ",";
@@ -154,9 +151,7 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
154151
try {
155152
call.evalAs(String.class);
156153
this.successItemCount += itemCount;
157-
if (this.logProgress > 0) {
158-
logProgressIfNecessary(itemCount);
159-
}
154+
this.progressLogger.logProgressIfNecessary(itemCount);
160155
} catch (RuntimeException ex) {
161156
if (customCodeContext.isAbortOnFailure()) {
162157
throw ex;
@@ -165,13 +160,4 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
165160
Util.MAIN_LOGGER.error(String.format("Unable to process row; cause: %s", ex.getMessage()));
166161
}
167162
}
168-
169-
private void logProgressIfNecessary(int itemCount) {
170-
int sum = progressTracker.addAndGet(itemCount);
171-
int lowerBound = sum / (this.logProgress);
172-
int upperBound = (lowerBound * this.logProgress) + this.batchSize;
173-
if (sum >= lowerBound && sum < upperBound) {
174-
Util.MAIN_LOGGER.info("Items processed: {}", sum);
175-
}
176-
}
177163
}

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ void logProgressTest() {
4848
.save();
4949

5050
verifyTwoHundredDocsWereWritten();
51+
52+
// For manual inspection, run it again to ensure that the progress counter was reset.
53+
newWriter(2)
54+
.option(Options.WRITE_BATCH_SIZE, 10)
55+
.option(Options.WRITE_LOG_PROGRESS, 40)
56+
.save();
5157
}
5258

5359
@Test

0 commit comments

Comments
 (0)