Commit 050ec7a

Now logging progress for read operations
Had to make two separate loggers so that we can have a static counter for reading and a static counter for writing. Both get reset when DefaultSource creates a table for a new job. Did some quick manual testing of this in Flux, and it "just works". Note that progress for reading/writing files is not being addressed yet; this commit focuses only on progress for reading from MarkLogic and writing to MarkLogic.
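
For reference, a minimal sketch of how a read would enable the new option. The spark.marklogic.read.logProgress key is from this commit; the format name, connection options, and collection below are illustrative assumptions, not part of the diff:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadProgressExample {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
        Dataset<Row> rows = session.read()
            .format("marklogic") // assumed short name; the full class name com.marklogic.spark.DefaultSource should also work
            .option("spark.marklogic.client.host", "localhost") // hypothetical connection details
            .option("spark.marklogic.client.port", 8000)
            .option("spark.marklogic.read.documents.collections", "example") // assumed option string
            .option("spark.marklogic.read.logProgress", 10_000) // from this commit: log a progress message every 10,000 items
            .load();
        System.out.println(rows.count());
    }
}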
1 parent f410d14 commit 050ec7a

File tree

10 files changed (+110 lines, -8 lines)


src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 7 additions & 5 deletions
@@ -79,17 +79,23 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
     @Override
     public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
         if (isFileOperation(properties)) {
+            // Not yet supporting progress logging for file operations.
             return new MarkLogicFileTable(SparkSession.active(),
                 new CaseInsensitiveStringMap(properties),
                 JavaConverters.asScalaBuffer(getPaths(properties)), schema
             );
         }
 
+        // The appropriate progress logger is reset here so that when the connector is used repeatedly in an
+        // environment like PySpark, the counts start with zero on each new Spark job.
         if (isReadDocumentsOperation(properties)) {
+            ReadProgressLogger.progressCounter.set(0);
             return new DocumentTable(DocumentRowSchema.SCHEMA);
         } else if (isReadTriplesOperation(properties)) {
+            ReadProgressLogger.progressCounter.set(0);
             return new DocumentTable(TripleRowSchema.SCHEMA);
-        } else if (isReadOperation(properties)) {
+        } else if (properties.get(Options.READ_OPTIC_QUERY) != null || Util.isReadWithCustomCodeOperation(properties)) {
+            ReadProgressLogger.progressCounter.set(0);
             return new MarkLogicTable(schema, properties);
         }

@@ -112,10 +118,6 @@ private boolean isFileOperation(Map<String, String> properties) {
         return properties.containsKey("path") || properties.containsKey("paths");
     }
 
-    private boolean isReadOperation(Map<String, String> properties) {
-        return properties.get(Options.READ_OPTIC_QUERY) != null || Util.isReadWithCustomCodeOperation(properties);
-    }
-
     private boolean isReadDocumentsOperation(Map<String, String> properties) {
         return properties.containsKey(Options.READ_DOCUMENTS_QUERY) ||
             properties.containsKey(Options.READ_DOCUMENTS_STRING_QUERY) ||
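
The reset above works because progressCounter is a static AtomicLong shared by every reader partition in the same JVM, while getTable runs once per job on the driver, so zeroing the counter there gives each job a fresh count. A standalone sketch of the aggregation-and-reset behavior, with a plain AtomicLong standing in for ReadProgressLogger.progressCounter:

import java.util.concurrent.atomic.AtomicLong;

public class SharedCounterSketch {

    // Stands in for ReadProgressLogger.progressCounter.
    static final AtomicLong progressCounter = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        // Two "partitions" each report five batches of 100 items.
        Runnable partition = () -> {
            for (int i = 0; i < 5; i++) {
                progressCounter.addAndGet(100);
            }
        };
        Thread p1 = new Thread(partition);
        Thread p2 = new Thread(partition);
        p1.start();
        p2.start();
        p1.join();
        p2.join();
        System.out.println(progressCounter.get()); // 1000, aggregated across partitions

        // What DefaultSource.getTable does before each new read job, so that repeated
        // jobs in the same JVM (e.g. a PySpark session) start counting from zero.
        progressCounter.set(0);
    }
}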

src/main/java/com/marklogic/spark/Options.java

Lines changed: 5 additions & 0 deletions
@@ -71,6 +71,11 @@ public abstract class Options {
     public static final String READ_TRIPLES_FILTERED = "spark.marklogic.read.triples.filtered";
     public static final String READ_TRIPLES_BASE_IRI = "spark.marklogic.read.triples.baseIri";
 
+    // For logging progress when reading documents, rows, or items via custom code. Defines the interval at which
+    // progress should be logged - e.g. a value of 10,000 will result in a message being logged for every 10,000 items
+    // that are read/processed.
+    public static final String READ_LOG_PROGRESS = "spark.marklogic.read.logProgress";
+
     public static final String READ_FILES_TYPE = "spark.marklogic.read.files.type";
     public static final String READ_FILES_COMPRESSION = "spark.marklogic.read.files.compression";
     public static final String READ_FILES_ENCODING = "spark.marklogic.read.files.encoding";

src/main/java/com/marklogic/spark/ReadProgressLogger.java

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.spark;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handles the progress counter for any operation involving reading from MarkLogic. A Spark job/application can only have
+ * one reader, and thus DefaultSource handles resetting this counter before a new read job starts up. A static counter
+ * is used so that all reader partitions in the same JVM can have their progress aggregated and logged.
+ */
+public class ReadProgressLogger extends ProgressLogger {
+
+    public static final AtomicLong progressCounter = new AtomicLong(0);
+
+    public ReadProgressLogger(long progressInterval, int batchSize, String message) {
+        super(progressInterval, batchSize, message);
+    }
+
+    @Override
+    protected long getNewSum(long itemCount) {
+        return progressCounter.addAndGet(itemCount);
+    }
+}
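
The ProgressLogger base class is not part of this diff. Judging from the constructor signature and the logProgressIfNecessary/getNewSum calls the subclasses use, it plausibly looks like the following sketch; the field names, interval check, and logger are guesses, not the actual source:

package com.marklogic.spark;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical reconstruction of the base class referenced by Read/WriteProgressLogger.
public abstract class ProgressLogger {

    private static final Logger logger = LoggerFactory.getLogger(ProgressLogger.class);

    private final long progressInterval;
    private final String message;

    protected ProgressLogger(long progressInterval, int batchSize, String message) {
        this.progressInterval = progressInterval;
        this.message = message;
        // batchSize presumably helps align the interval check with batch boundaries; omitted here.
    }

    // Subclasses add itemCount to their shared static counter and return the new total.
    protected abstract long getNewSum(long itemCount);

    public void logProgressIfNecessary(long itemCount) {
        // A progressInterval of zero (the default) disables progress logging.
        if (progressInterval > 0 && itemCount > 0) {
            long sum = getNewSum(itemCount);
            // Log whenever this update crosses a multiple of the interval; this
            // approximation assumes itemCount never exceeds progressInterval.
            if (sum % progressInterval < itemCount) {
                logger.info(message, sum);
            }
        }
    }
}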

src/main/java/com/marklogic/spark/WriteProgressLogger.java

Lines changed: 5 additions & 1 deletion
@@ -3,9 +3,13 @@
  */
 package com.marklogic.spark;
 
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Handles the progress counter for any operation involving writing to MarkLogic. A Spark job/application can only have
+ * one writer, and thus DefaultSource handles resetting this counter before a new write job starts up. A static counter
+ * is used so that all writer partitions in the same JVM can have their progress aggregated and logged.
+ */
 public class WriteProgressLogger extends ProgressLogger {
 
     public static final AtomicLong progressCounter = new AtomicLong(0);

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 9 additions & 0 deletions
@@ -12,6 +12,8 @@
 import com.marklogic.client.query.SearchQueryDefinition;
 import com.marklogic.client.query.StructuredQueryBuilder;
 import com.marklogic.spark.Options;
+import com.marklogic.spark.ProgressLogger;
+import com.marklogic.spark.ReadProgressLogger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.slf4j.Logger;

@@ -38,6 +40,7 @@ class ForestReader implements PartitionReader<InternalRow> {
     private final Integer limit;
 
     // Only used for logging.
+    private final ProgressLogger progressLogger;
     private final ForestPartition forestPartition;
     private long startTime;

@@ -71,6 +74,11 @@ class ForestReader implements PartitionReader<InternalRow> {
         this.requestedMetadata = context.getRequestedMetadata();
         this.documentManager.setMetadataCategories(this.requestedMetadata);
         this.queryBuilder = client.newQueryManager().newStructuredQueryBuilder();
+
+        this.progressLogger = new ReadProgressLogger(
+            context.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0),
+            context.getBatchSize(), "Read documents: {}"
+        );
     }

@@ -145,6 +153,7 @@ private DocumentPage readPage(List<String> uris) {
         if (logger.isTraceEnabled()) {
             logger.trace("Retrieved page of documents in {}ms from partition {}", (System.currentTimeMillis() - start), this.forestPartition);
         }
+        this.progressLogger.logProgressIfNecessary(page.getPageSize());
         return page;
     }

src/main/java/com/marklogic/spark/reader/document/OpticTriplesReader.java

Lines changed: 18 additions & 0 deletions
@@ -7,6 +7,8 @@
 import com.marklogic.client.row.RowRecord;
 import com.marklogic.client.type.PlanColumn;
 import com.marklogic.spark.Options;
+import com.marklogic.spark.ProgressLogger;
+import com.marklogic.spark.ReadProgressLogger;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;

@@ -34,6 +36,11 @@ class OpticTriplesReader implements PartitionReader<InternalRow> {
     private final PlanBuilder op;
     private final String graphBaseIri;
 
+    // Only for logging
+    private final long batchSize;
+    private long progressCounter;
+    private final ProgressLogger progressLogger;
+
     private Iterator<RowRecord> currentRowIterator;
 
     public OpticTriplesReader(ForestPartition forestPartition, DocumentContext context) {

@@ -51,6 +58,12 @@ public OpticTriplesReader(ForestPartition forestPartition, DocumentContext context) {
             filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_TRIPLES_FILTERED));
         }
         this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered);
+
+        this.batchSize = context.getBatchSize();
+        this.progressLogger = new ReadProgressLogger(
+            context.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0),
+            (int) this.batchSize, "Read triples: {}"
+        );
     }

@@ -71,6 +84,11 @@ public boolean next() throws IOException {
     @Override
     public InternalRow get() {
         Object[] row = convertNextTripleIntoRow();
+        progressCounter++;
+        if (progressCounter >= batchSize) {
+            progressLogger.logProgressIfNecessary(progressCounter);
+            progressCounter = 0;
+        }
         return new GenericInternalRow(row);
     }
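
ForestReader above reports a whole page at a time, so it can call the progress logger directly; the per-row readers (this one and OpticPartitionReader below) instead buffer counts in a plain local field and only touch the shared atomic counter once per batch, avoiding an atomic operation on every row. A condensed, self-contained sketch of that buffering pattern:

import java.util.concurrent.atomic.AtomicLong;

public class BufferedProgressSketch {

    // Shared across all partitions, as in ReadProgressLogger.progressCounter.
    static final AtomicLong sharedCounter = new AtomicLong(0);

    public static void main(String[] args) {
        final long batchSize = 100;
        long progressCounter = 0; // per-partition buffer; no synchronization needed

        for (int row = 0; row < 1_000; row++) {
            progressCounter++;
            // Flush to the shared counter once per batch rather than per row.
            if (progressCounter >= batchSize) {
                sharedCounter.addAndGet(progressCounter);
                progressCounter = 0;
            }
        }
        System.out.println(sharedCounter.get()); // 1000
    }
}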

src/main/java/com/marklogic/spark/reader/optic/OpticPartitionReader.java

Lines changed: 17 additions & 0 deletions
@@ -20,6 +20,9 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.marklogic.client.row.RowManager;
+import com.marklogic.spark.Options;
+import com.marklogic.spark.ProgressLogger;
+import com.marklogic.spark.ReadProgressLogger;
 import com.marklogic.spark.reader.JsonRowDeserializer;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;

@@ -46,6 +49,9 @@ class OpticPartitionReader implements PartitionReader<InternalRow> {
     // Used solely for logging metrics
     private long totalRowCount;
     private long totalDuration;
+    private long progressCounter;
+    private final long batchSize;
+    private final ProgressLogger progressLogger;
 
     // Used solely for testing purposes; is never expected to be used in production. Intended to provide a way for
     // a test to get the count of rows returned from MarkLogic, which is important for ensuring that pushdown operations

@@ -54,12 +60,18 @@ class OpticPartitionReader implements PartitionReader<InternalRow> {
 
     OpticPartitionReader(OpticReadContext opticReadContext, PlanAnalysis.Partition partition) {
         this.opticReadContext = opticReadContext;
+        this.batchSize = opticReadContext.getBatchSize();
         this.partition = partition;
         this.rowManager = opticReadContext.connectToMarkLogic().newRowManager();
         // Nested values won't work with the JacksonParser used by JsonRowDeserializer, so we ask for type info to not
         // be in the rows.
         this.rowManager.setDatatypeStyle(RowManager.RowSetPart.HEADER);
         this.jsonRowDeserializer = new JsonRowDeserializer(opticReadContext.getSchema());
+
+        this.progressLogger = new ReadProgressLogger(
+            opticReadContext.getNumericOption(Options.READ_LOG_PROGRESS, 0, 0),
+            (int) opticReadContext.getBatchSize(), "Read rows: {}"
+        );
     }

@@ -101,6 +113,11 @@ public boolean next() {
     public InternalRow get() {
         this.currentBucketRowCount++;
         this.totalRowCount++;
+        this.progressCounter++;
+        if (this.progressCounter >= this.batchSize) {
+            progressLogger.logProgressIfNecessary(this.progressCounter);
+            this.progressCounter = 0;
+        }
         JsonNode row = rowIterator.next();
         return this.jsonRowDeserializer.deserializeJson(row.toString());
     }

src/main/java/com/marklogic/spark/reader/optic/OpticReadContext.java

Lines changed: 6 additions & 1 deletion
@@ -65,13 +65,14 @@ public class OpticReadContext extends ContextSupport {
     private StructType schema;
     private long serverTimestamp;
    private List<OpticFilter> opticFilters;
+    private final long batchSize;
 
     public OpticReadContext(Map<String, String> properties, StructType schema, int defaultMinPartitions) {
         super(properties);
         this.schema = schema;
 
         final long partitionCount = getNumericOption(Options.READ_NUM_PARTITIONS, defaultMinPartitions, 1);
-        final long batchSize = getNumericOption(Options.READ_BATCH_SIZE, DEFAULT_BATCH_SIZE, 0);
+        this.batchSize = getNumericOption(Options.READ_BATCH_SIZE, DEFAULT_BATCH_SIZE, 0);
 
         final String dslQuery = properties.get(Options.READ_OPTIC_QUERY);
         if (dslQuery == null || dslQuery.trim().length() < 1) {

@@ -283,4 +284,8 @@ PlanAnalysis getPlanAnalysis() {
     long getBucketCount() {
         return planAnalysis != null ? planAnalysis.getAllBuckets().size() : 0;
     }
+
+    long getBatchSize() {
+        return batchSize;
+    }
 }

src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java

Lines changed: 16 additions & 1 deletion
@@ -6,6 +6,7 @@
 import com.marklogic.spark.AbstractIntegrationTest;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
+import com.marklogic.spark.writer.AbstractWriteTest;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;

@@ -18,7 +19,7 @@
 
 import static org.junit.jupiter.api.Assertions.*;
 
-class ReadDocumentRowsTest extends AbstractIntegrationTest {
+class ReadDocumentRowsTest extends AbstractWriteTest {
 
     @Test
     void readByCollection() {

@@ -37,6 +38,20 @@ void readByCollection() {
         assertEquals("Vivianne", doc.get("ForeName").asText());
     }
 
+    @Test
+    void logProgress() {
+        newWriter().save();
+
+        Dataset<Row> rows = startRead()
+            .option(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, 1)
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "write-test")
+            .option(Options.READ_BATCH_SIZE, 10)
+            .option(Options.READ_LOG_PROGRESS, 50)
+            .load();
+
+        assertEquals(200, rows.count());
+    }
+
     @Test
     void readViaDirectConnect() {
         Dataset<Row> rows = startRead()
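
Given the assertion, the read above returns 200 documents; with a batch size of 10 and a progress interval of 50, one would expect four progress messages of the form:

Read documents: 50
Read documents: 100
Read documents: 150
Read documents: 200

Only the row count is asserted; the progress log output itself is not verified by the test, consistent with the manual verification mentioned in the commit message.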

src/test/java/com/marklogic/spark/reader/triples/ReadTriplesTest.java

Lines changed: 2 additions & 0 deletions
@@ -90,6 +90,8 @@ void collections() {
     void twoCollections() {
         long count = startRead()
             .option(Options.READ_TRIPLES_COLLECTIONS, "http://example.org/graph,other-graph")
+            .option(Options.READ_BATCH_SIZE, 5)
+            .option(Options.READ_LOG_PROGRESS, 10)
             .load().count();
 
         assertEquals(32, count, "Since both test triples files belong to 'test-config', and each also belongs to " +
