Skip to content

Commit a3ea84e

Browse files
authored
Merge pull request #173 from marklogic/feature/logging-improvement
Added some logging for writing documents
2 parents fe76f56 + 944b72e commit a3ea84e

File tree

4 files changed

+11
-4
lines changed

4 files changed

+11
-4
lines changed

src/main/java/com/marklogic/spark/reader/document/DocumentBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class DocumentBatch implements Batch {
4747
this.partitions = planner.makePartitions(estimate, serverTimestamp, forests);
4848

4949
if (Util.MAIN_LOGGER.isInfoEnabled()) {
50-
logger.info("Created {} partitions; query estimate: {}", partitions.length, estimate);
50+
Util.MAIN_LOGGER.info("Created {} partitions; query estimate: {}", partitions.length, estimate);
5151
}
5252
}
5353

src/main/java/com/marklogic/spark/writer/MarkLogicWrite.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public boolean useCommitCoordinator() {
6060

6161
@Override
6262
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
63-
Util.MAIN_LOGGER.info("Number of partitions: {}", info.numPartitions());
63+
int partitions = info.numPartitions();
64+
int threadCount = writeContext.getThreadCount();
65+
Util.MAIN_LOGGER.info("Number of partitions: {}; thread count per partition: {}; total threads used for writing: {}",
66+
partitions, threadCount, partitions * threadCount);
6467
return (DataWriterFactory) determineWriterFactory();
6568
}
6669

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,15 @@ public StructType getSchema() {
6161
return schema;
6262
}
6363

64+
int getThreadCount() {
65+
return (int)getNumericOption(Options.WRITE_THREAD_COUNT, 4, 1);
66+
}
67+
6468
WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
6569
WriteBatcher writeBatcher = dataMovementManager
6670
.newWriteBatcher()
6771
.withBatchSize((int) getNumericOption(Options.WRITE_BATCH_SIZE, 100, 1))
68-
.withThreadCount((int) getNumericOption(Options.WRITE_THREAD_COUNT, 4, 1))
72+
.withThreadCount(getThreadCount())
6973
// WriteBatcherImpl has its own warn-level logging which is a bit verbose, including more than just the
7074
// message from the server. This is intended to always show up and be associated with our Spark connector
7175
// and also to be more brief, just capturing the main message from the server.

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void testWithCustomConfig() {
9898
@Test
9999
void invalidThreadCount() {
100100
DataFrameWriter writer = newWriter().option(Options.WRITE_THREAD_COUNT, 0);
101-
ConnectorException ex = assertThrowsConnectorException(() -> writer.save());
101+
ConnectorException ex = assertThrows(ConnectorException.class, () -> writer.save());
102102
assertEquals("Value of 'spark.marklogic.write.threadCount' option must be 1 or greater.", ex.getMessage());
103103
verifyNoDocsWereWritten();
104104
}

0 commit comments

Comments
 (0)