Skip to content

Commit 2bb55af

Browse files
authored
Merge pull request #272 from marklogic/feature/reprocess-logging
Added progress logging for reading items
2 parents 871649e + 865737e commit 2bb55af

File tree

3 files changed

+15
-2
lines changed

3 files changed

+15
-2
lines changed

src/main/java/com/marklogic/spark/DefaultSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
101101
ReadProgressLogger.initialize(readProgressInterval, "Rows read: {}");
102102
return new MarkLogicTable(schema, properties);
103103
} else if (Util.isReadWithCustomCodeOperation(properties)) {
104-
// Not yet logging progress for reading with custom code, as it's assumed the user will then write with
105-
// custom code.
104+
ReadProgressLogger.initialize(readProgressInterval, "Items read: {}");
106105
return new MarkLogicTable(schema, properties);
107106
}
108107

src/main/java/com/marklogic/spark/reader/customcode/CustomCodePartitionReader.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.marklogic.client.eval.EvalResultIterator;
55
import com.marklogic.client.eval.ServerEvaluationCall;
66
import com.marklogic.spark.Options;
7+
import com.marklogic.spark.ReadProgressLogger;
78
import com.marklogic.spark.reader.JsonRowDeserializer;
89
import org.apache.spark.sql.catalyst.InternalRow;
910
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -19,6 +20,10 @@ class CustomCodePartitionReader implements PartitionReader<InternalRow> {
1920
private final JsonRowDeserializer jsonRowDeserializer;
2021
private final DatabaseClient databaseClient;
2122

23+
// Only needed for logging progress.
24+
private final long batchSize;
25+
private long progressCounter;
26+
2227
public CustomCodePartitionReader(CustomCodeContext customCodeContext, String partition) {
2328
this.databaseClient = customCodeContext.connectToMarkLogic();
2429
this.serverEvaluationCall = customCodeContext.buildCall(
@@ -31,6 +36,8 @@ public CustomCodePartitionReader(CustomCodeContext customCodeContext, String par
3136
this.serverEvaluationCall.addVariable("PARTITION", partition);
3237
}
3338

39+
this.batchSize = customCodeContext.getNumericOption(Options.READ_BATCH_SIZE, 1, 1);
40+
3441
this.isCustomSchema = customCodeContext.isCustomSchema();
3542
this.jsonRowDeserializer = new JsonRowDeserializer(customCodeContext.getSchema());
3643
}
@@ -49,6 +56,11 @@ public InternalRow get() {
4956
if (this.isCustomSchema) {
5057
return this.jsonRowDeserializer.deserializeJson(val);
5158
}
59+
progressCounter++;
60+
if (progressCounter >= batchSize) {
61+
ReadProgressLogger.logProgressIfNecessary(progressCounter);
62+
progressCounter = 0;
63+
}
5264
return new GenericInternalRow(new Object[]{UTF8String.fromString(val)});
5365
}
5466

src/test/java/com/marklogic/spark/reader/customcode/ReadWithCustomCodeTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ void verifyTimeoutWorks() {
197197
private List<Row> readRows(String option, String value) {
198198
return startRead()
199199
.option(option, value)
200+
// Adding this only for manual inspection of logging and to ensure it doesn't cause errors.
201+
.option(Options.READ_LOG_PROGRESS, "1")
200202
.load()
201203
.collectAsList();
202204
}

0 commit comments

Comments
 (0)