Commit 033a9b5

Removed queryFormat and improved docs on performance
Also changed default number of partitions per forest to 4 based on the default setup of 3 forests per host and 32 app server threads.
1 parent: d6fe65a

6 files changed: +36 -29 lines changed

docs/configuration.md

Lines changed: 1 addition & 0 deletions

@@ -153,6 +153,7 @@ The following options control how the connector reads document rows from MarkLogic
 | spark.marklogic.read.documents.transform | Name of a [MarkLogic REST transform](https://docs.marklogic.com/guide/rest-dev/transforms) to apply to each matching document. |
 | spark.marklogic.read.documents.transformParams | Comma-delimited sequence of transform parameter names and values - e.g. `param1,value1,param2,value`. |
 | spark.marklogic.read.documents.transformParamsDelimiter | Delimiter for transform parameters; defaults to a comma. |
+| spark.marklogic.read.documents.partitionsPerForest | Number of Spark partition readers to create per forest; defaults to 4. |
 
 ## Write options

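For context, a minimal PySpark sketch of setting the new option during a read, assuming the connector's usual option names (`spark.marklogic.client.uri`, `spark.marklogic.read.documents.collections`); the connection details and collection are placeholders:

```python
# Hypothetical connection details; replace with your own MarkLogic app server.
df = spark.read.format("marklogic") \
    .option("spark.marklogic.client.uri", "user:password@localhost:8000") \
    .option("spark.marklogic.read.documents.collections", "example") \
    .option("spark.marklogic.read.documents.partitionsPerForest", 8) \
    .load()
df.show()
```
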
docs/reading-data/documents.md

Lines changed: 24 additions & 11 deletions

@@ -31,7 +31,9 @@ df = spark.read.format("marklogic") \
 df.show()
 ```
 
-You can also submit structured queries, serialized CTS queries, and combined queries via
+You can also submit a [structured query](https://docs.marklogic.com/guide/search-dev/structured-query#), a
+[serialized CTS query](https://docs.marklogic.com/guide/rest-dev/search#id_30577), or a
+[combined query](https://docs.marklogic.com/guide/rest-dev/search#id_69918) via
 `spark.marklogic.read.documents.query`, which can be combined with a string query as well:
 
 ```
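
With queryFormat removed, a hedged sketch of submitting a JSON structured query; the query string is borrowed from this commit's test suite, and the connection URI is a placeholder:

```python
# The connector now infers JSON vs. XML from the payload itself,
# so no queryFormat option is needed.
query = '{ "query": { "queries": [{ "term-query": { "text": [ "Moria" ] } }] } }'
df = spark.read.format("marklogic") \
    .option("spark.marklogic.client.uri", "user:password@localhost:8000") \
    .option("spark.marklogic.read.documents.query", query) \
    .load()
```
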
@@ -185,15 +187,26 @@ doc['Department']
 ## Understanding performance
 
 The connector mimics the behavior of the [MarkLogic Data Movement SDK](https://docs.marklogic.com/guide/java/data-movement)
-by creating a Spark partition per forest in the database associated with your REST API app server. Each partition reader
-will return all matching documents from its associated forest. The option `spark.marklogic.read.batchSize` controls how
-many documents will be returned in each call to MarkLogic; its value defaults to 500. For smaller documents,
-particularly those with 10 elements or fewer, you may find a batch size of 1,000 or even 10,000 to provide better
-performance.
-
-The `spark.marklogic.read.numPartitions` option does not impact performance when reading document rows, as 1 partition
-is always created for each forest. It is not possible for 2 or more partition readers to read from the same forest.
-
-You can adjust the level of parallelism by controlling how many threads Spark uses for executing partition reads.
+by creating Spark partition readers that are assigned to a specific forest. By default, the connector will create
+4 readers per forest. You can use the `spark.marklogic.read.documents.partitionsPerForest` option to control
+the number of readers. You should adjust this based on your cluster configuration. For example, a default REST API app
+server will have 32 server threads and 3 forests per host. 4 partition readers per forest will thus consume 12 of the 32
+server threads. If the app server is not servicing any other requests, performance will typically be improved by
+configuring 8 partitions per forest. Note that the `spark.marklogic.read.numPartitions` option does not have any impact;
+it is only used when reading via an Optic query.
+
+Each partition reader will make one or more calls to MarkLogic to retrieve documents. The
+`spark.marklogic.read.batchSize` option controls how many documents will be retrieved in each call. The value defaults
+to 500. For smaller documents, particularly those with 10 elements or fewer, you may find a batch size of 1,000 or
+even 10,000 to provide better performance.
+
+As an example, consider a query that matches 120,000 documents in a cluster with 3 hosts and 2 forests on each host.
+The connector will default to creating 24 partitions - 4 for each of the 6 forests. Each partition reader will read
+approximately 5,000 documents. With a default batch size of 500, each partition reader will make approximately 10
+calls to MarkLogic (these numbers are all approximate, as a forest may have slightly more or fewer than 20,000 documents).
+Depending on the size of the documents and whether the cluster is servicing other requests, performance may improve
+with more partition readers and a higher batch size.
+
+You can also adjust the level of parallelism by controlling how many threads Spark uses for executing partition reads.
 Please see your Spark distribution's documentation for further information.

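The arithmetic in the new example above can be sanity-checked in a few lines of Python; the figures mirror the documentation and are approximate, since forests rarely hold identical document counts:

```python
# Worked example from the docs: 120,000 matching documents,
# 3 hosts with 2 forests each, default connector settings.
forests = 3 * 2
partitions_per_forest = 4    # new default for document reads
batch_size = 500             # default for spark.marklogic.read.batchSize
matching_docs = 120_000

partitions = forests * partitions_per_forest            # 24 partition readers
docs_per_partition = matching_docs // partitions        # ~5,000 documents each
calls_per_partition = docs_per_partition // batch_size  # ~10 calls to MarkLogic
print(partitions, docs_per_partition, calls_per_partition)  # 24 5000 10
```
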
src/main/java/com/marklogic/spark/Options.java

Lines changed: 0 additions & 1 deletion

@@ -48,7 +48,6 @@ public abstract class Options {
     public static final String READ_DOCUMENTS_STRING_QUERY = "spark.marklogic.read.documents.stringQuery";
     // Corresponds to the complex query submitted via the request body at https://docs.marklogic.com/REST/POST/v1/search .
     public static final String READ_DOCUMENTS_QUERY = "spark.marklogic.read.documents.query";
-    public static final String READ_DOCUMENTS_QUERY_FORMAT = "spark.marklogic.read.documents.queryFormat";
     public static final String READ_DOCUMENTS_OPTIONS = "spark.marklogic.read.documents.options";
     public static final String READ_DOCUMENTS_DIRECTORY = "spark.marklogic.read.documents.directory";
     public static final String READ_DOCUMENTS_TRANSFORM = "spark.marklogic.read.documents.transform";

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 1 addition & 2 deletions

@@ -51,7 +51,6 @@ SearchQueryDefinition buildSearchQuery(DatabaseClient client) {
         return new SearchQueryBuilder()
             .withStringQuery(props.get(Options.READ_DOCUMENTS_STRING_QUERY))
             .withQuery(props.get(Options.READ_DOCUMENTS_QUERY))
-            .withQueryFormat(props.get(Options.READ_DOCUMENTS_QUERY_FORMAT))
             .withCollections(props.get(Options.READ_DOCUMENTS_COLLECTIONS))
             .withDirectory(props.get(Options.READ_DOCUMENTS_DIRECTORY))
             .withOptionsName(props.get(Options.READ_DOCUMENTS_OPTIONS))
@@ -70,7 +69,7 @@ int getBatchSize() {
     }
 
     int getPartitionsPerForest() {
-        int defaultPartitionsPerForest = 2;
+        int defaultPartitionsPerForest = 4;
         return (int) getNumericOption(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, defaultPartitionsPerForest, 1);
     }
 }

src/main/java/com/marklogic/spark/reader/document/SearchQueryBuilder.java

Lines changed: 10 additions & 12 deletions

@@ -14,7 +14,6 @@ public class SearchQueryBuilder {
 
     private String stringQuery;
     private String query;
-    private Format queryFormat;
     private String[] collections;
     private String directory;
     private String optionsName;
@@ -51,13 +50,6 @@ public SearchQueryBuilder withQuery(String query) {
         return this;
     }
 
-    public SearchQueryBuilder withQueryFormat(String format) {
-        if (format != null) {
-            this.queryFormat = Format.valueOf(format.toUpperCase());
-        }
-        return this;
-    }
-
     public SearchQueryBuilder withCollections(String value) {
         if (value != null) {
             this.collections = value.split(",");
@@ -92,13 +84,15 @@ public SearchQueryBuilder withTransformParamsDelimiter(String delimiter) {
 
     private QueryDefinition buildQueryDefinition(DatabaseClient client) {
         final QueryManager queryManager = client.newQueryManager();
-        // The Java Client misleadingly suggests a distinction amongst the 3 complex queries - structured,
-        // serialized CTS, and combined - but the REST API does not.
         if (query != null) {
             StringHandle queryHandle = new StringHandle(query);
-            if (queryFormat != null) {
-                queryHandle.withFormat(queryFormat);
+            // v1/search assumes XML by default, so only need to set to JSON if the query is JSON.
+            if (queryIsJSON()) {
+                queryHandle.withFormat(Format.JSON);
             }
+            // The Java Client misleadingly suggests a distinction amongst the 3 complex queries - structured,
+            // serialized CTS, and combined - but the REST API does not. Thus, a RawStructuredQueryDefinition will work
+            // for any of the 3 query types.
             RawStructuredQueryDefinition queryDefinition = queryManager.newRawStructuredQueryDefinition(queryHandle);
             if (stringQuery != null && stringQuery.length() > 0) {
                 queryDefinition.withCriteria(stringQuery);
@@ -112,6 +106,10 @@ private QueryDefinition buildQueryDefinition(DatabaseClient client) {
         return queryDefinition;
     }
 
+    private boolean queryIsJSON() {
+        return query != null && query.trim().startsWith("{");
+    }
+
     private void applyCommonQueryConfig(QueryDefinition queryDefinition) {
         if (optionsName != null && optionsName.trim().length() > 0) {
             queryDefinition.setOptionsName(optionsName);
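
The format-inference heuristic that replaces queryFormat is small enough to restate as a Python sketch (mirroring `queryIsJSON` above); the XML example string is illustrative:

```python
# A query payload is treated as JSON when it starts with "{";
# otherwise v1/search assumes XML.
def query_is_json(query):
    return query is not None and query.strip().startswith("{")

assert query_is_json('{ "query": { "queries": [] } }')
assert not query_is_json('<cts:word-query xmlns:cts="http://marklogic.com/cts"/>')
```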

src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java

Lines changed: 0 additions & 3 deletions

@@ -136,7 +136,6 @@ void structuredQueryJSON() {
         String query = "{ \"query\": { \"queries\": [{ \"term-query\": { \"text\": [ \"Moria\" ] } }] } }";
         List<Row> rows = startRead()
             .option(Options.READ_DOCUMENTS_QUERY, query)
-            .option(Options.READ_DOCUMENTS_QUERY_FORMAT, "jsON")
             .load()
             .collectAsList();
 
@@ -164,7 +163,6 @@ void serializedCTSQueryJSON() {
 
         List<Row> rows = startRead()
             .option(Options.READ_DOCUMENTS_QUERY, query)
-            .option(Options.READ_DOCUMENTS_QUERY_FORMAT, "JSON")
             .load()
             .collectAsList();
 
@@ -197,7 +195,6 @@ void combinedQueryJSON() {
 
         List<Row> rows = startRead()
             .option(Options.READ_DOCUMENTS_QUERY, combinedQuery.toString())
-            .option(Options.READ_DOCUMENTS_QUERY_FORMAT, "json")
             .load()
             .collectAsList();
 