@@ -25,6 +25,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
.option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')") \
.load()
+ df.show()
```

As demonstrated above, `format`, `spark.marklogic.client.uri` (or the other `spark.marklogic.client` options
@@ -45,6 +46,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
.option("spark.marklogic.read.opticQuery", query) \
.load()
+ df.show()
```

The `where` clause in the example above can include any of the query features supported by MarkLogic, such as
@@ -88,6 +90,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
.option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')") \
.load()
+ df.show()
```

### Accessing documents
@@ -289,6 +292,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
.option("spark.marklogic.read.javascript", "cts.uris(null, null, cts.collectionQuery('employee'))") \
.load()
+ df.show()
```

Or code can be [written in XQuery](https://docs.marklogic.com/guide/getting-started/XQueryTutorial) by configuring the
@@ -299,6 +303,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
.option("spark.marklogic.read.xquery", "cts:uris((), (), cts:collection-query('employee'))") \
.load()
+ df.show()
```

You can also invoke a JavaScript or XQuery module in your application's modules database via the
@@ -309,6 +314,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
.option("spark.marklogic.read.invoke", "/read.sjs") \
.load()
+ df.show()
```

### Custom code schemas
@@ -330,6 +336,7 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.read.invoke", "/read-custom-schema.sjs") \
.schema(StructType([StructField("id", IntegerType()), StructField("name", StringType())])) \
.load()
+ df.show()
```

### Custom external variables
@@ -348,46 +355,77 @@ df = spark.read.format("com.marklogic.spark") \
.option("spark.marklogic.read.vars.var2", "Marketing") \
.option("spark.marklogic.read.javascript", "var var1, var2; cts.uris(null, null, cts.wordQuery([var1, var2]))") \
.load()
+ df.show()
```

- ### Streaming support
+ ### Defining partitions for custom code

- Spark's support for [streaming reads](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- from MarkLogic can be useful when your custom code for reading data may take a long time to execute. Or, based on the
- nature of your custom code, running the query incrementally to produce smaller batches may be a better fit for your
- use case.
+ By default, the connector will send a single request to MarkLogic to execute custom code for reading rows. If your
+ custom code returns a large amount of data and is at risk of timing out, or if you seek better performance by breaking
+ your query into many smaller queries, you can use one of the following options to define partitions for your custom code:

- (TODO This needs to be rewritten, will do so in a follow up PR.)
+ - `spark.marklogic.read.partitions.invoke`
+ - `spark.marklogic.read.partitions.javascript`
+ - `spark.marklogic.read.partitions.xquery`

- To stream results from your custom code, the connector must know how batches can be constructed based on the results of
- your custom code. Because the connector does not know anything about your code, the connector needs to run an
- additional set of custom code that you implement to provide a sequence of partitions to the connector. The
- connector will then run your custom once for each of your partitions, with the partition being passed as
- an external variable to your custom code.
+ If one of the above options is defined, the connector will execute the code associated with the option and expect a
+ sequence of values to be returned. You can return any values you want to define partitions; the connector does not care
+ what the values represent. The connector will then execute your custom code - defined by `spark.marklogic.read.invoke`,
+ `spark.marklogic.read.javascript`, or `spark.marklogic.read.xquery` - once for each partition value. The partition value
+ will be defined in an external variable named `PARTITION`. Note as well that any external variables you define via the
+ `spark.marklogic.read.vars` prefix will also be sent to the code for returning partitions.

- The code to run for providing a sequence of partitions must be defined via one of the following options:
+ The following example shows a common use case for using MarkLogic forest IDs as partitions:

- - `spark.marklogic.read.partitions.invoke` - a JavaScript or XQuery module path to invoke.
- - `spark.marklogic.read.partitions.javascript` - a JavaScript program to evaluate.
- - `spark.marklogic.read.partitions.xquery` - an XQuery program to evaluate.
+ ```
+ df = spark.read.format("com.marklogic.spark") \
+ .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
+ .option("spark.marklogic.read.partitions.javascript", "xdmp.databaseForests(xdmp.database())") \
+ .option("spark.marklogic.read.javascript", "cts.uris(null, null, cts.collectionQuery('employee'), 0, [PARTITION])") \
+ .load()
+ df.show()
+ ```

- Note that any variables you define via the `spark.marklogic.reads.vars` prefix will also be sent to the above code,
- in addition to the code you define for reading rows.
+ In the example application used by this documentation, the "spark-example-content" database has 3 forests. Thus, the
+ partitions code above will return a sequence of 3 forest IDs. The connector will then invoke the custom
+ JavaScript code 3 times, once for each forest ID, with the `PARTITION` variable populated with a forest ID.

- You are free to return any sequence of partitions. For each one, the connector will invoke your regular custom
- code with an external variable named `PARTITION` of type `String`. You are then free to use this value to return
- a set of results associated with the partition.
+ For the above scenario, it is common to run these queries
+ [at the same point in time](https://docs.marklogic.com/guide/app-dev/point_in_time). Because you are free to return
+ any partition values you wish, one technique for this scenario would be to construct partitions containing both a
+ forest ID and a server timestamp:

- The following examples illustrates how the forest IDs for the `spark-example-content` database can be used as batch
- identifiers. The custom code for returning URIs is then constrained to the value of `PARTITION` which will be a forest
- ID. Spark will invoke the custom code once for each partition, with the returned batch of rows being immediately
- sent to the writer, which in this example are then printed to the console:
+ ```
+ const forestIds = xdmp.databaseForests(xdmp.database())
+ const timestamp = xdmp.requestTimestamp()
+ Sequence.from(forestIds.toArray().map(forestId => forestId + ":" + timestamp))
+ ```
+
+ In the custom code for returning rows, you can then obtain both a forest ID and a server timestamp from the partition
+ value and use them to ensure each of your queries runs at the same point in time.
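+
+ For example, the custom code for returning rows could be structured along the following lines. This is only a sketch,
+ not part of the example application - it assumes the partition values have the `forestId:timestamp` form produced by
+ the partitions code above, and it uses `xdmp.invokeFunction` with a `timestamp` option as one way to evaluate the
+ query at the captured server timestamp:
+
+ ```
+ // PARTITION is the external variable populated by the connector with one partition value.
+ const parts = String(PARTITION).split(":");
+ const forestId = parts[0];
+ const timestamp = parts[1];
+ // Evaluate the URIs query against the single forest, at the server timestamp captured by the partitions code.
+ xdmp.invokeFunction(
+   () => cts.uris(null, null, cts.collectionQuery('employee'), 0, [forestId]),
+   {timestamp: xs.unsignedLong(timestamp)}
+ );
+ ```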
+
+ ### Streaming support
+
+ Just like for reading rows with Optic, the connector supports
+ [streaming reads](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
+ from MarkLogic via micro-batches. The connector configuration does not change; instead, different Spark APIs are used
+ to read a stream of data from MarkLogic. This can be useful when you wish to obtain a batch of results from
+ MarkLogic and immediately send them to a Spark writer.
+
+ When streaming results from your custom code, you will need to set one of the options described above - either
+ `spark.marklogic.read.partitions.invoke`, `spark.marklogic.read.partitions.javascript`, or
+ `spark.marklogic.read.partitions.xquery` - for defining partitions.
+
+ The following example shows how the same connector configuration can be used for defining partitions and the custom
+ code for returning rows, just with different Spark APIs. In this example, Spark will invoke the custom code once
+ for each partition, with the returned batch of rows being immediately streamed to the writer, which prints the
+ batch of rows to the console:

```
stream = spark.readStream \
.format("com.marklogic.spark") \
.option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003") \
- .option("spark.marklogic.read.partitions.javascript", "xdmp.databaseForests(xdmp.database('spark-example-content'))") \
+ .option("spark.marklogic.read.partitions.javascript", "xdmp.databaseForests(xdmp.database())") \
.option("spark.marklogic.read.javascript", "cts.uris(null, null, cts.collectionQuery('employee'), null, [PARTITION]);") \
.load() \
.writeStream \
@@ -397,21 +435,6 @@ stream.processAllAvailable()
stream.stop()
```

- For a streaming use case, you may wish to ensure that every query runs
- [at the same point in time](https://docs.marklogic.com/guide/app-dev/point_in_time). Because you are free to return
- any partitions you wish, one technique for accomplishing this would be to construct partitions
- containing both a forest ID and a server timestamp:
-
- ```
- const forestIds = xdmp.databaseForests(xdmp.database('spark-example-content'))
- const timestamp = xdmp.requestTimestamp()
- Sequence.from(forestIds.toArray().map(forestId => forestId + ":" + timestamp))
- ```
-
- In your custom code, you would then parse out the forest ID and server timestamp from each partition and use
- them accordingly in your queries. The MarkLogic documentation in the link above can provide more details and examples
- on how to perform point-in-time queries with server timestamps.
-

### Tuning performance

A key difference with reading via custom code is that unless you are using Spark streaming, a single call will be made