Commit affa98f

Merge pull request #374 from marklogic/feature/optic-query-2
Can now execute non-fromView queries
2 parents 75006f2 + 389e78d commit affa98f

File tree: 10 files changed (+258, -107 lines)

docs/reading-data/optic.md

Lines changed: 27 additions & 17 deletions
@@ -59,27 +59,32 @@ query expansion via [a thesaurus](https://docs.marklogic.com/guide/search-dev/th

 ## Optic query requirements

-As of the 2.0.0 release of the connector, the Optic query must use the
-[op.fromView](https://docs.marklogic.com/op.fromView) accessor function. Future releases of both the connector and
-MarkLogic will strive to relax this requirement.
-
-In addition, calls to `groupBy`, `orderBy`, `limit`, and `offset` should be performed via Spark instead of within
-the initial Optic query. A key benefit of Spark and the MarkLogic connector is the ability to execute the query in
-parallel via multiple Spark partitions. The aforementioned calls, if made in the Optic query, may not produce the
-expected results if more than one Spark partition is used or if more than one request is made to MarkLogic. The
-equivalent Spark operations should be called instead, or the connector should be configured to make a single request
-to MarkLogic. See the "Pushing down operations" and "Tuning performance" sections below for more information.
-
-Finally, the query must adhere to the handful of limitations imposed by the
+**Starting with the 2.5.0 release**, an Optic query can use any
+[data access function](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_66011) with one caveat - only Optic
+queries that use `op.fromView` can be partitioned into multiple calls to MarkLogic. Optic queries that use any other
+data access function have the following constraints:
+
+1. The connector will execute the query in a single call to MarkLogic. You will therefore need to ensure that the
+call can complete without timing out.
+2. The connector requires that the MarkLogic user have the necessary privileges to invoke the
+[MarkLogic eval endpoint](https://docs.marklogic.com/REST/POST/v1/eval) along with the `xdmp-invoke` privilege.
+
+**Prior to the 2.5.0 release**, the Optic query must use the
+[op.fromView](https://docs.marklogic.com/op.fromView) accessor function. In addition, calls to `groupBy`, `orderBy`, `limit`, and `offset` should be
+performed via Spark instead of within the initial Optic query. As the connector will partition `op.fromView` queries
+into multiple calls to MarkLogic, the aforementioned calls will likely not produce the expected results when more
+than one request is made to MarkLogic. See the "Pushing down operations" and "Tuning performance" sections below for
+more information.
+
+Finally, regardless of the Optic data access function you use, the query must adhere to the handful of limitations imposed by the
 [Optic Query DSL](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_46710). A good practice in validating a
 query is to run it in your [MarkLogic server's qconsole tool](https://docs.marklogic.com/guide/qconsole) in a buffer
 with a query type of "Optic DSL".

 ## Schema inference

-The connector will infer a Spark schema automatically based on the view identified by `op.fromView` in
-the Optic query. Each column returned by your Optic query will be mapped to a Spark schema column with the
-same name and an appropriate type.
+The connector will infer a Spark schema automatically based on your Optic query. Each column returned by your Optic query
+will be mapped to a Spark schema column with the same name and an appropriate type.

 You may override this feature and provide your own schema instead. The example below shows how a custom schema can
 be provided within PySpark; this assumes that you have deployed the application in the
@@ -97,8 +102,9 @@ df.show()

 ## Accessing documents

-While the connector requires that an Optic query use `op.fromView` as its accessor function, documents can still be
-retrieved via the [Optic functions for joining documents](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_78437).
+If your Optic query uses the `op.fromView` accessor function, documents can still be
+retrieved via the [Optic functions for joining documents](https://docs.marklogic.com/guide/app-dev/OpticAPI#id_78437). Starting with the 2.5.0 release, you can simply use
+`op.fromSearchDocs` instead, but only if your query can be executed in a single call to MarkLogic without timing out.

 For example, the following query will find all matching rows and then retrieve the documents and URIs associated with
 those rows:
@@ -216,6 +222,10 @@ correct result, please [file an issue with this project](https://github.com/mark

 ## Tuning performance

+If you are using the 2.5.0 connector or later along with an Optic query that does not use the `op.fromView` data
+access function, you can ignore this section. The performance of your query will be strictly based on the Optic query
+itself, which the connector does not impact.
+
 The primary factor affecting connector performance when reading rows is how many requests are made to MarkLogic. In
 general, performance will be best when minimizing the number of requests to MarkLogic while ensuring that no single
 request attempts to return or process too much data.
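
The documentation changes above lend themselves to a few short usage sketches. The examples that follow are PySpark, matching the convention the doc itself uses for examples; the data source name, connection details, and sample queries are illustrative assumptions rather than text from this commit, with only the `spark.marklogic.read.*` option prefix taken from the connector code further down. First, a minimal sketch of a non-fromView read, which as of 2.5.0 runs as a single call to MarkLogic:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("non-fromview-read").getOrCreate()

# A non-fromView data access function such as op.fromSearchDocs is executed in a single
# request, so the query must finish before the request times out and the MarkLogic user
# needs the eval-endpoint privileges described above. The option names below, other than
# the "spark.marklogic.read." prefix, are assumptions; check the connector documentation.
df = (
    spark.read.format("marklogic")  # assumed data source name
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")  # assumed connection option
    .option("spark.marklogic.read.opticQuery", "op.fromSearchDocs(cts.wordQuery('Engineering'))")
    .load()
)
df.show()
```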
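
For the schema inference change, the connector still lets you supply your own schema instead of the one it infers from your Optic query. A sketch of that override, using hypothetical column names (the doc's full PySpark example is not shown in this diff):

```python
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Hypothetical columns for illustration; use the column names your Optic query returns.
custom_schema = StructType([
    StructField("CitationID", IntegerType()),
    StructField("LastName", StringType()),
])

df = (
    spark.read.format("marklogic")  # assumed data source name, as above
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")  # assumed
    .option("spark.marklogic.read.opticQuery", "op.fromView('Medical', 'Authors')")  # illustrative view
    .schema(custom_schema)  # overrides the schema the connector would otherwise infer
    .load()
)
```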
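
Finally, the "Tuning performance" addition only matters for partitioned `op.fromView` reads, where the number of requests made to MarkLogic is the main performance lever. A sketch of such a read, again with assumed option names for the partition count and batch size (they correspond to the READ_NUM_PARTITIONS and READ_BATCH_SIZE options used in OpticReadContext below), followed by Spark-side operations the connector can push down:

```python
df = (
    spark.read.format("marklogic")  # assumed data source name, as above
    .option("spark.marklogic.client.uri", "spark-example-user:password@localhost:8003")  # assumed
    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')")  # illustrative view
    .option("spark.marklogic.read.numPartitions", "4")  # assumed option name
    .option("spark.marklogic.read.batchSize", "100000")  # assumed option name; 100k is the default in the code below
    .load()
)

# Express groupBy/orderBy/limit in Spark rather than in the Optic DSL so the connector can
# push them down across multiple requests (see the pushDown* methods in OpticReadContext below).
df.groupBy("department").count().orderBy("department").limit(10).show()  # "department" is a hypothetical column
```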

marklogic-spark-connector/src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 0 additions & 3 deletions
@@ -7,8 +7,6 @@
 import com.marklogic.client.DatabaseClientFactory;
 import com.marklogic.client.document.DocumentManager;
 import com.marklogic.client.extra.okhttpclient.OkHttpClientConfigurator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.Serializable;
 import java.util.HashMap;
@@ -19,7 +17,6 @@

 public class ContextSupport extends Context implements Serializable {

-    protected static final Logger logger = LoggerFactory.getLogger(ContextSupport.class);
     private final boolean configuratorWasAdded;

     // Java Client 6.5.0 has a bug in it (to be fixed in 6.5.1 or 6.6.0) where multiple threads that use a configurator

marklogic-spark-connector/src/main/java/com/marklogic/spark/reader/optic/OpticReadContext.java

Lines changed: 51 additions & 65 deletions
@@ -4,15 +4,11 @@
 package com.marklogic.spark.reader.optic;

 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.marklogic.client.DatabaseClient;
 import com.marklogic.client.FailedRequestException;
 import com.marklogic.client.expression.PlanBuilder;
 import com.marklogic.client.impl.DatabaseClientImpl;
 import com.marklogic.client.io.JacksonHandle;
-import com.marklogic.client.io.StringHandle;
-import com.marklogic.client.row.RawQueryDSLPlan;
 import com.marklogic.client.row.RowManager;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.ContextSupport;
@@ -25,8 +21,6 @@
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.*;
 import java.util.stream.Collectors;
@@ -41,8 +35,6 @@ public class OpticReadContext extends ContextSupport {

     static final long serialVersionUID = 1;

-    private static final Logger logger = LoggerFactory.getLogger(OpticReadContext.class);
-
     // The ideal batch size depends highly on what a user chooses to do after a load() - and of course the user may
     // choose to perform multiple operations on the dataset, each of which may benefit from a fairly different batch
     // size. 100k has been chosen as the default batch size to strike a reasonable balance for operations that do need
@@ -51,43 +43,43 @@ public class OpticReadContext extends ContextSupport {

     private PlanAnalysis planAnalysis;
     private StructType schema;
-    private long serverTimestamp;
     private List<OpticFilter> opticFilters;
     private final long batchSize;

     public OpticReadContext(Map<String, String> properties, StructType schema, int defaultMinPartitions) {
         super(properties);
-        this.schema = schema;
-
-        final long partitionCount = getNumericOption(Options.READ_NUM_PARTITIONS, defaultMinPartitions, 1);
-        this.batchSize = getNumericOption(Options.READ_BATCH_SIZE, DEFAULT_BATCH_SIZE, 0);

         final String dslQuery = properties.get(Options.READ_OPTIC_QUERY);
         if (dslQuery == null || dslQuery.trim().length() < 1) {
             throw new ConnectorException(Util.getOptionNameForErrorMessage("spark.marklogic.read.noOpticQuery"));
         }

-        DatabaseClient client = connectToMarkLogic();
-        RawQueryDSLPlan dslPlan = client.newRowManager().newRawQueryDSLPlan(new StringHandle(dslQuery));
-
-        try {
-            this.planAnalysis = new PlanAnalyzer((DatabaseClientImpl) client).analyzePlan(
-                dslPlan.getHandle(), partitionCount, batchSize
-            );
-        } catch (FailedRequestException ex) {
-            handlePlanAnalysisError(dslQuery, ex);
-        }
+        this.schema = schema;
+        this.batchSize = getNumericOption(Options.READ_BATCH_SIZE, DEFAULT_BATCH_SIZE, 0);
+        this.planAnalysis = analyzePlan(dslQuery, getNumericOption(Options.READ_NUM_PARTITIONS, defaultMinPartitions, 1));

         if (this.planAnalysis != null) {
             if (Util.MAIN_LOGGER.isInfoEnabled()) {
                 Util.MAIN_LOGGER.info("Partition count: {}; number of requests that will be made to MarkLogic: {}",
                     this.planAnalysis.getPartitions().size(), this.planAnalysis.getAllBuckets().size());
             }
-            // Calling this to establish a server timestamp.
-            StringHandle columnInfoHandle = client.newRowManager().columnInfo(dslPlan, new StringHandle());
-            this.serverTimestamp = columnInfoHandle.getServerTimestamp();
-            if (logger.isDebugEnabled()) {
-                logger.debug("Will use server timestamp: {}", serverTimestamp);
+            if (Util.MAIN_LOGGER.isDebugEnabled() && planAnalysis.getServerTimestamp() > 0) {
+                Util.MAIN_LOGGER.debug("Will use server timestamp: {}", planAnalysis.getServerTimestamp());
+            }
+        }
+    }
+
+    private PlanAnalysis analyzePlan(final String dslQuery, final long partitionCount) {
+        DatabaseClient client = null;
+        try {
+            client = connectToMarkLogic();
+            return new PlanAnalyzer((DatabaseClientImpl) client).analyzePlan(dslQuery, partitionCount, batchSize);
+        } catch (FailedRequestException ex) {
+            handlePlanAnalysisError(dslQuery, ex);
+            return null;
+        } finally {
+            if (client != null) {
+                client.release();
             }
         }
     }
@@ -102,33 +94,44 @@ private void handlePlanAnalysisError(String query, FailedRequestException ex) {
     }

     Iterator<JsonNode> readRowsInBucket(RowManager rowManager, PlanAnalysis.Partition partition, PlanAnalysis.Bucket bucket) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Getting rows for partition {} and bucket {} at server timestamp {}", partition, bucket, serverTimestamp);
+        final long serverTimestamp = planAnalysis.getServerTimestamp();
+        if (Util.MAIN_LOGGER.isDebugEnabled()) {
+            if (serverTimestamp > 0) {
+                Util.MAIN_LOGGER.debug("Getting rows for partition {} and bucket {} at server timestamp {}", partition, bucket, serverTimestamp);
+            } else {
+                Util.MAIN_LOGGER.debug("Getting rows for partition {} and bucket {}", partition, bucket);
+            }
         }

         // This should never occur, as a query should only ever occur when rows were initially found, which leads to a
         // server timestamp being captured. But if it were somehow to occur, we should error out as the row-ID-based
         // partitions are not reliable without a consistent server timestamp.
-        if (serverTimestamp < 1) {
+        if (serverTimestamp < 1 && !bucket.isSingleCallToMarkLogic()) {
             throw new ConnectorException(String.format("Unable to read rows; invalid server timestamp: %d", serverTimestamp));
         }

-        PlanBuilder.Plan plan = buildPlanForBucket(rowManager, bucket);
-        JacksonHandle jsonHandle = new JacksonHandle();
-        jsonHandle.setPointInTimeQueryTimestamp(serverTimestamp);
+        final PlanBuilder.Plan plan = buildPlanForBucket(rowManager, bucket);
+        final JacksonHandle jsonHandle = new JacksonHandle();
+        if (!bucket.isSingleCallToMarkLogic()) {
+            jsonHandle.setPointInTimeQueryTimestamp(serverTimestamp);
+        }
+
         // Remarkably, the use of resultDoc has consistently proven to be a few percentage points faster than using
         // resultRows with a StringHandle, even though the latter avoids the need for converting to and from a JsonNode.
         // The overhead with resultRows may be due to the processing of a multipart response; it's not yet clear.
-        JsonNode result = rowManager.resultDoc(plan, jsonHandle).get();
+        final JsonNode result = rowManager.resultDoc(plan, jsonHandle).get();
         return result != null && result.has("rows") ?
             result.get("rows").iterator() :
             new ArrayList<JsonNode>().iterator();
     }

     private PlanBuilder.Plan buildPlanForBucket(RowManager rowManager, PlanAnalysis.Bucket bucket) {
-        PlanBuilder.Plan plan = rowManager.newRawPlanDefinition(new JacksonHandle(planAnalysis.getBoundedPlan()))
-            .bindParam("ML_LOWER_BOUND", bucket.lowerBound)
-            .bindParam("ML_UPPER_BOUND", bucket.upperBound);
+        PlanBuilder.Plan plan = rowManager.newRawPlanDefinition(new JacksonHandle(planAnalysis.getSerializedPlan()));
+
+        if (!bucket.isSingleCallToMarkLogic()) {
+            plan = plan.bindParam("ML_LOWER_BOUND", bucket.lowerBound)
+                .bindParam("ML_UPPER_BOUND", bucket.upperBound);
+        }

         if (opticFilters != null) {
             for (OpticFilter opticFilter : opticFilters) {
@@ -143,15 +146,15 @@ void pushDownFiltersIntoOpticQuery(List<OpticFilter> opticFilters) {
         this.opticFilters = opticFilters;
         // Add each filter in a separate "where" so we don't toss an op.sqlCondition into an op.and,
         // which Optic does not allow.
-        opticFilters.forEach(filter -> addOperatorToPlan(PlanUtil.buildWhere(filter)));
+        opticFilters.forEach(filter -> planAnalysis.pushOperatorIntoPlan(PlanUtil.buildWhere(filter)));
     }

     void pushDownLimit(int limit) {
-        addOperatorToPlan(PlanUtil.buildLimit(limit));
+        planAnalysis.pushOperatorIntoPlan(PlanUtil.buildLimit(limit));
     }

     void pushDownTopN(SortOrder[] orders, int limit) {
-        addOperatorToPlan(PlanUtil.buildOrderBy(orders));
+        planAnalysis.pushOperatorIntoPlan(PlanUtil.buildOrderBy(orders));
         pushDownLimit(limit);
     }

@@ -160,10 +163,10 @@ void pushDownAggregation(Aggregation aggregation) {
             .map(PlanUtil::expressionToColumnName)
             .collect(Collectors.toList());

-        if (logger.isDebugEnabled()) {
-            logger.debug("groupBy column names: {}", groupByColumnNames);
+        if (Util.MAIN_LOGGER.isDebugEnabled()) {
+            Util.MAIN_LOGGER.debug("groupBy column names: {}", groupByColumnNames);
         }
-        addOperatorToPlan(PlanUtil.buildGroupByAggregation(new HashSet<>(groupByColumnNames), aggregation));
+        planAnalysis.pushOperatorIntoPlan(PlanUtil.buildGroupByAggregation(new HashSet<>(groupByColumnNames), aggregation));

         StructType newSchema = buildSchemaWithColumnNames(groupByColumnNames);

@@ -186,10 +189,8 @@ void pushDownAggregation(Aggregation aggregation) {
                 Sum sum = (Sum) func;
                 StructField field = findColumnInSchema(sum.column(), PlanUtil.expressionToColumnName(sum.column()));
                 newSchema = newSchema.add(func.toString(), field.dataType());
-            } else {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Unsupported aggregate function: {}", func);
-                }
+            } else if (Util.MAIN_LOGGER.isDebugEnabled()) {
+                Util.MAIN_LOGGER.debug("Unsupported aggregate function: {}", func);
             }
         }

@@ -199,7 +200,7 @@ void pushDownAggregation(Aggregation aggregation) {
             List<PlanAnalysis.Partition> mergedPartitions = planAnalysis.getPartitions().stream()
                 .map(p -> p.mergeBuckets())
                 .collect(Collectors.toList());
-            this.planAnalysis = new PlanAnalysis(planAnalysis.getBoundedPlan(), mergedPartitions);
+            this.planAnalysis = new PlanAnalysis(planAnalysis.getSerializedPlan(), mergedPartitions, planAnalysis.getServerTimestamp());
         }

         if (Util.MAIN_LOGGER.isDebugEnabled()) {
@@ -237,7 +238,7 @@ private StructField findColumnInSchema(Expression expression, String columnName)

     void pushDownRequiredSchema(StructType requiredSchema) {
         this.schema = requiredSchema;
-        addOperatorToPlan(PlanUtil.buildSelect(requiredSchema));
+        planAnalysis.pushOperatorIntoPlan(PlanUtil.buildSelect(requiredSchema));
     }

     boolean planAnalysisFoundNoRows() {
@@ -246,21 +247,6 @@ boolean planAnalysisFoundNoRows() {
         return planAnalysis == null;
     }

-    /**
-     * The internal/viewinfo endpoint is known to add an op:prepare operator at the end of the list of operator
-     * args. Each operator added by the connector based on pushdowns needs to be added before this op:prepare
-     * operator; otherwise, MarkLogic will throw an error.
-     *
-     * @param operator
-     */
-    private void addOperatorToPlan(ObjectNode operator) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Adding operator to plan: {}", operator);
-        }
-        ArrayNode operators = (ArrayNode) planAnalysis.getBoundedPlan().get("$optic").get("args");
-        operators.insert(operators.size() - 1, operator);
-    }
-
     StructType getSchema() {
         return schema;
     }
