Skip to content

Commit f410d14

Browse files
authored
Merge pull request #266 from marklogic/feature/read-docs-progress
Fixed progress logging bug
2 parents ea631ad + d22ebab commit f410d14

File tree

8 files changed

+94
-45
lines changed

8 files changed

+94
-45
lines changed

CONTRIBUTING.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,20 @@ df2.head()
156156
json.loads(df2.head()['content'])
157157
```
158158

159+
For a quick test of writing documents, use the following:
160+
161+
```
162+
163+
spark.read.option("header", True).csv("src/test/resources/data.csv")\
164+
.repartition(2)\
165+
.write.format("marklogic")\
166+
.option("spark.marklogic.client.uri", "spark-test-user:spark@localhost:8000")\
167+
.option("spark.marklogic.write.permissions", "spark-user-role,read,spark-user-role,update")\
168+
.option("spark.marklogic.write.logProgress", 50)\
169+
.option("spark.marklogic.write.batchSize", 10)\
170+
.mode("append")\
171+
.save()
172+
```
159173

160174
# Testing against a local Spark cluster
161175

src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
9393
return new MarkLogicTable(schema, properties);
9494
}
9595

96+
WriteProgressLogger.progressCounter.set(0);
9697
return new MarkLogicTable(new WriteContext(schema, properties));
9798
}
9899

src/main/java/com/marklogic/spark/Options.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public abstract class Options {
9090
public static final String WRITE_THREAD_COUNT_PER_PARTITION = "spark.marklogic.write.threadCountPerPartition";
9191
public static final String WRITE_ABORT_ON_FAILURE = "spark.marklogic.write.abortOnFailure";
9292

93-
// For logging progress when writing documents or processing with custom code.
93+
// For logging progress when writing documents or processing with custom code. Defines the interval at which
94+
// progress should be logged - e.g. a value of 10,000 will result in a message being logged for every 10,000 items
95+
// written or processed.
9496
public static final String WRITE_LOG_PROGRESS = "spark.marklogic.write.logProgress";
9597

9698
// For writing via custom code.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark;
5+
6+
import java.io.Serializable;
7+
8+
public abstract class ProgressLogger implements Serializable {
9+
10+
static final long serialVersionUID = 1;
11+
12+
private final long progressInterval;
13+
private final int batchSize;
14+
private final String message;
15+
16+
protected ProgressLogger(long progressInterval, int batchSize, String message) {
17+
this.progressInterval = progressInterval;
18+
this.batchSize = batchSize;
19+
this.message = message;
20+
}
21+
22+
protected abstract long getNewSum(long itemCount);
23+
24+
public void logProgressIfNecessary(long itemCount) {
25+
if (this.progressInterval > 0) {
26+
long sum = getNewSum(itemCount);
27+
if (sum >= progressInterval) {
28+
long lowerBound = sum / (this.progressInterval);
29+
long upperBound = (lowerBound * this.progressInterval) + this.batchSize;
30+
if (sum >= lowerBound && sum < upperBound) {
31+
Util.MAIN_LOGGER.info(message, sum);
32+
}
33+
}
34+
}
35+
}
36+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark;
5+
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.concurrent.atomic.AtomicLong;
8+
9+
public class WriteProgressLogger extends ProgressLogger {
10+
11+
public static final AtomicLong progressCounter = new AtomicLong(0);
12+
13+
public WriteProgressLogger(long progressInterval, int batchSize, String message) {
14+
super(progressInterval, batchSize, message);
15+
}
16+
17+
@Override
18+
protected long getNewSum(long itemCount) {
19+
return progressCounter.addAndGet(itemCount);
20+
}
21+
}

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
import com.marklogic.client.document.ServerTransform;
2525
import com.marklogic.client.impl.GenericDocumentImpl;
2626
import com.marklogic.client.io.Format;
27-
import com.marklogic.spark.ConnectorException;
28-
import com.marklogic.spark.ContextSupport;
29-
import com.marklogic.spark.Options;
30-
import com.marklogic.spark.Util;
27+
import com.marklogic.spark.*;
3128
import com.marklogic.spark.reader.document.DocumentRowSchema;
3229
import com.marklogic.spark.reader.file.TripleRowSchema;
3330
import org.apache.spark.sql.types.StructType;
@@ -36,18 +33,16 @@
3633
import java.util.List;
3734
import java.util.Map;
3835
import java.util.Optional;
39-
import java.util.concurrent.atomic.AtomicInteger;
4036
import java.util.stream.Stream;
4137

4238
public class WriteContext extends ContextSupport {
4339

4440
static final long serialVersionUID = 1;
45-
private static final AtomicInteger progressTracker = new AtomicInteger();
4641

4742
private final StructType schema;
4843
private final boolean usingFileSchema;
4944
private final int batchSize;
50-
private final int logProgress;
45+
private final ProgressLogger progressLogger;
5146

5247
private int fileSchemaContentPosition;
5348
private int fileSchemaPathPosition;
@@ -59,7 +54,8 @@ public WriteContext(StructType schema, Map<String, String> properties) {
5954
super(properties);
6055
this.schema = schema;
6156
this.batchSize = (int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1);
62-
this.logProgress = (int) getNumericOption(Options.WRITE_LOG_PROGRESS, 10000, 0);
57+
this.progressLogger = new WriteProgressLogger(getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0),
58+
batchSize, "Documents written: {}");
6359

6460
// We support the Spark binaryFile schema - https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html -
6561
// so that reader can be reused for loading files as-is.
@@ -274,22 +270,9 @@ private void logBatchOnSuccess(WriteBatch batch) {
274270
docCount--;
275271
}
276272
}
277-
if (this.logProgress > 0) {
278-
logProgressIfNecessary(docCount);
279-
}
280-
if (logger.isDebugEnabled()) {
281-
logger.debug("Wrote batch; length: {}; job batch number: {}", docCount, batch.getJobBatchNumber());
282-
}
283-
}
284-
285-
private void logProgressIfNecessary(int docCount) {
286-
int sum = progressTracker.addAndGet(docCount);
287-
if (sum >= logProgress) {
288-
int lowerBound = sum / (this.logProgress);
289-
int upperBound = (lowerBound * this.logProgress) + this.batchSize;
290-
if (sum >= lowerBound && sum < upperBound) {
291-
Util.MAIN_LOGGER.info("Documents written: {}", sum);
292-
}
273+
progressLogger.logProgressIfNecessary(docCount);
274+
if (logger.isTraceEnabled()) {
275+
logger.trace("Wrote batch; length: {}; job batch number: {}", docCount, batch.getJobBatchNumber());
293276
}
294277
}
295278

src/main/java/com/marklogic/spark/writer/customcode/CustomCodeWriter.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99
import com.marklogic.client.io.JacksonHandle;
1010
import com.marklogic.client.io.StringHandle;
1111
import com.marklogic.client.io.marker.AbstractWriteHandle;
12-
import com.marklogic.spark.ConnectorException;
13-
import com.marklogic.spark.JsonRowSerializer;
14-
import com.marklogic.spark.Options;
15-
import com.marklogic.spark.Util;
12+
import com.marklogic.spark.*;
1613
import com.marklogic.spark.reader.customcode.CustomCodeContext;
1714
import com.marklogic.spark.writer.CommitMessage;
1815
import org.apache.spark.sql.catalyst.InternalRow;
@@ -23,19 +20,18 @@
2320

2421
import java.util.ArrayList;
2522
import java.util.List;
26-
import java.util.concurrent.atomic.AtomicInteger;
2723
import java.util.stream.Collectors;
2824

2925
class CustomCodeWriter implements DataWriter<InternalRow> {
3026

3127
private static final Logger logger = LoggerFactory.getLogger(CustomCodeWriter.class);
32-
private static final AtomicInteger progressTracker = new AtomicInteger();
3328

3429
private final DatabaseClient databaseClient;
3530
private final CustomCodeContext customCodeContext;
3631
private final JsonRowSerializer jsonRowSerializer;
3732
private final int batchSize;
38-
private final int logProgress;
33+
private final ProgressLogger progressLogger;
34+
3935
private final List<String> currentBatch = new ArrayList<>();
4036
private final String externalVariableDelimiter;
4137
private ObjectMapper objectMapper;
@@ -50,7 +46,8 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
5046
this.jsonRowSerializer = new JsonRowSerializer(customCodeContext.getSchema(), customCodeContext.getProperties());
5147

5248
this.batchSize = (int) customCodeContext.getNumericOption(Options.WRITE_BATCH_SIZE, 1, 1);
53-
this.logProgress = (int) customCodeContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 10000, 0);
49+
this.progressLogger = new WriteProgressLogger(customCodeContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0),
50+
this.batchSize, "Items processed: {}");
5451

5552
this.externalVariableDelimiter = customCodeContext.optionExists(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) ?
5653
customCodeContext.getProperties().get(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) : ",";
@@ -154,9 +151,7 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
154151
try {
155152
call.evalAs(String.class);
156153
this.successItemCount += itemCount;
157-
if (this.logProgress > 0) {
158-
logProgressIfNecessary(itemCount);
159-
}
154+
this.progressLogger.logProgressIfNecessary(itemCount);
160155
} catch (RuntimeException ex) {
161156
if (customCodeContext.isAbortOnFailure()) {
162157
throw ex;
@@ -165,13 +160,4 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
165160
Util.MAIN_LOGGER.error(String.format("Unable to process row; cause: %s", ex.getMessage()));
166161
}
167162
}
168-
169-
private void logProgressIfNecessary(int itemCount) {
170-
int sum = progressTracker.addAndGet(itemCount);
171-
int lowerBound = sum / (this.logProgress);
172-
int upperBound = (lowerBound * this.logProgress) + this.batchSize;
173-
if (sum >= lowerBound && sum < upperBound) {
174-
Util.MAIN_LOGGER.info("Items processed: {}", sum);
175-
}
176-
}
177163
}

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ void logProgressTest() {
4848
.save();
4949

5050
verifyTwoHundredDocsWereWritten();
51+
52+
// For manual inspection, run it again to ensure that the progress counter was reset.
53+
newWriter(2)
54+
.option(Options.WRITE_BATCH_SIZE, 10)
55+
.option(Options.WRITE_LOG_PROGRESS, 40)
56+
.save();
5157
}
5258

5359
@Test

0 commit comments

Comments
 (0)