Skip to content

Commit 9c00ab4

Browse files
authored
Merge pull request #164 from marklogic/feature/logging-tweak
Added logging of how many documents were written
2 parents 78f7873 + 89cd56d commit 9c00ab4

File tree

5 files changed

+54
-48
lines changed

5 files changed

+54
-48
lines changed

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import com.marklogic.client.query.QueryDefinition;
1212
import com.marklogic.client.query.SearchQueryDefinition;
1313
import com.marklogic.client.query.StructuredQueryBuilder;
14-
import com.marklogic.spark.Util;
1514
import org.apache.spark.sql.catalyst.InternalRow;
1615
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
1716
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -93,10 +92,9 @@ public boolean next() {
9392
closeCurrentDocumentPage();
9493
List<String> uris = getNextBatchOfUris();
9594
if (uris.isEmpty()) {
96-
// TBD on whether this should be info/debug.
97-
if (Util.MAIN_LOGGER.isInfoEnabled()) {
95+
if (logger.isDebugEnabled()) {
9896
long duration = System.currentTimeMillis() - startTime;
99-
Util.MAIN_LOGGER.info("Read {} documents from partition {} in {}ms", docCount, forestPartition, duration);
97+
logger.debug("Read {} documents from partition {} in {}ms", docCount, forestPartition, duration);
10098
}
10199
return false;
102100
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.marklogic.spark.writer;
2+
3+
import org.apache.spark.sql.connector.write.WriterCommitMessage;
4+
5+
public class CommitMessage implements WriterCommitMessage {
6+
7+
private final String action;
8+
private final int docCount;
9+
private final int partitionId;
10+
private final long taskId;
11+
private final long epochId;
12+
13+
public CommitMessage(String action, int docCount, int partitionId, long taskId, long epochId) {
14+
this.action = action;
15+
this.docCount = docCount;
16+
this.partitionId = partitionId;
17+
this.taskId = taskId;
18+
this.epochId = epochId;
19+
}
20+
21+
public String getAction() {
22+
return action;
23+
}
24+
25+
public int getDocCount() {
26+
return docCount;
27+
}
28+
29+
@Override
30+
public String toString() {
31+
return epochId != 0L ?
32+
String.format("[partitionId: %d; taskId: %d; epochId: %d]; docCount: %d", partitionId, taskId, epochId, docCount) :
33+
String.format("[partitionId: %d; taskId: %d]; docCount: %d", partitionId, taskId, docCount);
34+
}
35+
}

src/main/java/com/marklogic/spark/writer/MarkLogicWrite.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,18 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
5353

5454
@Override
5555
public void commit(WriterCommitMessage[] messages) {
56-
if (messages != null && messages.length > 0 && logger.isDebugEnabled()) {
57-
logger.debug("Commit messages received: {}", Arrays.asList(messages));
56+
if (messages != null && messages.length > 0) {
57+
if (logger.isDebugEnabled()) {
58+
logger.debug("Commit messages received: {}", Arrays.asList(messages));
59+
}
60+
final String action = ((CommitMessage) messages[0]).getAction();
61+
if (Util.MAIN_LOGGER.isInfoEnabled()) {
62+
int count = 0;
63+
for (WriterCommitMessage message : messages) {
64+
count += ((CommitMessage) message).getDocCount();
65+
}
66+
Util.MAIN_LOGGER.info("{} {} documents", action, count);
67+
}
5868
}
5969
}
6070

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void write(InternalRow row) throws IOException {
9090

9191
@Override
9292
public WriterCommitMessage commit() throws IOException {
93-
CommitMessage message = new CommitMessage(docCount, partitionId, taskId, epochId);
93+
CommitMessage message = new CommitMessage("Wrote", docCount, partitionId, taskId, epochId);
9494
if (logger.isDebugEnabled()) {
9595
logger.debug("Committing {}", message);
9696
}
@@ -129,25 +129,4 @@ private void stopJobAndRelease() {
129129
this.databaseClient.release();
130130
}
131131
}
132-
133-
private static class CommitMessage implements WriterCommitMessage {
134-
private int docCount;
135-
private int partitionId;
136-
private long taskId;
137-
private long epochId;
138-
139-
public CommitMessage(int docCount, int partitionId, long taskId, long epochId) {
140-
this.docCount = docCount;
141-
this.partitionId = partitionId;
142-
this.taskId = taskId;
143-
this.epochId = epochId;
144-
}
145-
146-
@Override
147-
public String toString() {
148-
return epochId != 0L ?
149-
String.format("[partitionId: %d; taskId: %d; epochId: %d]; docCount: %d", partitionId, taskId, epochId, docCount) :
150-
String.format("[partitionId: %d; taskId: %d]; docCount: %d", partitionId, taskId, docCount);
151-
}
152-
}
153132
}

src/main/java/com/marklogic/spark/writer/customcode/CustomCodeWriter.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.marklogic.spark.Options;
1414
import com.marklogic.spark.Util;
1515
import com.marklogic.spark.reader.customcode.CustomCodeContext;
16+
import com.marklogic.spark.writer.CommitMessage;
1617
import org.apache.spark.sql.catalyst.InternalRow;
1718
import org.apache.spark.sql.catalyst.json.JacksonGenerator;
1819
import org.apache.spark.sql.connector.write.DataWriter;
@@ -41,6 +42,7 @@ class CustomCodeWriter implements DataWriter<InternalRow> {
4142
private final int partitionId;
4243
private final long taskId;
4344
private final long epochId;
45+
private int itemCount;
4446

4547
CustomCodeWriter(CustomCodeContext customCodeContext, int partitionId, long taskId, long epochId) {
4648
this.customCodeContext = customCodeContext;
@@ -66,6 +68,7 @@ public void write(InternalRow row) {
6668
String rowValue = customCodeContext.isCustomSchema() ?
6769
convertRowToJSONString(row) :
6870
row.getString(0);
71+
itemCount++;
6972

7073
this.currentBatch.add(rowValue);
7174

@@ -77,7 +80,7 @@ public void write(InternalRow row) {
7780
@Override
7881
public WriterCommitMessage commit() {
7982
flush();
80-
CommitMessage message = new CommitMessage(partitionId, taskId, epochId);
83+
CommitMessage message = new CommitMessage("Processed", itemCount, partitionId, taskId, epochId);
8184
if (logger.isDebugEnabled()) {
8285
logger.debug("Committing {}", message);
8386
}
@@ -172,23 +175,4 @@ private void executeCall(ServerEvaluationCall call) {
172175
Util.MAIN_LOGGER.error(String.format("Unable to process row; cause: %s", ex.getMessage()));
173176
}
174177
}
175-
176-
private static class CommitMessage implements WriterCommitMessage {
177-
private int partitionId;
178-
private long taskId;
179-
private long epochId;
180-
181-
public CommitMessage(int partitionId, long taskId, long epochId) {
182-
this.partitionId = partitionId;
183-
this.taskId = taskId;
184-
this.epochId = epochId;
185-
}
186-
187-
@Override
188-
public String toString() {
189-
return epochId != 0L ?
190-
String.format("[partitionId: %d; taskId: %d; epochId: %d]", partitionId, taskId, epochId) :
191-
String.format("[partitionId: %d; taskId: %d]", partitionId, taskId);
192-
}
193-
}
194178
}

0 commit comments

Comments (0)