Skip to content

Commit 3d90acd

Browse files
authored
Merge pull request #162 from BillFarber/develop
MLE-12296 Added an example of nested aggregation.
2 parents 5353099 + b065d30 commit 3d90acd

File tree

3 files changed

+139
-0
lines changed

3 files changed

+139
-0
lines changed

examples/entity-aggregation/README.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,65 @@ Note as well that this example is not intended to be authoritative. Please see
115115
on writing Spark programs. You may also find
116116
[this reference on Spark aggregate functions](https://sparkbyexamples.com/spark/spark-sql-aggregate-functions/) helpful.
117117

118+
## Importing customers, rentals and payments from Postgres to MarkLogic
119+
120+
This project also contains an example of nested joins being aggregated. In this example, the query includes a join with
121+
rentals (as above), but with the addition that payments are joined with rentals, producing a nested join in the query.
122+
Then Spark aggregation functions are used to perform nested aggregations, resulting in customer documents with JSON
123+
similar to the following snippet.
124+
125+
```
126+
{
127+
"customer_id": 182,
128+
"last_name": "Lane",
129+
"Rentals": [
130+
{
131+
"rental_id": 1542,
132+
"payments": [
133+
{
134+
"payment_id": 19199,
135+
"amount": 3.99
136+
}
137+
]
138+
},
139+
...
140+
{
141+
"rental_id": 4591,
142+
"payments": [
143+
{
144+
"payment_id": 19518,
145+
"amount": 1.99
146+
},
147+
{
148+
"payment_id": 25162,
149+
"amount": 1.99
150+
},
151+
{
152+
"payment_id": 29163,
153+
"amount": 0.99
154+
},
155+
{
156+
"payment_id": 31069,
157+
"amount": 3.99
158+
},
159+
{
160+
"payment_id": 31834,
161+
"amount": 3.99
162+
}
163+
]
164+
},
165+
...
166+
}
167+
```
168+
169+
To try this out, there is another Gradle command similar to the first:
170+
171+
./gradlew importCustomersWithRentalsAndPayments
172+
173+
You can then use [MarkLogic's qconsole application](https://docs.marklogic.com/guide/qconsole/intro) to view the
174+
customer documents written to the Documents database. In this example, 10 customer rows are queried from the Postgres
175+
"customers" table, so you will see 10 customer documents in the Documents database.
176+
177+
The JSON snippet above is from the document for customer 182. One of that customer's rentals, 4591, has multiple
178+
payments. If you examine the document for that customer (/customerWithDoubleNesting/182.json), you will be able to
179+
verify the nested aggregation.

examples/entity-aggregation/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ task importCustomers(type: JavaExec) {
1616
classpath = sourceSets.main.runtimeClasspath
1717
mainClass = 'org.example.ImportCustomers'
1818
}
19+
20+
// Runs the nested-aggregation example: customers with rentals, each rental
// carrying its own list of payments, written to MarkLogic.
task importCustomersWithRentalsAndPayments(type: JavaExec) {
    mainClass = 'org.example.ImportCustomersWithRentalsAndPayments'
    classpath = sourceSets.main.runtimeClasspath
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.example;
2+
3+
import org.apache.commons.lang3.StringUtils;
4+
import org.apache.spark.sql.SaveMode;
5+
import org.apache.spark.sql.SparkSession;
6+
import org.apache.spark.sql.functions;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.Properties;
11+
12+
public class ImportCustomersWithRentalsAndPayments {
13+
14+
public static void main(String[] args) {
15+
// The MarkLogic admin password is assumed to be "admin" per the docker-compose.yml file. This is purely for
16+
// demonstrational purposes and should never be used in a real application.
17+
final String markLogicAdminPassword = "admin";
18+
19+
// Create a vanilla local Spark session.
20+
SparkSession session = SparkSession.builder()
21+
.master("local[*]")
22+
.getOrCreate();
23+
24+
Map<String, String> jdbcOptions = new HashMap<String, String>() {{
25+
put("driver", "org.postgresql.Driver");
26+
put("url", "jdbc:postgresql://localhost/dvdrental");
27+
put("user", "postgres");
28+
put("password", "postgres");
29+
}};
30+
31+
String query =
32+
"select c.customer_id, c.last_name, r.rental_id, r.rental_date, p.payment_id, p.amount " +
33+
"from customer c " +
34+
"inner join rental r on c.customer_id = r.customer_id " +
35+
"inner join payment p on r.rental_id = p.rental_id " +
36+
"where (c.customer_id >= 180 and c.customer_id < 190) ";
37+
38+
session
39+
.read()
40+
// Use Spark's built-in JDBC support to read rows from Postgres.
41+
.format("jdbc").options(jdbcOptions)
42+
.option("query", query)
43+
.load()
44+
45+
.groupBy("rental_id")
46+
.agg(
47+
functions.first("customer_id").alias("customer_id"),
48+
functions.first("last_name").alias("last_name"),
49+
functions.collect_list(functions.struct("payment_id","amount")).alias("payments")
50+
)
51+
.groupBy("customer_id")
52+
.agg(
53+
functions.first("last_name").alias("last_name"),
54+
functions.collect_list(functions.struct("rental_id","payments")).alias("Rentals")
55+
)
56+
57+
58+
// The remaining calls use the MarkLogic Spark connector to write customer rows, with nested rentals and
59+
// sub-nested payments, to the Documents database in MarkLogic.
60+
.write()
61+
.format("com.marklogic.spark")
62+
.option("spark.marklogic.client.host", "localhost")
63+
.option("spark.marklogic.client.port", "8000")
64+
.option("spark.marklogic.client.username", "admin")
65+
.option("spark.marklogic.client.password", markLogicAdminPassword)
66+
.option("spark.marklogic.write.uriTemplate", "/customerWithDoubleNesting/{customer_id}.json")
67+
.option("spark.marklogic.write.collections", "CustomerRentalPayments")
68+
.option("spark.marklogic.write.permissions", "rest-reader,read,rest-writer,update")
69+
.mode(SaveMode.Append)
70+
.save();
71+
}
72+
}

0 commit comments

Comments
 (0)