
Commit b28c471

Merge pull request #268 from marklogic/feature/fixing-progress
Fixed progress logging
2 parents: 134d274 + 5a03397

File tree: 10 files changed (+62 lines, -89 lines)


src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 14 additions & 5 deletions
@@ -86,20 +86,29 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
             );
         }
 
+        final ContextSupport tempContext = new ContextSupport(properties);
+
         // The appropriate progress logger is reset here so that when the connector is used repeatedly in an
         // environment like PySpark, the counts start with zero on each new Spark job.
+        final long readProgressInterval = tempContext.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0);
         if (isReadDocumentsOperation(properties)) {
-            ReadProgressLogger.progressCounter.set(0);
+            ReadProgressLogger.initialize(readProgressInterval, "Documents read: {}");
             return new DocumentTable(DocumentRowSchema.SCHEMA);
         } else if (isReadTriplesOperation(properties)) {
-            ReadProgressLogger.progressCounter.set(0);
+            ReadProgressLogger.initialize(readProgressInterval, "Triples read: {}");
             return new DocumentTable(TripleRowSchema.SCHEMA);
-        } else if (properties.get(Options.READ_OPTIC_QUERY) != null || Util.isReadWithCustomCodeOperation(properties)) {
-            ReadProgressLogger.progressCounter.set(0);
+        } else if (properties.get(Options.READ_OPTIC_QUERY) != null) {
+            ReadProgressLogger.initialize(readProgressInterval, "Rows read: {}");
+            return new MarkLogicTable(schema, properties);
+        } else if (Util.isReadWithCustomCodeOperation(properties)) {
+            // Not yet logging progress for reading with custom code, as it's assumed the user will then write with
+            // custom code.
             return new MarkLogicTable(schema, properties);
         }
 
-        WriteProgressLogger.progressCounter.set(0);
+        final long writeProgressInterval = tempContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0);
+        String message = Util.isReadWithCustomCodeOperation(properties) ? "Items processed: {}" : "Documents written: {}";
+        WriteProgressLogger.initialize(writeProgressInterval, message);
         return new MarkLogicTable(new WriteContext(schema, properties));
     }

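The two initialize calls above are driven entirely by connector options, so the new progress logging is opt-in configuration. A minimal sketch of a read job that would exercise ReadProgressLogger (the option key strings and connection details are assumptions for illustration; the constants in com.marklogic.spark.Options are authoritative):

    import org.apache.spark.sql.SparkSession;

    public class ReadProgressExample {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
            session.read()
                .format("marklogic")
                // Hypothetical connection details.
                .option("spark.marklogic.client.uri", "user:password@localhost:8003")
                // Assumed key behind Options.READ_LOG_PROGRESS: with a value of 10000,
                // "Documents read: {}" is logged after every 10,000 documents.
                .option("spark.marklogic.read.logProgress", 10000)
                // Assumed key for selecting documents by collection.
                .option("spark.marklogic.read.documents.collections", "example")
                .load()
                .count(); // forces the lazy read to execute, producing progress entries
        }
    }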
src/main/java/com/marklogic/spark/ProgressLogger.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/main/java/com/marklogic/spark/ReadProgressLogger.java

Lines changed: 20 additions & 7 deletions
@@ -3,23 +3,36 @@
  */
 package com.marklogic.spark;
 
+import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handles the progress counter for any operation involving reading from MarkLogic. A Spark job/application can only have
  * one reader, and thus DefaultSource handles resetting this counter before a new read job starts up. A static counter
  * is used so that all reader partitions in the same JVM can have their progress aggregated and logged.
  */
-public class ReadProgressLogger extends ProgressLogger {
+public class ReadProgressLogger implements Serializable {
 
-    public static final AtomicLong progressCounter = new AtomicLong(0);
+    static final long serialVersionUID = 1L;
 
-    public ReadProgressLogger(long progressInterval, int batchSize, String message) {
-        super(progressInterval, batchSize, message);
+    private static final AtomicLong progressCounter = new AtomicLong(0);
+    private static long progressInterval;
+    private static long nextProgressInterval;
+    private static String message;
+
+    public static void initialize(long progressInterval, String message) {
+        progressCounter.set(0);
+        ReadProgressLogger.progressInterval = progressInterval;
+        nextProgressInterval = progressInterval;
+        ReadProgressLogger.message = message;
     }
 
-    @Override
-    protected long getNewSum(long itemCount) {
-        return progressCounter.addAndGet(itemCount);
+    public static void logProgressIfNecessary(long itemCount) {
+        if (progressInterval > 0 && progressCounter.addAndGet(itemCount) >= nextProgressInterval) {
+            synchronized (progressCounter) {
+                Util.MAIN_LOGGER.info(message, nextProgressInterval);
+                nextProgressInterval += progressInterval;
+            }
+        }
     }
 }

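Two details of logProgressIfNecessary are worth calling out: the hot path is a single lock-free addAndGet, and the synchronized block is only entered when a threshold is crossed, so nextProgressInterval advances one interval at a time even when many partitions report at once. A self-contained sketch of the same pattern, with hypothetical names and stdout in place of the connector's logger:

    import java.util.concurrent.atomic.AtomicLong;

    public class IntervalProgress {
        private static final AtomicLong counter = new AtomicLong(0);
        private static long interval;
        private static long nextInterval;

        public static void initialize(long progressInterval) {
            counter.set(0);
            interval = progressInterval;
            nextInterval = progressInterval;
        }

        public static void logIfNecessary(long itemCount) {
            // Lock-free add on every call; synchronize only on the rare threshold crossing.
            if (interval > 0 && counter.addAndGet(itemCount) >= nextInterval) {
                synchronized (counter) {
                    System.out.println("Items processed: " + nextInterval);
                    nextInterval += interval;
                }
            }
        }

        public static void main(String[] args) {
            initialize(100);
            for (int i = 0; i < 20; i++) {
                logIfNecessary(50); // 20 batches of 50 items, 1,000 items in total
            }
            // Prints "Items processed: 100" through "Items processed: 1000".
        }
    }

Note the trade-off: what gets logged is the threshold itself rather than the exact running total, which keeps the log lines at clean multiples of the interval at the cost of slight imprecision under heavy concurrency.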
src/main/java/com/marklogic/spark/WriteProgressLogger.java

Lines changed: 20 additions & 7 deletions
@@ -3,23 +3,36 @@
  */
 package com.marklogic.spark;
 
+import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handles the progress counter for any operation involving writing to MarkLogic. A Spark job/application can only have
  * one writer, and thus DefaultSource handles resetting this counter before a new write job starts up. A static counter
  * is used so that all writer partitions in the same JVM can have their progress aggregated and logged.
  */
-public class WriteProgressLogger extends ProgressLogger {
+public class WriteProgressLogger implements Serializable {
 
-    public static final AtomicLong progressCounter = new AtomicLong(0);
+    static final long serialVersionUID = 1L;
 
-    public WriteProgressLogger(long progressInterval, int batchSize, String message) {
-        super(progressInterval, batchSize, message);
+    private static final AtomicLong progressCounter = new AtomicLong(0);
+    private static long progressInterval;
+    private static long nextProgressInterval;
+    private static String message;
+
+    public static void initialize(long progressInterval, String message) {
+        progressCounter.set(0);
+        WriteProgressLogger.progressInterval = progressInterval;
+        nextProgressInterval = progressInterval;
+        WriteProgressLogger.message = message;
    }
 
-    @Override
-    protected long getNewSum(long itemCount) {
-        return progressCounter.addAndGet(itemCount);
+    public static void logProgressIfNecessary(long itemCount) {
+        if (progressInterval > 0 && progressCounter.addAndGet(itemCount) >= nextProgressInterval) {
+            synchronized (progressCounter) {
+                Util.MAIN_LOGGER.info(message, nextProgressInterval);
+                nextProgressInterval += progressInterval;
+            }
+        }
     }
 }

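WriteProgressLogger mirrors ReadProgressLogger line for line; only the log messages and the class holding the static state differ. The write-side option drives it the same way. A sketch, again with assumed option keys and hypothetical connection details:

    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class WriteProgressExample {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
            session.read().json("data/*.json")
                .write()
                .format("marklogic")
                // Hypothetical connection details.
                .option("spark.marklogic.client.uri", "user:password@localhost:8003")
                // Assumed key behind Options.WRITE_LOG_PROGRESS: log "Documents written: {}"
                // after every 5,000 documents.
                .option("spark.marklogic.write.logProgress", 5000)
                .mode(SaveMode.Append)
                .save();
        }
    }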
src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 1 addition & 8 deletions
@@ -12,7 +12,6 @@
 import com.marklogic.client.query.SearchQueryDefinition;
 import com.marklogic.client.query.StructuredQueryBuilder;
 import com.marklogic.spark.Options;
-import com.marklogic.spark.ProgressLogger;
 import com.marklogic.spark.ReadProgressLogger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -40,7 +39,6 @@ class ForestReader implements PartitionReader<InternalRow> {
     private final Integer limit;
 
     // Only used for logging.
-    private final ProgressLogger progressLogger;
     private final ForestPartition forestPartition;
     private long startTime;
 
@@ -74,11 +72,6 @@ class ForestReader implements PartitionReader<InternalRow> {
         this.requestedMetadata = context.getRequestedMetadata();
         this.documentManager.setMetadataCategories(this.requestedMetadata);
         this.queryBuilder = client.newQueryManager().newStructuredQueryBuilder();
-
-        this.progressLogger = new ReadProgressLogger(
-            context.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0),
-            context.getBatchSize(), "Read documents: {}"
-        );
     }
 
     @Override
@@ -153,7 +146,7 @@ private DocumentPage readPage(List<String> uris) {
         if (logger.isTraceEnabled()) {
            logger.trace("Retrieved page of documents in {}ms from partition {}", (System.currentTimeMillis() - start), this.forestPartition);
         }
-        this.progressLogger.logProgressIfNecessary(page.getPageSize());
+        ReadProgressLogger.logProgressIfNecessary(page.getPageSize());
         return page;
     }

src/main/java/com/marklogic/spark/reader/document/OpticTriplesReader.java

Lines changed: 1 addition & 7 deletions
@@ -7,7 +7,6 @@
 import com.marklogic.client.row.RowRecord;
 import com.marklogic.client.type.PlanColumn;
 import com.marklogic.spark.Options;
-import com.marklogic.spark.ProgressLogger;
 import com.marklogic.spark.ReadProgressLogger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -39,7 +38,6 @@ class OpticTriplesReader implements PartitionReader<InternalRow> {
     // Only for logging
     private final long batchSize;
     private long progressCounter;
-    private final ProgressLogger progressLogger;
 
     private Iterator<RowRecord> currentRowIterator;
 
@@ -60,10 +58,6 @@ public OpticTriplesReader(ForestPartition forestPartition, DocumentContext conte
         this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered);
 
         this.batchSize = context.getBatchSize();
-        this.progressLogger = new ReadProgressLogger(
-            context.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0),
-            (int) this.batchSize, "Read triples: {}"
-        );
     }
 
     @Override
@@ -86,7 +80,7 @@ public InternalRow get() {
         Object[] row = convertNextTripleIntoRow();
         progressCounter++;
         if (progressCounter >= batchSize) {
-            progressLogger.logProgressIfNecessary(progressCounter);
+            ReadProgressLogger.logProgressIfNecessary(this.progressCounter);
             progressCounter = 0;
         }
         return new GenericInternalRow(row);

src/main/java/com/marklogic/spark/reader/optic/OpticPartitionReader.java

Lines changed: 1 addition & 9 deletions
@@ -20,8 +20,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.marklogic.client.row.RowManager;
-import com.marklogic.spark.Options;
-import com.marklogic.spark.ProgressLogger;
 import com.marklogic.spark.ReadProgressLogger;
 import com.marklogic.spark.reader.JsonRowDeserializer;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -51,7 +49,6 @@ class OpticPartitionReader implements PartitionReader<InternalRow> {
     private long totalDuration;
     private long progressCounter;
     private final long batchSize;
-    private final ProgressLogger progressLogger;
 
     // Used solely for testing purposes; is never expected to be used in production. Intended to provide a way for
     // a test to get the count of rows returned from MarkLogic, which is important for ensuring that pushdown operations
@@ -67,11 +64,6 @@ class OpticPartitionReader implements PartitionReader<InternalRow> {
         // be in the rows.
         this.rowManager.setDatatypeStyle(RowManager.RowSetPart.HEADER);
         this.jsonRowDeserializer = new JsonRowDeserializer(opticReadContext.getSchema());
-
-        this.progressLogger = new ReadProgressLogger(
-            opticReadContext.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0),
-            (int) opticReadContext.getBatchSize(), "Read rows: {}"
-        );
     }
 
     @Override
@@ -115,7 +107,7 @@ public InternalRow get() {
         this.totalRowCount++;
         this.progressCounter++;
         if (this.progressCounter >= this.batchSize) {
-            progressLogger.logProgressIfNecessary(this.progressCounter);
+            ReadProgressLogger.logProgressIfNecessary(this.progressCounter);
            this.progressCounter = 0;
         }
         JsonNode row = rowIterator.next();

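Both Optic readers above share a second pattern: each partition accumulates a cheap local progressCounter and only calls the shared static logger once per batchSize items, so contention on the AtomicLong scales with batches rather than with individual rows or triples. A condensed, compilable sketch of that flush pattern (names hypothetical):

    import java.util.concurrent.atomic.AtomicLong;

    class PartitionProgress {
        private static final AtomicLong sharedCounter = new AtomicLong(0);

        private final long batchSize;
        private long localCount;

        PartitionProgress(long batchSize) {
            this.batchSize = batchSize;
        }

        void onItemRead() {
            localCount++;
            if (localCount >= batchSize) {
                // One atomic add per batch instead of one per item.
                sharedCounter.addAndGet(localCount);
                localCount = 0;
            }
        }
    }

ForestReader doesn't need the local counter because it already reports once per retrieved page via page.getPageSize().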
src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 1 addition & 4 deletions
@@ -42,7 +42,6 @@ public class WriteContext extends ContextSupport {
     private final StructType schema;
     private final boolean usingFileSchema;
     private final int batchSize;
-    private final ProgressLogger progressLogger;
 
     private int fileSchemaContentPosition;
     private int fileSchemaPathPosition;
@@ -54,8 +53,6 @@ public WriteContext(StructType schema, Map<String, String> properties) {
         super(properties);
         this.schema = schema;
         this.batchSize = (int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1);
-        this.progressLogger = new WriteProgressLogger(getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0),
-            batchSize, "Documents written: {}");
 
         // We support the Spark binaryFile schema - https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html -
         // so that reader can be reused for loading files as-is.
@@ -270,7 +267,7 @@ private void logBatchOnSuccess(WriteBatch batch) {
                docCount--;
             }
         }
-        progressLogger.logProgressIfNecessary(docCount);
+        WriteProgressLogger.logProgressIfNecessary(docCount);
         if (logger.isTraceEnabled()) {
             logger.trace("Wrote batch; length: {}; job batch number: {}", docCount, batch.getJobBatchNumber());
         }

src/main/java/com/marklogic/spark/writer/customcode/CustomCodeWriter.java

Lines changed: 1 addition & 4 deletions
@@ -30,7 +30,6 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
     private final CustomCodeContext customCodeContext;
     private final JsonRowSerializer jsonRowSerializer;
     private final int batchSize;
-    private final ProgressLogger progressLogger;
 
     private final List<String> currentBatch = new ArrayList<>();
     private final String externalVariableDelimiter;
@@ -46,8 +45,6 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
         this.jsonRowSerializer = new JsonRowSerializer(customCodeContext.getSchema(), customCodeContext.getProperties());
 
         this.batchSize = (int) customCodeContext.getNumericOption(Options.WRITE_BATCH_SIZE, 1, 1);
-        this.progressLogger = new WriteProgressLogger(customCodeContext.getNumericOption(Options.WRITE_LOG_PROGRESS, 0, 0),
-            this.batchSize, "Items processed: {}");
 
         this.externalVariableDelimiter = customCodeContext.optionExists(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) ?
             customCodeContext.getProperties().get(Options.WRITE_EXTERNAL_VARIABLE_DELIMITER) : ",";
@@ -151,7 +148,7 @@ private void executeCall(ServerEvaluationCall call, int itemCount) {
         try {
             call.evalAs(String.class);
             this.successItemCount += itemCount;
-            this.progressLogger.logProgressIfNecessary(itemCount);
+            WriteProgressLogger.logProgressIfNecessary(itemCount);
         } catch (RuntimeException ex) {
             if (customCodeContext.isAbortOnFailure()) {
                 throw ex;

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 3 additions & 2 deletions
@@ -40,11 +40,12 @@ void defaultBatchSizeAndThreadCount() {
 
     @Test
     void logProgressTest() {
-        newWriter(2)
+        newWriter(4)
             // Including these options here to ensure they don't cause any issues, though we're not yet able to
             // assert on the info-level log entries that they add.
             .option(Options.WRITE_BATCH_SIZE, 8)
-            .option(Options.WRITE_LOG_PROGRESS, 50)
+            .option(Options.WRITE_THREAD_COUNT, 8)
+            .option(Options.WRITE_LOG_PROGRESS, 20)
             .save();
 
         verifyTwoHundredDocsWereWritten();

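For reference on the updated values: the test writes 200 documents in total (per verifyTwoHundredDocsWereWritten), presumably across 4 partitions given newWriter(4), so a progress interval of 20 should yield info entries from "Documents written: 20" up through "Documents written: 200". As the inline comment notes, the test can only verify that the options cause no failures; it cannot yet assert on those log entries.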