Commit c1bf751

Merge pull request #44 from marklogic/feature/joinDoc-test
Added docs and a test for how to join in a document
2 parents 37763a9 + 32b00fc

4 files changed: +64 -1 lines changed

docs/reading.md

Lines changed: 24 additions & 0 deletions

@@ -37,6 +37,30 @@ df = spark.read.format("com.marklogic.spark") \
     .load()
 ```
 
+## Accessing documents
+
+While the connector requires that an Optic query use `op.fromView` as its accessor function, documents can still be
+retrieved via the [Optic functions for joining documents](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_78437).
+
+For example, the following query will find all matching rows and then retrieve the documents and URIs associated with
+those rows:
+
+```
+query = "const joinCol = op.fragmentIdCol('id'); \
+    op.fromView('example', 'employee', '', joinCol) \
+        .joinDoc('doc', joinCol) \
+        .select('doc')"
+
+df = spark.read.format("com.marklogic.spark") \
+    .option("spark.marklogic.client.uri", "pyspark-example-user:password@localhost:8020") \
+    .option("spark.marklogic.read.opticDsl", query) \
+    .load()
+```
+
+Calling `df.show()` will then show the URI and JSON contents of the document associated with each row. The Python
+[from_json](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html)
+function can then be used to parse the contents of each `doc` column into a JSON object as needed.
+
 ## Pushing down operations
 
 The Spark connector framework supports pushing down multiple operations to the connector data source. This can
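
To make the `from_json` note above concrete, here is a minimal PySpark sketch that parses the JSON string in the `doc` column produced by the query in this diff. The `doc_schema` field names are hypothetical; adjust them to match the shape of the actual employee documents.

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Hypothetical document shape; replace these fields with the ones your
# employee documents actually contain.
doc_schema = StructType([
    StructField("GivenName", StringType()),
    StructField("Surname", StringType()),
])

# Replace the JSON string in the 'doc' column with a parsed struct, then
# query the struct's fields directly.
parsed_df = df.withColumn("doc", from_json(col("doc"), doc_schema))
parsed_df.select("doc.GivenName", "doc.Surname").show()
```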

docs/writing.md

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ above options for setting a prefix and suffix will be ignored, as the template c
 For example, consider a Spark DataFrame with, among other columns, columns named `organization` and `employee_id`.
 The following template would construct URIs based on both columns:
 
-    .options("spark.marklogic.write.uriTemplate", "/example/{organization}/{employee_id}.json")
+    .option("spark.marklogic.write.uriTemplate", "/example/{organization}/{employee_id}.json")
 
 Both columns should have values in each row in the DataFrame. If the connector encounters a row that does not have a
 value for any column in the URI template, an error will be thrown.
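
For illustration, here is a hedged PySpark sketch of a full write that uses the corrected `.option` call; the sample rows, `spark` session, and connection string are assumptions for the example, not part of this commit.

```python
# Hypothetical DataFrame containing the two columns named by the URI template.
df = spark.createDataFrame(
    [("engineering", "123"), ("sales", "456")],
    ["organization", "employee_id"],
)

df.write.format("com.marklogic.spark") \
    .option("spark.marklogic.client.uri", "pyspark-example-user:password@localhost:8020") \
    .option("spark.marklogic.write.uriTemplate", "/example/{organization}/{employee_id}.json") \
    .mode("append") \
    .save()
```

Under this template, the first row above would be written to the URI /example/engineering/123.json.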

src/main/java/com/marklogic/spark/reader/SchemaInferrer.java

Lines changed: 1 addition & 0 deletions

@@ -43,6 +43,7 @@ public abstract class SchemaInferrer {
         put("point", DataTypes.StringType);
         put("boolean", DataTypes.BooleanType);
         put("none", DataTypes.StringType); // See DBQ-296, this is intentional for some column types.
+        put("value", DataTypes.StringType); // In MarkLogic 10, "value" is returned for a column containing a JSON object.
         put("integer", DataTypes.IntegerType);
         put("unsignedInt", DataTypes.IntegerType);
         put("iri", DataTypes.StringType);
src/test/java/com/marklogic/spark/reader/ReadWithJoinDocTest.java

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+package com.marklogic.spark.reader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.marklogic.spark.AbstractIntegrationTest;
+import com.marklogic.spark.Options;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ReadWithJoinDocTest extends AbstractIntegrationTest {
+
+    @Test
+    void jsonDocuments() throws Exception {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL,
+                "const idCol = op.fragmentIdCol('id'); " +
+                    "op.fromView('sparkTest', 'allTypes', '', idCol)" +
+                    ".where(op.sqlCondition('intValue = 1'))" +
+                    ".joinDoc('doc', idCol)" +
+                    ".select('doc')")
+            .option(Options.READ_NUM_PARTITIONS, 1)
+            .option(Options.READ_BATCH_SIZE, 0)
+            .load()
+            .collectAsList();
+
+        assertEquals(1, rows.size());
+
+        Row row = rows.get(0);
+        JsonNode doc = new ObjectMapper().readTree(row.getString(0));
+        assertEquals(1, doc.get("allTypes").get(0).get("intValue").asInt(),
+            "Verifying that the doc was correctly returned as a string in the Spark row, and could then be read via " +
+                "Jackson into a JsonNode");
+    }
+}
