Skip to content

Commit 2ec25bb

Browse files
committed
threadCount now refers to total thread count
Will communicate this change in the release notes, as it's changing how a public option works, but doing so in a way that better matches user expectations. `threadCountPerPartition` is then offered for what is expected to be a rare case where a user wants to configure the number of threads per partition.
1 parent f152830 commit 2ec25bb

File tree

7 files changed

+47
-25
lines changed

7 files changed

+47
-25
lines changed

docs/configuration.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ The following options control how the connector writes rows as documents to Mark
193193
| spark.marklogic.write.fileRows.documentType | Forces a document type when MarkLogic does not recognize a URI extension; must be one of `JSON`, `XML`, or `TEXT`. |
194194
| spark.marklogic.write.jsonRootName | As of 2.3.0, specifies a root field name when writing JSON documents based on arbitrary rows. |
195195
| spark.marklogic.write.temporalCollection | Name of a temporal collection to assign each document to. |
196-
| spark.marklogic.write.threadCount | The number of threads used within each partition to send documents to MarkLogic; defaults to 4. |
196+
| spark.marklogic.write.threadCount | The number of threads used across all partitions to send documents to MarkLogic; defaults to 4. |
197+
| spark.marklogic.write.threadCountPerPartition | New in 2.3.0; the number of threads used per partition to send documents to MarkLogic. |
197198
| spark.marklogic.write.transform | Name of a REST transform to apply to each document. |
198199
| spark.marklogic.write.transformParams | Comma-delimited string of transform parameter names and values - e.g. param1,value1,param2,value2 . |
199200
| spark.marklogic.write.transformParamsDelimiter | Delimiter to use instead of a command for the `transformParams` option. |

docs/writing.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,15 @@ The connector uses MarkLogic's
244244
following options can be set to adjust how the connector performs when writing data:
245245

246246
- `spark.marklogic.write.batchSize` = the number of documents written in one call to MarkLogic; defaults to 100.
247-
- `spark.marklogic.write.threadCount` = the number of threads used by each partition to write documents to MarkLogic;
247+
- `spark.marklogic.write.threadCount` = the number of threads used across all partitions to write documents to MarkLogic;
248248
defaults to 4.
249+
- `spark.marklogic.write.threadCountPerPartition` = the number of threads to use per partition to write documents to
250+
MarkLogic. If set, will be used instead of `spark.marklogic.write.threadCount`.
251+
252+
Prior to the 2.3.0 release, `spark.marklogic.write.threadCount` configured a number of threads per partition. Based on
253+
feedback, this was changed to the number of total threads used across all partitions to match what users typically
254+
expect "thread count" to mean in the context of writing to MarkLogic. `spark.marklogic.write.threadCountPerPartition`
255+
was then added for users who do wish to configure a number of threads per Spark partition.
249256

250257
These options are in addition to the number of partitions within the Spark DataFrame that is being written to
251258
MarkLogic. For each partition in the DataFrame, a separate instance of a MarkLogic batch writer is created, each
@@ -264,7 +271,7 @@ the connector can directly connect to each host in the cluster.
264271

265272
The rule of thumb above can thus be expressed as:
266273

267-
Number of partitions * Value of spark.marklogic.write.threadCount <= Number of hosts * number of app server threads
274+
Value of spark.marklogic.write.threadCount <= Number of hosts * number of app server threads
268275

269276
### Using a load balancer
270277

src/main/java/com/marklogic/spark/Options.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public abstract class Options {
8787

8888
public static final String WRITE_BATCH_SIZE = "spark.marklogic.write.batchSize";
8989
public static final String WRITE_THREAD_COUNT = "spark.marklogic.write.threadCount";
90-
public static final String WRITE_TOTAL_THREAD_COUNT = "spark.marklogic.write.totalThreadCount";
90+
public static final String WRITE_THREAD_COUNT_PER_PARTITION = "spark.marklogic.write.threadCountPerPartition";
9191
public static final String WRITE_ABORT_ON_FAILURE = "spark.marklogic.write.abortOnFailure";
9292

9393
// For logging progress when writing documents or processing with custom code.

src/main/java/com/marklogic/spark/writer/MarkLogicWrite.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,13 @@ private DataWriterFactory determineWriterFactory() {
127127
}
128128

129129
private void logPartitionAndThreadCounts(int numPartitions) {
130-
int totalThreadCount = writeContext.getTotalThreadCount();
131-
if (totalThreadCount > 0) {
132-
int threadCountPerPartition = writeContext.getThreadCountPerPartition();
130+
int userDefinedPartitionThreadCount = writeContext.getUserDefinedThreadCountPerPartition();
131+
if (userDefinedPartitionThreadCount > 0) {
133132
Util.MAIN_LOGGER.info("Number of partitions: {}; total thread count: {}; thread count per partition: {}",
134-
numPartitions, totalThreadCount, threadCountPerPartition);
133+
numPartitions, numPartitions * userDefinedPartitionThreadCount, userDefinedPartitionThreadCount);
135134
} else {
136-
int threadCount = writeContext.getThreadCount();
137-
Util.MAIN_LOGGER.info("Number of partitions: {}; thread count per partition: {}; total threads used for writing: {}",
138-
numPartitions, threadCount, numPartitions * threadCount);
135+
Util.MAIN_LOGGER.info("Number of partitions: {}; total threads used for writing: {}",
136+
numPartitions, writeContext.getTotalThreadCount());
139137
}
140138
}
141139

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,41 @@ public StructType getSchema() {
7777
return schema;
7878
}
7979

80-
int getThreadCount() {
80+
/**
81+
* @return the total number of threads to use across all partitions. This is typically how a user thinks in terms
82+
* of, as they are not likely to know how many partitions will be created. But they will typically know how many
83+
* hosts are in their MarkLogic cluster and how many threads are available to an app server on each host.
84+
*/
85+
int getTotalThreadCount() {
8186
return (int) getNumericOption(Options.WRITE_THREAD_COUNT, 4, 1);
8287
}
8388

84-
int getTotalThreadCount() {
85-
return (int) getNumericOption(Options.WRITE_TOTAL_THREAD_COUNT, 0, 1);
89+
/**
90+
* @return the thread count to use per partition where a user has specified the total thread count across all
91+
* partitions.
92+
*/
93+
int getCalculatedThreadCountPerPartition() {
94+
int threadCount = getTotalThreadCount();
95+
if (this.numPartitions > 0) {
96+
return (int) Math.ceil((double) threadCount / (double) numPartitions);
97+
}
98+
return threadCount;
8699
}
87100

88-
int getThreadCountPerPartition() {
89-
int totalThreadCount = getTotalThreadCount();
90-
if (totalThreadCount > 0 && this.numPartitions > 0) {
91-
return (int) Math.ceil((double) totalThreadCount / (double) numPartitions);
92-
}
93-
return 0;
101+
/**
102+
* @return the thread count to use per partition where a user has used an option to explicitly define how many
103+
* threads should be used by a partition.
104+
*/
105+
int getUserDefinedThreadCountPerPartition() {
106+
return (int) getNumericOption(Options.WRITE_THREAD_COUNT_PER_PARTITION, 0, 1);
94107
}
95108

96109
WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
97-
final int threadCount = getTotalThreadCount() > 0 ?
98-
getThreadCountPerPartition() : getThreadCount();
110+
// If the user told us how many threads they want per partition (we expect this to be rare), then use that.
111+
// Otherwise, use the calculated number of threads per partition based on the total thread count that either
112+
// the user configured or using the default value for that option.
113+
final int threadCount = getUserDefinedThreadCountPerPartition() > 0 ?
114+
getUserDefinedThreadCountPerPartition() : getCalculatedThreadCountPerPartition();
99115

100116
Util.MAIN_LOGGER.info("Creating new batcher with thread count of {} and batch size of {}.", threadCount, batchSize);
101117
WriteBatcher writeBatcher = dataMovementManager

src/main/resources/marklogic-spark-messages.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ spark.marklogic.write.graph=
1212
spark.marklogic.write.graphOverride=
1313
spark.marklogic.write.jsonRootName=
1414
spark.marklogic.write.threadCount=
15-
spark.marklogic.write.totalThreadCount=
15+
spark.marklogic.write.threadCountPerPartition=
1616
spark.marklogic.write.transformParams=
1717
spark.marklogic.write.uriTemplate=
1818
spark.marklogic.write.xmlRootName=

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ void batchSizeGreaterThanNumberOfRowsToWrite() {
6868
@Test
6969
void twoPartitions() {
7070
newWriter(2)
71-
.option(Options.WRITE_TOTAL_THREAD_COUNT, 16)
71+
.option(Options.WRITE_THREAD_COUNT_PER_PARTITION, 8)
7272
.option(Options.WRITE_BATCH_SIZE, 10)
7373
.save();
7474

@@ -80,7 +80,7 @@ void twoPartitions() {
8080
@Test
8181
void insufficientPrivilegeForOtherDatabase() {
8282
DataFrameWriter writer = newWriter(2)
83-
.option(Options.WRITE_TOTAL_THREAD_COUNT, 16)
83+
.option(Options.WRITE_THREAD_COUNT_PER_PARTITION, 8)
8484
.option(Options.WRITE_BATCH_SIZE, 10)
8585
.option(Options.CLIENT_URI, "spark-test-user:spark@localhost:8016/Documents");
8686

0 commit comments

Comments
 (0)