
Commit bd312b8

Merge pull request #60 from marklogic/feature/test-fixes
Fix for not pushing down when count of rows is zero
2 parents: 66747ba + ea116f6

11 files changed: +122 additions, -44 deletions


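For context, here is a minimal sketch of the scenario this commit addresses: reading from a view that matches no rows and then applying operations that would otherwise be pushed down to MarkLogic. The connector identifier, the Options.READ_OPTIC_DSL constant, and the 'Medical'/'NoAuthors' view come from the sources changed in this commit; the class name, SparkSession setup, and omitted connection options are assumptions for illustration only.

    import com.marklogic.spark.Options;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class NoRowsScenario {
        public static void main(String[] args) {
            // Sketch only: connection options (host, port, credentials) are omitted.
            SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
            Dataset<Row> rows = session.read()
                .format("com.marklogic.spark") // CONNECTOR_IDENTIFIER in AbstractIntegrationTest
                .option(Options.READ_OPTIC_DSL, "op.fromView('Medical', 'NoAuthors', '')")
                .load();

            // The plan analysis for this query finds zero rows, so the connector now declines every
            // push-down request (filters, limit, offset, orderBy, count, pruneColumns) and Spark
            // evaluates these operations against an empty dataset instead.
            System.out.println(rows.filter("CitationID == 1").count()); // prints 0
        }
    }
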
src/main/java/com/marklogic/spark/reader/MarkLogicScanBuilder.java

Lines changed: 23 additions & 1 deletion
@@ -66,9 +66,13 @@ public Scan build() {
      */
     @Override
     public Filter[] pushFilters(Filter[] filters) {
+        pushedFilters = new ArrayList<>();
+        if (readContext.planAnalysisFoundNoRows()) {
+            return filters;
+        }
+
         List<Filter> unsupportedFilters = new ArrayList<>();
         List<OpticFilter> opticFilters = new ArrayList<>();
-        pushedFilters = new ArrayList<>();
         if (logger.isDebugEnabled()) {
             logger.debug("Filter count: {}", filters.length);
         }
@@ -102,6 +106,9 @@ public Filter[] pushedFilters() {
 
     @Override
     public boolean pushLimit(int limit) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         if (logger.isDebugEnabled()) {
             logger.debug("Pushing down limit: {}", limit);
         }
@@ -111,6 +118,9 @@ public boolean pushLimit(int limit) {
 
     @Override
     public boolean pushTopN(SortOrder[] orders, int limit) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         // This will be invoked when the user calls both orderBy and limit in their Spark program. If the user only
         // calls limit, then only pushLimit is called and this will not be called. If the user only calls orderBy and
         // not limit, then neither this nor pushLimit will be called.
@@ -131,6 +141,9 @@ public boolean isPartiallyPushed() {
 
     @Override
     public boolean pushOffset(int offset) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         if (logger.isDebugEnabled()) {
             logger.debug("Pushing down offset: {}", offset);
         }
@@ -140,6 +153,9 @@ public boolean pushOffset(int offset) {
 
     @Override
     public boolean pushAggregation(Aggregation aggregation) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         if (supportCompletePushDown(aggregation)) {
             if (aggregation.groupByExpressions().length > 0) {
                 Expression expr = aggregation.groupByExpressions()[0];
@@ -160,6 +176,9 @@ public boolean pushAggregation(Aggregation aggregation) {
 
     @Override
     public boolean supportCompletePushDown(Aggregation aggregation) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         AggregateFunc[] expressions = aggregation.aggregateExpressions();
         if (expressions.length == 1 && expressions[0] instanceof CountStar) {
             // If a count() is used, it's supported if there's no groupBy - i.e. just doing a count() by itself -
@@ -171,6 +190,9 @@ public boolean supportCompletePushDown(Aggregation aggregation) {
 
     @Override
     public void pruneColumns(StructType requiredSchema) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return;
+        }
         if (requiredSchema.equals(readContext.getSchema())) {
             if (logger.isDebugEnabled()) {
                 logger.debug("The schema to push down is equal to the existing schema, so not pushing it down.");

src/main/java/com/marklogic/spark/reader/ReadContext.java

Lines changed: 28 additions & 42 deletions
@@ -144,60 +144,48 @@ private PlanBuilder.Plan buildPlanForBucket(RowManager rowManager, PlanAnalysis.
     }
 
     void pushDownFiltersIntoOpticQuery(List<OpticFilter> opticFilters) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            this.opticFilters = opticFilters;
-            addOperatorToPlan(PlanUtil.buildWhere(opticFilters));
-        }
+        this.opticFilters = opticFilters;
+        addOperatorToPlan(PlanUtil.buildWhere(opticFilters));
     }
 
     void pushDownLimit(int limit) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            addOperatorToPlan(PlanUtil.buildLimit(limit));
-        }
+        addOperatorToPlan(PlanUtil.buildLimit(limit));
     }
 
     void pushDownOffset(int offset) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            addOperatorToPlan(PlanUtil.buildOffset(offset));
-        }
+        addOperatorToPlan(PlanUtil.buildOffset(offset));
     }
 
     void pushDownTopN(SortOrder[] orders, int limit) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            for (SortOrder sortOrder : orders) {
-                addOperatorToPlan(PlanUtil.buildOrderBy(sortOrder));
-            }
-            pushDownLimit(limit);
+        for (SortOrder sortOrder : orders) {
+            addOperatorToPlan(PlanUtil.buildOrderBy(sortOrder));
         }
+        pushDownLimit(limit);
     }
 
     void pushDownCount() {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            addOperatorToPlan(PlanUtil.buildGroupByCount());
-            // As will likely be the case for all aggregations, the schema needs to be modified.
-            this.schema = new StructType().add("count", DataTypes.LongType);
-            modifyPlanAnalysisToUseSingleBucket();
-        }
+        addOperatorToPlan(PlanUtil.buildGroupByCount());
+        // As will likely be the case for all aggregations, the schema needs to be modified.
+        this.schema = new StructType().add("count", DataTypes.LongType);
+        modifyPlanAnalysisToUseSingleBucket();
     }
 
     void pushDownGroupByCount(Expression groupBy) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            final String columnName = PlanUtil.expressionToColumnName(groupBy);
-            addOperatorToPlan(PlanUtil.buildGroupByCount(columnName));
-
-            StructField columnField = null;
-            for (StructField field : this.schema.fields()) {
-                if (columnName.equals(field.name())) {
-                    columnField = field;
-                    break;
-                }
-            }
-            if (columnField == null) {
-                throw new IllegalArgumentException("Unable to find groupBy column in schema; groupBy expression: " + groupBy.describe());
+        final String columnName = PlanUtil.expressionToColumnName(groupBy);
+        addOperatorToPlan(PlanUtil.buildGroupByCount(columnName));
+
+        StructField columnField = null;
+        for (StructField field : this.schema.fields()) {
+            if (columnName.equals(field.name())) {
+                columnField = field;
+                break;
             }
-            this.schema = new StructType().add(columnField).add("count", DataTypes.LongType);
-            modifyPlanAnalysisToUseSingleBucket();
         }
+        if (columnField == null) {
+            throw new IllegalArgumentException("Unable to find groupBy column in schema; groupBy expression: " + groupBy.describe());
+        }
+        this.schema = new StructType().add(columnField).add("count", DataTypes.LongType);
+        modifyPlanAnalysisToUseSingleBucket();
     }
 
     /**
@@ -213,16 +201,14 @@ private void modifyPlanAnalysisToUseSingleBucket() {
     }
 
     void pushDownRequiredSchema(StructType requiredSchema) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            this.schema = requiredSchema;
-            addOperatorToPlan(PlanUtil.buildSelect(requiredSchema));
-        }
+        this.schema = requiredSchema;
+        addOperatorToPlan(PlanUtil.buildSelect(requiredSchema));
     }
 
-    private boolean planAnalysisFoundAtLeastOneRow() {
+    boolean planAnalysisFoundNoRows() {
         // The planAnalysis will be null if no rows were found, which internal/viewinfo unfortunately throws an error
        // on. None of the push down operations need to be applied in this scenario.
-        return planAnalysis != null;
+        return planAnalysis == null;
     }
 
     /**
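
With this refactor, the per-method planAnalysisFoundAtLeastOneRow() checks are gone from ReadContext; the inverted, package-private planAnalysisFoundNoRows() check now lives in MarkLogicScanBuilder, which skips push-down work entirely when the plan analysis is null. One detail worth calling out from pushDownGroupByCount: when a groupBy/count is pushed down, the read schema is rewritten to just the groupBy column plus a long "count" column. A small illustrative sketch of the resulting schema, assuming the CitationID column used in the tests below (the real code copies the existing StructField, so the column's actual type is preserved rather than assumed):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Illustration only: the groupBy column's type is assumed to be long here.
    StructType groupByCountSchema = new StructType()
        .add("CitationID", DataTypes.LongType) // the groupBy column found in the original schema
        .add("count", DataTypes.LongType);     // the aggregate column appended by pushDownGroupByCount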

src/test/java/com/marklogic/spark/AbstractIntegrationTest.java

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ public class AbstractIntegrationTest extends AbstractSpringMarkLogicTest {
     protected final static String TEST_USERNAME = "spark-test-user";
     protected final static String TEST_PASSWORD = "spark";
     protected final static String CONNECTOR_IDENTIFIER = "com.marklogic.spark";
+    protected final static String NO_AUTHORS_QUERY = "op.fromView('Medical', 'NoAuthors', '')";
 
     private static MarkLogicVersion markLogicVersion;

src/test/java/com/marklogic/spark/reader/PushDownCountTest.java

Lines changed: 12 additions & 0 deletions
@@ -38,4 +38,16 @@ void count() {
             "that regardless of the number of matching rows, MarkLogic can efficiently determine a count in a single " +
             "request.");
     }
+
+    @Test
+    void noRowsFound() {
+        long count = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .count();
+
+        assertEquals(0, count);
+        assertEquals(0, countOfRowsReadFromMarkLogic, "When no rows exist, neither the count() operation nor the " +
+            "pruneColumns() operation should be pushed down since there's no optimization to be done.");
+    }
 }

src/test/java/com/marklogic/spark/reader/PushDownFilterTest.java

Lines changed: 11 additions & 0 deletions
@@ -58,6 +58,17 @@ void equalToWithViewQualifier() {
         assertEquals(4, countOfRowsReadFromMarkLogic);
     }
 
+    @Test
+    void noRowsFound() {
+        assertEquals(0, newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .filter("CitationID == 1")
+            .collectAsList()
+            .size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void equalToWithWhere() {
         assertEquals(2, getCountOfRowsWithFilter("CitationID = 5"));

src/test/java/com/marklogic/spark/reader/PushDownGroupByCountTest.java

Lines changed: 14 additions & 0 deletions
@@ -39,6 +39,20 @@ void groupByWithNoQualifier() {
         assertEquals(1l, (long) rows.get(0).getAs("CitationID"));
     }
 
+    @Test
+    void noRowsFound() {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .groupBy("CitationID")
+            .count()
+            .orderBy("CitationID")
+            .collectAsList();
+
+        assertEquals(0, rows.size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void groupByWithView() {
         List<Row> rows = newDefaultReader()

src/test/java/com/marklogic/spark/reader/PushDownOffsetTest.java

Lines changed: 13 additions & 0 deletions
@@ -15,6 +15,7 @@
  */
 package com.marklogic.spark.reader;
 
+import com.marklogic.spark.Options;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
 
@@ -36,6 +37,18 @@ void offset() {
         assertEquals(7, countOfRowsReadFromMarkLogic);
     }
 
+    @Test
+    void noRowsFound() {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .offset(1)
+            .collectAsList();
+
+        assertEquals(0, rows.size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void limitBeforeOffset() {
         List<Row> rows = newDatasetOrderedByCitationIDWithOneBucket()

src/test/java/com/marklogic/spark/reader/PushDownRequiredColumnsTest.java

Lines changed: 12 additions & 0 deletions
@@ -48,6 +48,18 @@ void withNoQualifier() {
         assertEquals("Humbee", rows.get(0).getAs("LastName"));
     }
 
+    @Test
+    void noRowsFound() {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .select("CitationID")
+            .collectAsList();
+
+        assertEquals(0, rows.size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void withSchemaAndViewQualifiers() {
         List<Row> rows = newDefaultReader()

src/test/java/com/marklogic/spark/reader/ReadRowsTest.java

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ void emptyQualifier() {
     @Test
     void queryReturnsZeroRows() {
         List<Row> rows = newDefaultReader()
-            .option(Options.READ_OPTIC_DSL, "op.fromView('Medical', 'NoAuthors')")
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
             .load()
             .collectAsList();

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,7 @@
 import java.io.IOException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -191,6 +192,7 @@ void dontAbortOnFailure() {
     }
 
     private void verifyFailureIsDueToLackOfPermission(SparkException ex) {
+        assertNotNull(ex.getCause(), "Unexpected exception with no cause: " + ex.getClass() + "; " + ex.getMessage());
         assertTrue(ex.getCause() instanceof IOException, "Unexpected cause: " + ex.getCause().getClass());
         assertTrue(ex.getCause().getMessage().contains("Server Message: You do not have permission to this method and URL"),
             "Unexpected cause message: " + ex.getCause().getMessage());
