Skip to content

Commit c02f5fb

Browse files
authored
Merge pull request #171 from marklogic/feature/12259-filtered-query
MLE-12259 Supporting filtered queries
2 parents d765ba2 + b152dcc commit c02f5fb

File tree

6 files changed

+105
-1
lines changed

6 files changed

+105
-1
lines changed

docs/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ The following options control how the connector reads document rows from MarkLog
149149
| spark.marklogic.read.documents.categories | Controls which metadata is returned for each document. Defaults to `content`. Allowable values are `content`, `metadata`, `collections`, `permissions`, `quality`, `properties`, and `metadatavalues`. |
150150
| spark.marklogic.read.documents.collections | Comma-delimited string of zero to many collections to constrain the query. |
151151
| spark.marklogic.read.documents.directory | Database directory - e.g. "/company/employees/" - to constrain the query. |
152+
| spark.marklogic.read.documents.filtered | Set to `true` for [filtered searches](https://docs.marklogic.com/guide/performance/unfiltered). Defaults to `false`, as unfiltered searches are significantly faster and will produce accurate results when your application indexes are sufficient for your query. Any value other than `true` (case-insensitive) is treated as `false`. |
152153
| spark.marklogic.read.documents.options | Name of a set of [MarkLogic search options](https://docs.marklogic.com/guide/search-dev/query-options) to be applied against a string query. |
153154
| spark.marklogic.read.documents.partitionsPerForest | Number of Spark partition readers to create per forest; defaults to 4. |
154155
| spark.marklogic.read.documents.transform | Name of a [MarkLogic REST transform](https://docs.marklogic.com/guide/rest-dev/transforms) to apply to each matching document. |

docs/reading-data/documents.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,23 @@ doc = json.loads(df2.head()['content'])
205205
doc['Department']
206206
```
207207

208+
## Filtered searches
209+
210+
The connector defaults to [unfiltered searches in MarkLogic](https://docs.marklogic.com/guide/performance/unfiltered).
211+
Assuming you have sufficient indexes configured for your query, an unfiltered search will return correct results with
212+
optimal performance.
213+
214+
However, as noted in the above linked documentation, a query may need to be "filtered" to ensure that the returned
215+
results are accurate. If your query needs filtering given your index configuration, you can use the following option to
216+
request a filtered search:
217+
218+
.option("spark.marklogic.read.documents.filtered", "true")
219+
220+
Filtered searches are generally slower, and you should be careful with this setting for larger result sets. However,
221+
the cost of a filtered search may be outweighed by the connector having to return far fewer results. In that scenario,
222+
a filtered search will return accurate results and may also be faster. Ideally though, you can configure indexes on your
223+
database to allow for an unfiltered search, which will return accurate results and be faster than a filtered search.
224+
208225
## Tuning performance
209226

210227
The connector mimics the behavior of the [MarkLogic Data Movement SDK](https://docs.marklogic.com/guide/java/data-movement)

src/main/java/com/marklogic/spark/Options.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public abstract class Options {
4545
public static final String READ_DOCUMENTS_CATEGORIES = "spark.marklogic.read.documents.categories";
4646
public static final String READ_DOCUMENTS_COLLECTIONS = "spark.marklogic.read.documents.collections";
4747
public static final String READ_DOCUMENTS_DIRECTORY = "spark.marklogic.read.documents.directory";
48+
public static final String READ_DOCUMENTS_FILTERED = "spark.marklogic.read.documents.filtered";
4849
public static final String READ_DOCUMENTS_OPTIONS = "spark.marklogic.read.documents.options";
4950
public static final String READ_DOCUMENTS_PARTITIONS_PER_FOREST = "spark.marklogic.read.documents.partitionsPerForest";
5051
// Corresponds to "q" at https://docs.marklogic.com/REST/POST/v1/search, known as a "string query".

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.marklogic.client.query.QueryDefinition;
1212
import com.marklogic.client.query.SearchQueryDefinition;
1313
import com.marklogic.client.query.StructuredQueryBuilder;
14+
import com.marklogic.spark.Options;
1415
import org.apache.spark.sql.catalyst.InternalRow;
1516
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
1617
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -66,7 +67,11 @@ class ForestReader implements PartitionReader<InternalRow> {
6667
}
6768

6869
SearchQueryDefinition query = context.buildSearchQuery(client);
69-
this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), false);
70+
boolean filtered = false;
71+
if (context.hasOption(Options.READ_DOCUMENTS_FILTERED)) {
72+
filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_DOCUMENTS_FILTERED));
73+
}
74+
this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered);
7075

7176
this.documentManager = client.newDocumentManager();
7277
this.documentManager.setReadTransform(query.getResponseTransform());
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.marklogic.spark.reader.document;
2+
3+
import com.marklogic.spark.AbstractIntegrationTest;
4+
import com.marklogic.spark.Options;
5+
import org.apache.spark.sql.DataFrameReader;
6+
import org.apache.spark.sql.Dataset;
7+
import org.apache.spark.sql.Row;
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.junit.jupiter.api.Assertions.assertEquals;
11+
12+
/**
13+
* As touched on in the documentation for this feature, filtering can in some scenarios significantly improve performance
14+
* by not retrieving a large number of false positives. Generally, as the percentage of false positives increases,
15+
* the benefit from filtering will increase by causing the connector to retrieve fewer documents. Overall though,
16+
* we would still recommend that a customer configure their indexes so that they can use an unfiltered query that is
17+
* both fast and accurate.
18+
*/
19+
class ReadFilteredDocumentRowsTest extends AbstractIntegrationTest {
20+
21+
private static final String FALSE_POSITIVE_QUERY = "<json-property-word-query xmlns='http://marklogic.com/cts'>" +
22+
"<property>ForeName</property>" +
23+
"<text xml:lang='en'>Wool*</text>" +
24+
"</json-property-word-query>";
25+
26+
private static final String CORRECT_WILDCARD_QUERY = "<json-property-word-query xmlns='http://marklogic.com/cts'>" +
27+
"<property>LastName</property>" +
28+
"<text xml:lang='en'>Wool*</text>" +
29+
"</json-property-word-query>";
30+
31+
@Test
32+
void falsePositive() {
33+
DataFrameReader reader = newSparkSession().read()
34+
.format(CONNECTOR_IDENTIFIER)
35+
.option(Options.CLIENT_URI, makeClientUri())
36+
.option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
37+
.option(Options.READ_DOCUMENTS_QUERY, FALSE_POSITIVE_QUERY);
38+
39+
Dataset<Row> dataset = reader.load();
40+
41+
assertEquals(1, dataset.count(), "The database has trailing-wildcard-searches enabled, which allows for " +
42+
"'Wool*' to work. But since the search is unfiltered, we get a false positive as 'Wooles' appears in " +
43+
"the LastName property, not the ForeName property.");
44+
45+
dataset = reader.option(Options.READ_DOCUMENTS_FILTERED, "true").load();
46+
47+
assertEquals(0, dataset.count(), "Now that the search is filtered, the false positive will be omitted.");
48+
}
49+
50+
@Test
51+
void correctWildcardQuery() {
52+
DataFrameReader reader = newSparkSession().read()
53+
.format(CONNECTOR_IDENTIFIER)
54+
.option(Options.CLIENT_URI, makeClientUri())
55+
.option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
56+
.option(Options.READ_DOCUMENTS_QUERY, CORRECT_WILDCARD_QUERY);
57+
58+
Dataset<Row> dataset = reader.load();
59+
assertEquals(1, dataset.count());
60+
61+
dataset = reader.option(Options.READ_DOCUMENTS_FILTERED, "true").load();
62+
assertEquals(1, dataset.count(), "This test just verifies that a valid wildcard query works correctly on " +
63+
"our test database.");
64+
}
65+
66+
@Test
67+
void invalidValue() {
68+
Dataset<Row> dataset = newSparkSession().read()
69+
.format(CONNECTOR_IDENTIFIER)
70+
.option(Options.CLIENT_URI, makeClientUri())
71+
.option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
72+
.option(Options.READ_DOCUMENTS_QUERY, FALSE_POSITIVE_QUERY)
73+
.option(Options.READ_DOCUMENTS_FILTERED, "not-valid")
74+
.load();
75+
76+
assertEquals(1, dataset.count(), "Boolean.parseBoolean interprets a non-true/false value as false, so we " +
77+
"expect the query to be unfiltered and thus we get back a count of 1 due to the false positive.");
78+
}
79+
}

src/test/ml-config/databases/content-database.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"database-name": "%%DATABASE%%",
33
"schema-database": "%%SCHEMAS_DATABASE%%",
4+
"trailing-wildcard-searches": true,
45
"range-element-index": [
56
{
67
"collation": "",

0 commit comments

Comments
 (0)