
Commit e45ed4d

Merge pull request #75 from marklogic/feature/488-batch-size
DEVEXP-488 Setting batch size to zero when pushing down aggregate
2 parents: 0665855 + a1c9c4e; commit: e45ed4d

12 files changed: 169 additions & 65 deletions

docs/configuration.md

Lines changed: 2 additions & 2 deletions

@@ -85,8 +85,8 @@ information on how data is read from MarkLogic.
 | Option | Description |
 | --- |---------------------------------------------------------------------------------------------------|
 | spark.marklogic.read.opticQuery | Required; the Optic DSL query to run for retrieving rows; must use `op.fromView` as the accessor. |
-| spark.marklogic.read.numPartitions | The number of Spark partitions to create; defaults to `spark.default.parallelism` . |
-| spark.marklogic.read.batchSize | Approximate number of rows to retrieve in each call to MarkLogic; defaults to 10000. |
+| spark.marklogic.read.numPartitions | The number of Spark partitions to create; defaults to `spark.default.parallelism`. |
+| spark.marklogic.read.batchSize | Approximate number of rows to retrieve in each call to MarkLogic; defaults to 100000. |
 | spark.marklogic.read.pushDownAggregates | Whether to push down aggregate operations to MarkLogic; defaults to `true`. Set to `false` to prevent aggregates from being pushed down to MarkLogic. |
 ## Write options

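For orientation, a brief PySpark sketch (not part of this commit) of how the read options from the table above could be set together; the option names come from the table, while the numeric values, query, and connection string are illustrative assumptions borrowed from the examples in docs/reading.md.

```
# Illustrative sketch only (not part of this commit): option names are from the
# table above; the values, query, and connection string are assumptions.
df = spark.read.format("com.marklogic.spark") \
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8020") \
    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')") \
    .option("spark.marklogic.read.numPartitions", 4) \
    .option("spark.marklogic.read.batchSize", 100000) \
    .option("spark.marklogic.read.pushDownAggregates", "true") \
    .load()
```
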
docs/reading.md

Lines changed: 46 additions & 24 deletions

@@ -8,7 +8,23 @@ The MarkLogic Spark connector allows for data to be retrieved from MarkLogic as
 [Optic DSL query](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_46710). The
 sections below provide more detail on configuring how data is retrieved and converted into a Spark DataFrame.

-## Query requirements
+## Basic read operation
+
+As shown in the [Getting Started with PySpark guide](getting-started/pyspark.md), a basic read operation will define
+how the connector should connect to MarkLogic, the MarkLogic Optic query to run, and zero or more other options:
+
+```
+df = spark.read.format("com.marklogic.spark") \
+    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8020") \
+    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')") \
+    .load()
+```
+
+As shown above, `format`, `spark.marklogic.client.uri` (or the other `spark.marklogic.client` options
+that can be used to define the connection details), and `spark.marklogic.read.opticQuery` are always required. The
+following sections provide more details about these and other options that can be set.
+
+## Optic query requirements

 As of the 2.0 release of the connector, the Optic query must use the
 [op.fromView](https://docs.marklogic.com/op.fromView) accessor function. The query must also adhere to the

@@ -87,7 +103,7 @@ stream.stop()
 Micro-batches are constructed based on the number of partitions and user-defined batch size; more information on each
 setting can be found in section below on tuning performance. Each request to MarkLogic that is made in "batch read"
 mode - i.e. when using Spark's `read` function instead of `readStream` - corresponds to a micro-batch when reading
-data via a stream. In the example above, which uses the connector's default batch size of 10,000 rows and 2
+data via a stream. In the example above, which uses the connector's default batch size of 100,000 rows and 2
 partitions, 2 calls are made to MarkLogic, resulting in two micro-batches.

 The number of micro-batches can be determined by enabling info-level logging and looking for a message similar to:

@@ -169,40 +185,46 @@ correct result, please [file an issue with this project](https://github.com/mark

 ## Tuning performance

-The primary factor affecting how quickly the connector can retrieve rows is MarkLogic's ability to process your Optic
-query. The
-[MarkLogic Optic performance documentation](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_91398) can help with
-optimizing your query to maximize performance.
+The primary factor affecting connector performance when reading rows is how many requests are made to MarkLogic. In
+general, performance will be best when minimizing the number of requests to MarkLogic while ensuring that no single
+request attempts to return or process too much data.

-Two [configuration options](configuration.md) in the connector will also impact performance. First, the
+Two [configuration options](configuration.md) control how many requests are made. First, the
 `spark.marklogic.read.numPartitions` option controls how many partitions are created. For each partition, Spark
 will use a separate task to send requests to MarkLogic to retrieve rows matching your Optic DSL query. Second, the
 `spark.marklogic.read.batchSize` option controls approximately how many rows will be retrieved in each call to
 MarkLogic.

-These two options impact each other in terms of how many tasks are used to make requests to MarkLogic. For example,
-consider an Optic query that matches 1 million rows in MarkLogic, a partition count of 10, and a batch size of
-10,000 rows (the default value). This configuration will result in the connector creating 10 Spark partition readers,
-each of which will retrieve approximately 100,000 unique rows. And with a batch size of 10,000, each partition
+To understand how these options control the number of requests to MarkLogic,
+consider an Optic query that matches 10 million rows in MarkLogic, a partition count of 10, and a batch size of
+100,000 rows (the default value). This configuration will result in the connector creating 10 Spark partition readers,
+each of which will retrieve approximately 1,000,000 unique rows. And with a batch size of 100,000, each partition
 reader will make approximately 10 calls to MarkLogic to retrieve these rows, for a total of 100 calls across all
-partitions.
+partitions.

-Performance can thus be tested by varying the number of partitions and the batch size. In general, increasing the
-number of partitions should help performance as the number of matching rows increases. A single partition may suffice
-for a query that returns thousands of rows or fewer, while a query that returns hundreds of millions of rows will
-benefit from dozens of partitions or more. The ideal settings will depend on your Spark and MarkLogic environments
-along with the complexity of your Optic query. Testing should be performed with different queries, partition counts,
-and batch sizes to determine the optimal settings.
+Performance should be tested by varying the number of partitions and the batch size. In general, increasing the
+number of partitions should help performance as the number of rows to return increases. Determining the optimal batch
+size depends both on the number of columns in each returned row and what kind of Spark operations are being invoked.
+The next section describes both how the connector tries to optimize performance when an aggregation is performed
+and when the same kind of optimization should be made when not many rows need to be returned.

 ### Optimizing for smaller result sets

 If your Optic query matches a set of rows whose count is a small percentage of the total number of rows in
-the view that the query runs against, you may find improved performance by setting `spark.marklogic.read.batchSize`
-to zero. Doing so ensures that for each partition, a single request is sent to MarkLogic.
-
-If the result set matching your query is particularly small - such as thousands of rows or less, or possibly tens of
-thousands of rows or less - you may find optimal performance by also setting `spark.marklogic.read.numPartitions` to
-one. This will result in the connector sending a single request to MarkLogic.
+the view that the query runs against, you should find improved performance by setting `spark.marklogic.read.batchSize`
+to zero. This setting ensures that for each partition, a single request is sent to MarkLogic.
+
+If your Spark program includes an aggregation that the connector can push down to MarkLogic, then the connector will
+automatically use a batch size of zero unless you specify a different value for `spark.marklogic.read.batchSize`. This
+optimization should typically be desirable when calculating an aggregation, as MarkLogic will return far fewer rows
+per request depending on the type of aggregation.
+
+If the result set matching your query is particularly small - such as tens of thousands of rows or less, or possibly
+hundreds of thousands of rows or less - you may find optimal performance by setting
+`spark.marklogic.read.numPartitions` to one. This will result in the connector sending a single request to MarkLogic.
+The effectiveness of this approach can be evaluated by executing the Optic query via
+[MarkLogic's qconsole application](https://docs.marklogic.com/guide/qconsole/intro), which will execute the query in
+a single request as well.

 ### More detail on partitions

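To make the tuning arithmetic above concrete, here is a small Python sketch (not part of this commit) that reproduces the worked example from the new text and then shows the batch-size-zero configuration suggested for smaller result sets; the query and connection string are reused from the doc's own examples and are illustrative only.

```
import math

# Worked example from the tuning section above: 10 million matching rows,
# 10 partitions, and the new default batch size of 100,000 rows.
matching_rows = 10_000_000
num_partitions = 10
batch_size = 100_000

rows_per_partition = matching_rows // num_partitions               # 1,000,000
calls_per_partition = math.ceil(rows_per_partition / batch_size)   # 10
print(num_partitions * calls_per_partition)                        # ~100 calls in total

# For a small result set, the doc suggests a batch size of zero (one request per
# partition) and a single partition (one request overall). Illustrative values only.
df = spark.read.format("com.marklogic.spark") \
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8020") \
    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')") \
    .option("spark.marklogic.read.numPartitions", 1) \
    .option("spark.marklogic.read.batchSize", 0) \
    .load()
```
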
docs/writing.md

Lines changed: 22 additions & 2 deletions

@@ -4,8 +4,28 @@ title: Writing Data
 nav_order: 4
 ---

-The MarkLogic Spark connector allows for writing rows in a Spark DataFrame to MarkLogic as documents. The sections below
-provide more detail about how this process works and how it can be controlled.
+The MarkLogic Spark connector allows for writing rows in a Spark DataFrame to MarkLogic as documents.
+The sections below provide more detail about how this process works and how it can be controlled.
+
+## Basic write operation
+
+As shown in the [Getting Started with PySpark guide](getting-started/pyspark.md), a basic write operation will define
+how the connector should connect to MarkLogic, the Spark mode to use, and zero or more other options:
+
+```
+df.write.format("com.marklogic.spark") \
+    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8020") \
+    .option("spark.marklogic.write.collections", "write-test") \
+    .option("spark.marklogic.write.permissions", "rest-reader,read,rest-writer,update") \
+    .option("spark.marklogic.write.uriPrefix", "/write/") \
+    .mode("append") \
+    .save()
+```
+
+In the above example, only `format`, `spark.marklogic.client.uri` (or the other `spark.marklogic.client` options
+that can be used to define the connection details), and `mode` (which must equal "append") are required;
+the collections, permissions, and URI prefix are optional, though it is uncommon to write documents without any
+permissions.

 ## Controlling document content


src/main/java/com/marklogic/spark/reader/MarkLogicMicroBatchStream.java

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
         int index = (int) ((LongOffset) end).offset();
         return index >= allBuckets.size() ?
             null :
-            new InputPartition[]{new PlanAnalysis.Partition(index, allBuckets.get(index))};
+            new InputPartition[]{new PlanAnalysis.Partition(index + "", allBuckets.get(index))};
     }

     @Override

src/main/java/com/marklogic/spark/reader/PlanAnalysis.java

Lines changed: 15 additions & 19 deletions

@@ -42,18 +42,6 @@ class PlanAnalysis implements Serializable {
         this.partitions = partitions;
     }

-    /**
-     * Copy constructor for creating a new plan analysis with the given plan and a single bucket. Used for pushing down
-     * aggregate operations that can be efficiently calculated by MarkLogic in a single request.
-     *
-     * @param boundedPlan
-     */
-    PlanAnalysis(JsonNode boundedPlan) {
-        this.boundedPlan = boundedPlan;
-        final String maxUnsignedLong = "18446744073709551615";
-        this.partitions = Arrays.asList(new Partition(0, new Bucket("0", maxUnsignedLong)));
-    }
-
     List<Bucket> getAllBuckets() {
         List<PlanAnalysis.Bucket> allBuckets = new ArrayList<>();
         partitions.forEach(partition -> allBuckets.addAll(partition.buckets));

@@ -95,16 +83,24 @@ static class Partition implements InputPartition, Serializable {
             }
         }

+        Partition(String identifier, Bucket bucket) {
+            this.identifier = identifier;
+            this.buckets = bucket != null ? Arrays.asList(bucket) : new ArrayList<>();
+        }
+
         /**
-         * For micro-batch reading, where each Spark task is intended to process a single bucket, and thus each
-         * partition should contain a single bucket.
+         * Similar to a copy constructor; used to construct a new Partition with a single bucket based on the
+         * buckets in the given Partition.
          *
-         * @param bucketIndex
-         * @param bucket
+         * @return
          */
-        Partition(int bucketIndex, Bucket bucket) {
-            this.identifier = bucketIndex + "";
-            this.buckets = Arrays.asList(bucket);
+        Partition mergeBuckets() {
+            if (buckets == null || buckets.isEmpty()) {
+                return new Partition(identifier, null);
+            }
+            String lowerBound = buckets.get(0).lowerBound;
+            String upperBound = buckets.get(buckets.size() - 1).upperBound;
+            return new Partition(identifier, new Bucket(lowerBound, upperBound));
         }

         @Override

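The new `mergeBuckets()` method collapses a partition's ordered buckets into one bucket spanning the first lower bound and the last upper bound. A minimal Python sketch of that idea (not the connector's Java implementation; names are illustrative):

```
# Minimal sketch of the bucket-merging idea; not the connector's Java code.
def merge_buckets(buckets):
    """buckets: ordered list of (lower_bound, upper_bound) string tuples."""
    if not buckets:
        return None
    # Keep the first bucket's lower bound and the last bucket's upper bound.
    return (buckets[0][0], buckets[-1][1])

# Mirrors the threeBuckets case in MergeBucketsTest later in this commit.
print(merge_buckets([("0", "500"), ("501", "1001"), ("1002", "1500")]))  # ('0', '1500')
```
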
src/main/java/com/marklogic/spark/reader/ReadContext.java

Lines changed: 15 additions & 1 deletion

@@ -63,7 +63,12 @@ public class ReadContext extends ContextSupport {
     final static long serialVersionUID = 1;

     private final static Logger logger = LoggerFactory.getLogger(ReadContext.class);
-    private final static long DEFAULT_BATCH_SIZE = 10000;
+
+    // The ideal batch size depends highly on what a user chooses to do after a load() - and of course the user may
+    // choose to perform multiple operations on the dataset, each of which may benefit from a fairly different batch
+    // size. 100k has been chosen as the default batch size to strike a reasonable balance for operations that do need
+    // to collect all the rows, such as writing the dataset to another data source.
+    private final static long DEFAULT_BATCH_SIZE = 100000;

     private PlanAnalysis planAnalysis;
     private StructType schema;

@@ -200,6 +205,15 @@ void pushDownAggregation(Aggregation aggregation) {
             }
         }

+        if (!getProperties().containsKey(Options.READ_BATCH_SIZE)) {
+            logger.info("Batch size was not overridden, so modifying each partition to make a single request to improve " +
+                "performance of pushed down aggregation.");
+            List<PlanAnalysis.Partition> mergedPartitions = planAnalysis.partitions.stream()
+                .map(p -> p.mergeBuckets())
+                .collect(Collectors.toList());
+            this.planAnalysis = new PlanAnalysis(planAnalysis.boundedPlan, mergedPartitions);
+        }
+
        this.schema = newSchema;
    }

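From the PySpark side, the new branch above only takes effect when `spark.marklogic.read.batchSize` is not set and an aggregation is pushed down. A hedged sketch of a program that could exercise it; the `Department` column and the grouped count are assumptions for illustration, not defined by this commit.

```
# Illustrative only: "Department" is a hypothetical column, and whether this exact
# aggregation is pushed down depends on the connector and query; see docs/reading.md.
df = spark.read.format("com.marklogic.spark") \
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8020") \
    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')") \
    .load()

# Because spark.marklogic.read.batchSize is not set, a pushed-down aggregation now
# causes each partition's buckets to be merged so that each partition makes a
# single request to MarkLogic (the info-level log message above should appear).
df.groupBy("Department").count().show()
```
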
src/test/java/com/marklogic/spark/reader/AbstractPushDownTest.java

Lines changed: 3 additions & 3 deletions

@@ -42,9 +42,9 @@ void setup() {
     protected DataFrameReader newDefaultReader(SparkSession session) {
         return super.newDefaultReader(session)
             // Default to a single call to MarkLogic for push down tests to ensure that assertions on row counts are
-            // accurate. Any tests that care about having more than one partition are expected to override this.
-            .option(Options.READ_NUM_PARTITIONS, 1)
-            .option(Options.READ_BATCH_SIZE, 0);
+            // accurate (and via DEVEXP-488, the batch size is expected to be set to zero when an aggregate is pushed
+            // down). Any tests that care about having more than one partition are expected to override this.
+            .option(Options.READ_NUM_PARTITIONS, 1);
     }

     private synchronized void addToRowCount(long totalRowCount) {
src/test/java/com/marklogic/spark/reader/MergeBucketsTest.java

Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+package com.marklogic.spark.reader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MergeBucketsTest {
+
+    @Test
+    void threeBuckets() {
+        PlanAnalysis.Partition p = new PlanAnalysis.Partition(1, 0, 1500, 3, 1500);
+
+        assertEquals(3, p.buckets.size());
+        assertEquals("0", p.buckets.get(0).lowerBound);
+        assertEquals("500", p.buckets.get(0).upperBound);
+        assertEquals("501", p.buckets.get(1).lowerBound);
+        assertEquals("1001", p.buckets.get(1).upperBound);
+        assertEquals("1002", p.buckets.get(2).lowerBound);
+        assertEquals("1500", p.buckets.get(2).upperBound);
+
+        PlanAnalysis.Partition p2 = p.mergeBuckets();
+
+        assertEquals(1, p2.buckets.size());
+        assertEquals("0", p2.buckets.get(0).lowerBound);
+        assertEquals("1500", p2.buckets.get(0).upperBound);
+    }
+
+    @Test
+    void oneBucket() {
+        PlanAnalysis.Partition p = new PlanAnalysis.Partition(1, 0, 1000, 1, 1000);
+
+        assertEquals(1, p.buckets.size());
+        assertEquals("0", p.buckets.get(0).lowerBound);
+        assertEquals("1000", p.buckets.get(0).upperBound);
+
+        PlanAnalysis.Partition p2 = p.mergeBuckets();
+
+        assertEquals(1, p2.buckets.size());
+        assertEquals("0", p2.buckets.get(0).lowerBound);
+        assertEquals("1000", p2.buckets.get(0).upperBound);
+    }
+}

src/test/java/com/marklogic/spark/reader/PushDownFilterTest.java

Lines changed: 0 additions & 4 deletions

@@ -250,10 +250,6 @@ void stringEndsWithNoMatch() {
     private Dataset<Row> newDataset() {
         return newDefaultReader()
             .option(Options.READ_OPTIC_QUERY, QUERY_WITH_NO_QUALIFIER)
-            // Use a single call to MarkLogic so it's easier to verify from the logging
-            // that only N rows were returned.
-            .option(Options.READ_NUM_PARTITIONS, 1)
-            .option(Options.READ_BATCH_SIZE, 0)
             .load();
     }

src/test/java/com/marklogic/spark/reader/PushDownFilterValueTypesTest.java

Lines changed: 0 additions & 2 deletions

@@ -88,8 +88,6 @@ private void verifyOneRowReturned(String filter) {
     private Dataset<Row> newDataset() {
         return newDefaultReader()
             .option(Options.READ_OPTIC_QUERY, "op.fromView('sparkTest', 'allTypes', '')")
-            .option(Options.READ_NUM_PARTITIONS, 1)
-            .option(Options.READ_BATCH_SIZE, 0)
             .load();
     }
