Commit 5c041b6

Merge pull request #107 from marklogic/feature/custom-code-partitions

DEVEXP-627 Can now read via user-defined partitions

2 parents: 17e0f83 + d218ec9

15 files changed: +184 −102 lines

.gitignore

Lines changed: 1 addition & 0 deletions

```diff
@@ -15,3 +15,4 @@ gradle-local.properties
 logs
 .ipynb_checkpoints
 venv
+.venv
```

docs/configuration.md

Lines changed: 4 additions & 4 deletions

```diff
@@ -111,13 +111,13 @@ The following options control how the connector reads rows from MarkLogic via cu
 | spark.marklogic.read.vars. | Prefix for user-defined variables to be sent to the custom code. |
 
 If you are using Spark's streaming support with custom code, the following options can also be used to control how
-batch identifiers are defined:
+partitions are defined:
 
 | Option | Description |
 | --- | --- |
-| spark.marklogic.read.batchIds.invoke | The path to a module to invoke; the module must be in your application's modules database. |
-| spark.marklogic.read.batchIds.javascript | JavaScript code to execute. |
-| spark.marklogic.read.batchIds.xquery | XQuery code to execute. |
+| spark.marklogic.read.partitions.invoke | The path to a module to invoke; the module must be in your application's modules database. |
+| spark.marklogic.read.partitions.javascript | JavaScript code to execute. |
+| spark.marklogic.read.partitions.xquery | XQuery code to execute. |
 
 ## Write options
```
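Because this commit renames the streaming read options, any existing job that still passes the old `spark.marklogic.read.batchIds.*` keys needs updating. A minimal sketch of the rename mapping, taken directly from the docs/configuration.md diff above (the `migrate_options` helper is hypothetical, not part of the connector):

```python
# Old option names (pre-commit) mapped to the names introduced by this commit.
OPTION_RENAMES = {
    "spark.marklogic.read.batchIds.invoke": "spark.marklogic.read.partitions.invoke",
    "spark.marklogic.read.batchIds.javascript": "spark.marklogic.read.partitions.javascript",
    "spark.marklogic.read.batchIds.xquery": "spark.marklogic.read.partitions.xquery",
}

def migrate_options(options):
    """Return a copy of a Spark read-options dict with the renamed keys applied."""
    return {OPTION_RENAMES.get(key, key): value for key, value in options.items()}

old_options = {
    "spark.marklogic.read.batchIds.javascript": "xdmp.databaseForests(xdmp.database('spark-example-content'))",
    "spark.marklogic.read.javascript": "cts.uris(null, null, cts.collectionQuery('employee'));",
}
new_options = migrate_options(old_options)
```

Keys outside the mapping, such as `spark.marklogic.read.javascript`, pass through unchanged.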

docs/reading.md

Lines changed: 18 additions & 16 deletions

````diff
@@ -357,36 +357,38 @@ from MarkLogic can be useful when your custom code for reading data may take a l
 nature of your custom code, running the query incrementally to produce smaller batches may be a better fit for your
 use case.
 
+(TODO This needs to be rewritten, will do so in a follow up PR.)
+
 To stream results from your custom code, the connector must know how batches can be constructed based on the results of
 your custom code. Because the connector does not know anything about your code, the connector needs to run an
-additional set of custom code that you implement to provide a sequence of "batch identifiers" to the connector. The
-connector will then run your custom once for each of your batch identifiers, with the batch identifier being passed as
+additional set of custom code that you implement to provide a sequence of partitions to the connector. The
+connector will then run your custom code once for each of your partitions, with the partition being passed as
 an external variable to your custom code.
 
-The code to run for providing a sequence of batch identifiers must be defined via one of the following options:
+The code to run for providing a sequence of partitions must be defined via one of the following options:
 
-- `spark.marklogic.read.batchIds.invoke` - a JavaScript or XQuery module path to invoke.
-- `spark.marklogic.read.batchIds.javascript` - a JavaScript program to evaluate.
-- `spark.marklogic.read.batchIds.xquery` - an XQuery program to evaluate.
+- `spark.marklogic.read.partitions.invoke` - a JavaScript or XQuery module path to invoke.
+- `spark.marklogic.read.partitions.javascript` - a JavaScript program to evaluate.
+- `spark.marklogic.read.partitions.xquery` - an XQuery program to evaluate.
 
 Note that any variables you define via the `spark.marklogic.reads.vars` prefix will also be sent to the above code,
 in addition to the code you define for reading rows.
 
-You are free to return any sequence of batch identifiers. For each one, the connector will invoke your regular custom
-code with an external variable named `BATCH_ID` of type `String`. You are then free to use this value to return
-a set of results associated with the batch.
+You are free to return any sequence of partitions. For each one, the connector will invoke your regular custom
+code with an external variable named `PARTITION` of type `String`. You are then free to use this value to return
+a set of results associated with the partition.
 
 The following examples illustrates how the forest IDs for the `spark-example-content` database can be used as batch
-identifiers. The custom code for returning URIs is then constrained to the value of `BATCH_ID` which will be a forest
-ID. Spark will invoke the custom code once for each batch identifier, with the returned batch of rows being immediately
+identifiers. The custom code for returning URIs is then constrained to the value of `PARTITION` which will be a forest
+ID. Spark will invoke the custom code once for each partition, with the returned batch of rows being immediately
 sent to the writer, which in this example are then printed to the console:
 
 ```
 stream = spark.readStream \
 .format("com.marklogic.spark") \
 .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
-.option("spark.marklogic.read.batchIds.javascript", "xdmp.databaseForests(xdmp.database('spark-example-content'))") \
-.option("spark.marklogic.read.javascript", "cts.uris(null, null, cts.collectionQuery('employee'), null, [BATCH_ID]);") \
+.option("spark.marklogic.read.partitions.javascript", "xdmp.databaseForests(xdmp.database('spark-example-content'))") \
+.option("spark.marklogic.read.javascript", "cts.uris(null, null, cts.collectionQuery('employee'), null, [PARTITION]);") \
 .load() \
 .writeStream \
 .format("console") \
@@ -396,8 +398,8 @@ stream.stop()
 ```
 
 For a streaming use case, you may wish to ensure that every query runs
-[at the same point in time](https://docs.marklogic.com/guide/app-dev/point_in_time). Because you are free to construct
-any batch identifiers you wish, one technique for accomplishing this would be to construct batch identifiers
+[at the same point in time](https://docs.marklogic.com/guide/app-dev/point_in_time). Because you are free to return
+any partitions you wish, one technique for accomplishing this would be to construct partitions
 containing both a forest ID and a server timestamp:
 
 ```
@@ -406,7 +408,7 @@ const timestamp = xdmp.requestTimestamp()
 Sequence.from(forestIds.toArray().map(forestId => forestId + ":" + timestamp))
 ```
 
-In your custom code, you would then parse out the forest ID and server timestamp from each batch identifier and use
+In your custom code, you would then parse out the forest ID and server timestamp from each partition and use
 them accordingly in your queries. The MarkLogic documentation in the link above can provide more details and examples
 on how to perform point-in-time queries with server timestamps.
````
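The forest-ID-plus-timestamp technique described in the docs/reading.md diff above can be illustrated outside MarkLogic. A Python sketch of what the partition-defining code composes and what your custom read code would parse back out of the `PARTITION` variable (the forest IDs and timestamp below are made-up values, not real MarkLogic identifiers):

```python
def build_partitions(forest_ids, timestamp):
    """Mirrors the server-side JavaScript above: one 'forestId:timestamp' string per forest."""
    return [f"{forest_id}:{timestamp}" for forest_id in forest_ids]

def parse_partition(partition):
    """What custom read code would do with the PARTITION external variable."""
    forest_id, timestamp = partition.split(":")
    return forest_id, timestamp

# Hypothetical forest IDs and request timestamp.
partitions = build_partitions(["16581853926883828527", "1404385903121069923"], "10584400282279")
forest_id, timestamp = parse_partition(partitions[0])
```

Every partition carries the same timestamp, so each per-partition query can be run at the same point in time.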

docs/writing.md

Lines changed: 2 additions & 2 deletions

```diff
@@ -329,8 +329,8 @@ import tempfile
 stream = spark.readStream \
 .format("com.marklogic.spark") \
 .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
-.option("spark.marklogic.read.batchIds.javascript", "xdmp.databaseForests(xdmp.database('spark-example-content'))") \
-.option("spark.marklogic.read.javascript", "cts.uris(null, ['limit=10'], cts.collectionQuery('employee'), null, [BATCH_ID]);") \
+.option("spark.marklogic.read.partitions.javascript", "xdmp.databaseForests(xdmp.database('spark-example-content'))") \
+.option("spark.marklogic.read.javascript", "cts.uris(null, ['limit=10'], cts.collectionQuery('employee'), null, [PARTITION]);") \
 .load() \
 .writeStream \
 .format("com.marklogic.spark") \
```

src/main/java/com/marklogic/spark/Options.java

Lines changed: 3 additions & 3 deletions

```diff
@@ -25,9 +25,9 @@ public interface Options {
     String READ_XQUERY = "spark.marklogic.read.xquery";
     String READ_VARS_PREFIX = "spark.marklogic.read.vars.";
 
-    String READ_BATCH_IDS_INVOKE = "spark.marklogic.read.batchIds.invoke";
-    String READ_BATCH_IDS_JAVASCRIPT = "spark.marklogic.read.batchIds.javascript";
-    String READ_BATCH_IDS_XQUERY = "spark.marklogic.read.batchIds.xquery";
+    String READ_PARTITIONS_INVOKE = "spark.marklogic.read.partitions.invoke";
+    String READ_PARTITIONS_JAVASCRIPT = "spark.marklogic.read.partitions.javascript";
+    String READ_PARTITIONS_XQUERY = "spark.marklogic.read.partitions.xquery";
 
     String READ_OPTIC_QUERY = "spark.marklogic.read.opticQuery";
     String READ_NUM_PARTITIONS = "spark.marklogic.read.numPartitions";
```

src/main/java/com/marklogic/spark/reader/CustomCodeBatch.java

Lines changed: 15 additions & 5 deletions

```diff
@@ -5,20 +5,30 @@
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 
+import java.util.List;
+
 class CustomCodeBatch implements Batch {
 
     private CustomCodeContext customCodeContext;
+    private List<String> partitions;
 
-    public CustomCodeBatch(CustomCodeContext customCodeContext) {
+    public CustomCodeBatch(CustomCodeContext customCodeContext, List<String> partitions) {
         this.customCodeContext = customCodeContext;
+        this.partitions = partitions;
     }
 
     @Override
     public InputPartition[] planInputPartitions() {
-        // We don't yet support partitioning a user's custom code. In the future, we may support this by passing along
-        // e.g. host and/or forest names, though the burden would then be on the user to utilize those correctly in
-        // their custom code.
-        return new InputPartition[]{new CustomCodePartition()};
+        InputPartition[] inputPartitions;
+        if (partitions != null && partitions.size() > 1) {
+            inputPartitions = new InputPartition[partitions.size()];
+            for (int i = 0; i < partitions.size(); i++) {
+                inputPartitions[i] = new CustomCodePartition(partitions.get(i));
+            }
+        } else {
+            inputPartitions = new InputPartition[]{new CustomCodePartition()};
+        }
+        return inputPartitions;
     }
 
     @Override
```
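The branching in the new `planInputPartitions` is simple to state on its own: with two or more user-defined partitions, one `InputPartition` is planned per entry; otherwise a single partition carrying no value is used. A Python sketch of that decision (an illustration, not connector code; `None` stands in for the no-argument `CustomCodePartition`):

```python
def plan_input_partitions(partitions):
    """Mirrors CustomCodeBatch.planInputPartitions from the diff above:
    returns one per-partition value when there are two or more user-defined
    partitions, otherwise a single valueless partition."""
    if partitions is not None and len(partitions) > 1:
        return list(partitions)
    return [None]
```

Note that a single-element list also falls into the `else` branch, so a lone partition value is not passed to the reader; this matches the `partitions.size() > 1` check in the Java code.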

src/main/java/com/marklogic/spark/reader/CustomCodeMicroBatchStream.java

Lines changed: 10 additions & 29 deletions

```diff
@@ -1,8 +1,6 @@
 package com.marklogic.spark.reader;
 
-import com.marklogic.client.DatabaseClient;
 import com.marklogic.spark.CustomCodeContext;
-import com.marklogic.spark.Options;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
@@ -11,63 +9,46 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
 
 class CustomCodeMicroBatchStream implements MicroBatchStream {
 
     private final static Logger logger = LoggerFactory.getLogger(CustomCodeMicroBatchStream.class);
 
     private final CustomCodeContext customCodeContext;
-    private long batchIndex = 0;
-    private final List<String> batchIds = new ArrayList<>();
+    private final List<String> partitions;
+    private long partitionIndex = 0;
 
-    /**
-     * Invokes the user-defined option for retrieving batch IDs. The list of batch IDs is stored so that it can be
-     * iterated over via the methods in MicroBatchStream.
-     *
-     * @param customCodeContext
-     */
-    CustomCodeMicroBatchStream(CustomCodeContext customCodeContext) {
+    CustomCodeMicroBatchStream(CustomCodeContext customCodeContext, List<String> partitions) {
         this.customCodeContext = customCodeContext;
-        DatabaseClient client = this.customCodeContext.connectToMarkLogic();
-        try {
-            this.customCodeContext
-                .buildCall(client, new CustomCodeContext.CallOptions(
-                    Options.READ_BATCH_IDS_INVOKE, Options.READ_BATCH_IDS_JAVASCRIPT, Options.READ_BATCH_IDS_XQUERY
-                ))
-                .eval()
-                .forEach(result -> batchIds.add(result.getString()));
-        } finally {
-            client.release();
-        }
+        this.partitions = partitions;
     }
 
     /**
      * Invoked by Spark to get the next offset for which it should construct a reader; an offset for this class is
-     * equivalent to a batch ID.
+     * equivalent to a user-defined partition.
      *
      * @return
     */
     @Override
     public Offset latestOffset() {
-        Offset result = batchIndex >= batchIds.size() ? null : new LongOffset(batchIndex);
+        Offset result = partitionIndex >= partitions.size() ? null : new LongOffset(partitionIndex);
         if (logger.isTraceEnabled()) {
-            logger.trace("Returning latest offset: {}", batchIndex);
+            logger.trace("Returning latest offset: {}", partitionIndex);
         }
-        batchIndex++;
+        partitionIndex++;
         return result;
     }
 
     /**
      * @param start
      * @param end
-     * @return a partition associated with the latest batch ID, which is captured by the "end" offset.
+     * @return a partition associated with the latest partition, which is captured by the "end" offset.
      */
     @Override
     public InputPartition[] planInputPartitions(Offset start, Offset end) {
         long index = ((LongOffset) end).offset();
-        return new InputPartition[]{new CustomCodePartition(batchIds.get((int) index))};
+        return new InputPartition[]{new CustomCodePartition(partitions.get((int) index))};
     }
 
     @Override
```
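The micro-batch flow in the class above can be sketched end to end: Spark repeatedly calls `latestOffset`, and the stream hands back one offset per user-defined partition until the list is exhausted, at which point a null offset signals that no more micro-batches remain; `planInputPartitions` then turns the "end" offset back into the single partition value for that micro-batch. A Python illustration of that bookkeeping (not connector code; plain ints and `None` stand in for `LongOffset` and the null `Offset`):

```python
class PartitionOffsets:
    """Mirrors CustomCodeMicroBatchStream's offset bookkeeping from the diff above."""

    def __init__(self, partitions):
        self.partitions = partitions
        self.partition_index = 0

    def latest_offset(self):
        # None mirrors the null Offset returned once every partition has been handed out.
        result = None if self.partition_index >= len(self.partitions) else self.partition_index
        self.partition_index += 1
        return result

    def plan_input_partitions(self, end):
        # The "end" offset selects exactly one partition value for the micro-batch.
        return [self.partitions[end]]

stream = PartitionOffsets(["forest-a", "forest-b"])
offsets = [stream.latest_offset() for _ in range(3)]  # third call exhausts the list
```

Each non-null offset drives one micro-batch, so a two-partition stream yields exactly two reads before Spark sees the terminating null.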

src/main/java/com/marklogic/spark/reader/CustomCodePartition.java

Lines changed: 5 additions & 14 deletions

```diff
@@ -8,25 +8,16 @@ class CustomCodePartition implements InputPartition, Serializable {
 
     final static long serialVersionUID = 1;
 
-    private String batchId;
+    private String partition;
 
-    /**
-     * Constructor for normal reading, where all rows will be returned in a single call to MarkLogic by a single reader.
-     */
     public CustomCodePartition() {
     }
 
-    /**
-     * Constructor used for streaming reads, when a call is made to the reader (and thus to MarkLogic) for the given
-     * batch ID.
-     *
-     * @param batchId
-     */
-    public CustomCodePartition(String batchId) {
-        this.batchId = batchId;
+    public CustomCodePartition(String partition) {
+        this.partition = partition;
     }
 
-    public String getBatchId() {
-        return batchId;
+    public String getPartition() {
+        return partition;
     }
 }
```

src/main/java/com/marklogic/spark/reader/CustomCodePartitionReader.java

Lines changed: 3 additions & 4 deletions

```diff
@@ -19,16 +19,15 @@ class CustomCodePartitionReader implements PartitionReader {
     private final JsonRowDeserializer jsonRowDeserializer;
     private final DatabaseClient databaseClient;
 
-    public CustomCodePartitionReader(CustomCodeContext customCodeContext, String batchId) {
+    public CustomCodePartitionReader(CustomCodeContext customCodeContext, String partition) {
         this.databaseClient = customCodeContext.connectToMarkLogic();
         this.serverEvaluationCall = customCodeContext.buildCall(
             this.databaseClient,
             new CustomCodeContext.CallOptions(Options.READ_INVOKE, Options.READ_JAVASCRIPT, Options.READ_XQUERY)
         );
 
-        // For streaming support.
-        if (batchId != null && batchId.trim().length() > 0) {
-            this.serverEvaluationCall.addVariable("BATCH_ID", batchId);
+        if (partition != null) {
+            this.serverEvaluationCall.addVariable("PARTITION", partition);
         }
 
         this.isCustomSchema = customCodeContext.isCustomSchema();
```
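One subtle behavioral change in this file: the old code skipped the external variable for blank batch IDs, while the new code checks only for null, so an empty-string partition value would now be sent to MarkLogic as `PARTITION`. A Python sketch of the two guards, side by side (an illustration of the diff's conditions, not connector code):

```python
def old_guard(batch_id):
    """Pre-commit check: BATCH_ID was only added for non-null, non-blank values."""
    return batch_id is not None and len(batch_id.strip()) > 0

def new_guard(partition):
    """Post-commit check: any non-null partition is sent as the PARTITION variable."""
    return partition is not None
```

The guards differ only for blank strings, which the user-defined partition code would have to produce deliberately.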

src/main/java/com/marklogic/spark/reader/CustomCodePartitionReaderFactory.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -16,6 +16,6 @@ public CustomCodePartitionReaderFactory(CustomCodeContext customCodeContext) {
 
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
-        return new CustomCodePartitionReader(customCodeContext, ((CustomCodePartition) partition).getBatchId());
+        return new CustomCodePartitionReader(customCodeContext, ((CustomCodePartition) partition).getPartition());
     }
 }
```
