Skip to content

Commit 206cffc

Browse files
committed
Fixing filters so they work for qualified column names
1 parent 7921c80 commit 206cffc

File tree

6 files changed

+91
-19
lines changed

6 files changed

+91
-19
lines changed

docs/reading.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ down the following operations to MarkLogic:
7171
- `count`
7272
- `drop` and `select`
7373
- `filter` and `where`
74-
- `orderBy`
74+
- `groupBy` when followed by `count`
7575
- `limit`
7676
- `offset`
77+
- `orderBy`
7778

7879
For each of the above operations, the Optic pipeline associated with the user's Optic DSL query is modified to include
7980
the associated Optic function. Note that if multiple partitions are used to perform the `read` operation, each

src/main/java/com/marklogic/spark/reader/PlanUtil.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* Methods for modifying a serialized Optic plan. These were moved here both to facilitate unit testing for some of them
2121
* and to simplify {@code ReadContext}.
2222
*/
23-
abstract class PlanUtil {
23+
public abstract class PlanUtil {
2424

2525
private final static Logger logger = LoggerFactory.getLogger(PlanUtil.class);
2626

@@ -73,9 +73,20 @@ static ObjectNode buildSelect(StructType schema) {
7373
});
7474
}
7575

76-
private static void populateSchemaCol(ObjectNode node, String columnName) {
76+
/**
77+
* Knows how to handle any of 3 variations of a column name: 1) no qualifier - "CitationID"; 2) view qualifier -
78+
* "myView.CitationID"; and 3) schema+view qualifier - "mySchema.myView.CitationID".
79+
* <p>
80+
* This should always be used whenever a push down operation involves constructing a column, as we need to handle
81+
* all 3 of the variations above; op.schemaCol is required in order to achieve that.
82+
*
83+
* @param node       the JSON node to populate with an op.schema-col expression
84+
* @param columnName the column name; may be unqualified, view-qualified, or schema+view-qualified, and may be wrapped in tick marks
85+
*/
86+
public static void populateSchemaCol(ObjectNode node, String columnName) {
87+
final String[] parts = removeTickMarksFromColumnName(columnName).split("\\.");
88+
7789
ArrayNode colArgs = node.put("ns", "op").put("fn", "schema-col").putArray("args");
78-
String[] parts = columnName.split("\\.");
7990
if (parts.length == 3) {
8091
colArgs.add(parts[0]).add(parts[1]).add(parts[2]);
8192
} else if (parts.length == 2) {
@@ -85,6 +96,22 @@ private static void populateSchemaCol(ObjectNode node, String columnName) {
8596
}
8697
}
8798

99+
/**
100+
* For some push down operations, the tick marks that a user must use when a column name contains a period will
101+
* still be present.
102+
*
103+
* @param columnName the column name, possibly wrapped in tick marks (backticks)
104+
* @return the column name with any leading and trailing tick marks removed
105+
*/
106+
private static String removeTickMarksFromColumnName(String columnName) {
107+
if (columnName.startsWith("`")) {
108+
columnName = columnName.substring(1);
109+
}
110+
return columnName.endsWith("`") ?
111+
columnName.substring(0, columnName.length() - 1) :
112+
columnName;
113+
}
114+
88115
static ObjectNode buildWhere(List<OpticFilter> opticFilters) {
89116
return newOperation("where", args -> {
90117
// If there's only one filter, can toss it into the "where" clause. Else, toss an "and" into the "where" and

src/main/java/com/marklogic/spark/reader/filter/IsNotNullFilter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.databind.node.ObjectNode;
44
import com.marklogic.client.expression.PlanBuilder;
5+
import com.marklogic.spark.reader.PlanUtil;
56
import org.apache.spark.sql.sources.IsNotNull;
67

78
class IsNotNullFilter implements OpticFilter {
@@ -16,11 +17,11 @@ class IsNotNullFilter implements OpticFilter {
1617

1718
@Override
1819
public void populateArg(ObjectNode arg) {
19-
arg
20+
ObjectNode colArg = arg
2021
.put("ns", "op").put("fn", "is-defined")
21-
.putArray("args").addObject()
22-
.put("ns", "op").put("fn", "col")
23-
.putArray("args").add(filter.attribute());
22+
.putArray("args").addObject();
23+
24+
PlanUtil.populateSchemaCol(colArg, filter.attribute());
2425
}
2526

2627
@Override

src/main/java/com/marklogic/spark/reader/filter/IsNullFilter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.databind.node.ObjectNode;
44
import com.marklogic.client.expression.PlanBuilder;
5+
import com.marklogic.spark.reader.PlanUtil;
56
import org.apache.spark.sql.sources.IsNull;
67

78
class IsNullFilter implements OpticFilter {
@@ -16,13 +17,13 @@ class IsNullFilter implements OpticFilter {
1617

1718
@Override
1819
public void populateArg(ObjectNode arg) {
19-
arg
20+
ObjectNode colArg = arg
2021
.put("ns", "op").put("fn", "not")
2122
.putArray("args").addObject()
2223
.put("ns", "op").put("fn", "is-defined")
23-
.putArray("args").addObject()
24-
.put("ns", "op").put("fn", "col")
25-
.putArray("args").add(filter.attribute());
24+
.putArray("args").addObject();
25+
26+
PlanUtil.populateSchemaCol(colArg, filter.attribute());
2627
}
2728

2829
@Override

src/main/java/com/marklogic/spark/reader/filter/SingleValueFilter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.databind.node.ArrayNode;
44
import com.fasterxml.jackson.databind.node.ObjectNode;
55
import com.marklogic.client.expression.PlanBuilder;
6+
import com.marklogic.spark.reader.PlanUtil;
67

78
import java.util.UUID;
89

@@ -29,14 +30,11 @@ class SingleValueFilter implements OpticFilter {
2930
public void populateArg(ObjectNode arg) {
3031
arg.put("ns", "op");
3132
arg.put("fn", this.functionName);
32-
ArrayNode equalArgs = arg.putArray("args");
33+
ArrayNode functionArgs = arg.putArray("args");
3334

34-
ObjectNode equalArg = equalArgs.addObject();
35-
equalArg.put("ns", "op");
36-
equalArg.put("fn", "col");
37-
equalArg.putArray("args").add(this.columnName);
35+
PlanUtil.populateSchemaCol(functionArgs.addObject(), this.columnName);
3836

39-
ObjectNode paramArg = equalArgs.addObject();
37+
ObjectNode paramArg = functionArgs.addObject();
4038
paramArg.put("ns", "op");
4139
paramArg.put("fn", "param");
4240
paramArg.putArray("args").add(this.paramName);

src/test/java/com/marklogic/spark/reader/PushDownFilterTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,27 @@ void equalToWithFilter() {
2222
assertEquals(4, countOfRowsReadFromMarkLogic);
2323
}
2424

25+
@Test
26+
void equalToWithSchemaAndViewQualifier() {
27+
assertEquals(4, newDefaultReader()
28+
.load()
29+
.filter("`Medical.Authors.CitationID` == 1")
30+
.collectAsList()
31+
.size(), "Verifying that a filter with a fully-qualified column name still works correctly.");
32+
assertEquals(4, countOfRowsReadFromMarkLogic);
33+
}
34+
35+
@Test
36+
void equalToWithViewQualifier() {
37+
assertEquals(4, newDefaultReader()
38+
.option(Options.READ_OPTIC_DSL, "op.fromView('Medical', 'Authors', 'myView')")
39+
.load()
40+
.filter("`myView.CitationID` == 1")
41+
.collectAsList()
42+
.size(), "Verifying that a filter with a view-qualified column name still works correctly.");
43+
assertEquals(4, countOfRowsReadFromMarkLogic);
44+
}
45+
2546
@Test
2647
void equalToWithWhere() {
2748
assertEquals(2, getCountOfRowsWithFilter("CitationID = 5"));
@@ -124,17 +145,40 @@ void inWithNoMatches() {
124145
@Test
125146
void isNotNull() {
126147
assertEquals(2, newDataset().filter(new Column("BooleanValue").isNotNull()).collectAsList().size());
148+
assertEquals(2, countOfRowsReadFromMarkLogic);
149+
}
150+
151+
@Test
152+
void isNotNullQualified() {
153+
assertEquals(2, newDefaultReader()
154+
.load()
155+
.filter(new Column("`Medical.Authors.BooleanValue`").isNotNull())
156+
.collectAsList()
157+
.size());
158+
127159
assertEquals(2, countOfRowsReadFromMarkLogic,
128160
"2 of the authors are expected to have a BooleanValue column.");
129161
}
130162

131163
@Test
132164
void isNull() {
133-
assertEquals(13, newDataset().filter(new Column("BooleanValue").isNull()).collectAsList().size());
165+
assertEquals(13, newDataset()
166+
.filter(new Column("BooleanValue").isNull())
167+
.collectAsList()
168+
.size());
134169
assertEquals(13, countOfRowsReadFromMarkLogic,
135170
"13 of the authors are expected to have a null BooleanValue column.");
136171
}
137172

173+
@Test
174+
void isNullQualified() {
175+
assertEquals(13, newDefaultReader()
176+
.load()
177+
.filter(new Column("`Medical.Authors.BooleanValue`").isNull())
178+
.collectAsList().size());
179+
assertEquals(13, countOfRowsReadFromMarkLogic);
180+
}
181+
138182
@Test
139183
void stringContains() {
140184
List<Row> rows = newDataset().filter(new Column("LastName").contains("umbe")).collectAsList();

0 commit comments

Comments (0)