
Commit ea116f6

Fix for not pushing down operations when the row count is zero

This check needs to happen in the ScanBuilder so that "false" can be returned to Spark, letting it know that the operation was not pushed down. Also added an assertion to WriteRowsTest to help debug what appears to be an intermittent failure on Jenkins.
1 parent: 31d0ea4
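For context, here is a minimal sketch of the Spark contract the fix relies on. `SupportsPushDownLimit` is Spark's actual DataSource V2 mixin, while `GuardedScanBuilder` and its `ReadContext` field are illustrative stand-ins for the connector's `MarkLogicScanBuilder`, which applies the same guard across several such mixins (see the diff below):

```java
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;

// Illustrative sketch, not the connector's actual class. Spark's push-down
// mixins treat a "false" return as "the source declined this operation,
// apply it yourself". Declining is safe here: applying a limit (or offset,
// or aggregation) to zero rows is a no-op for Spark.
abstract class GuardedScanBuilder implements ScanBuilder, SupportsPushDownLimit {

    private final ReadContext readContext; // holds the plan analysis for the Optic query

    GuardedScanBuilder(ReadContext readContext) {
        this.readContext = readContext;
    }

    @Override
    public boolean pushLimit(int limit) {
        if (readContext.planAnalysisFoundNoRows()) {
            return false; // tell Spark the limit was NOT pushed down
        }
        // ... otherwise append a limit operator to the Optic plan ...
        return true;
    }
}
```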

File tree: 11 files changed (+122, -44 lines)
src/main/java/com/marklogic/spark/reader/MarkLogicScanBuilder.java

Lines changed: 23 additions & 1 deletion

```diff
@@ -66,9 +66,13 @@ public Scan build() {
      */
     @Override
     public Filter[] pushFilters(Filter[] filters) {
+        pushedFilters = new ArrayList<>();
+        if (readContext.planAnalysisFoundNoRows()) {
+            return filters;
+        }
+
         List<Filter> unsupportedFilters = new ArrayList<>();
         List<OpticFilter> opticFilters = new ArrayList<>();
-        pushedFilters = new ArrayList<>();
         if (logger.isDebugEnabled()) {
             logger.debug("Filter count: {}", filters.length);
         }
@@ -102,6 +106,9 @@ public Filter[] pushedFilters() {
 
     @Override
     public boolean pushLimit(int limit) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         if (logger.isDebugEnabled()) {
             logger.debug("Pushing down limit: {}", limit);
         }
@@ -111,6 +118,9 @@ public boolean pushLimit(int limit) {
 
     @Override
     public boolean pushTopN(SortOrder[] orders, int limit) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         // This will be invoked when the user calls both orderBy and limit in their Spark program. If the user only
         // calls limit, then only pushLimit is called and this will not be called. If the user only calls orderBy and
         // not limit, then neither this nor pushLimit will be called.
@@ -131,6 +141,9 @@ public boolean isPartiallyPushed() {
 
     @Override
     public boolean pushOffset(int offset) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         if (logger.isDebugEnabled()) {
             logger.debug("Pushing down offset: {}", offset);
         }
@@ -140,6 +153,9 @@ public boolean pushOffset(int offset) {
 
     @Override
     public boolean pushAggregation(Aggregation aggregation) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         if (supportCompletePushDown(aggregation)) {
             if (aggregation.groupByExpressions().length > 0) {
                 Expression expr = aggregation.groupByExpressions()[0];
@@ -160,6 +176,9 @@ public boolean pushAggregation(Aggregation aggregation) {
 
     @Override
     public boolean supportCompletePushDown(Aggregation aggregation) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return false;
+        }
         AggregateFunc[] expressions = aggregation.aggregateExpressions();
         if (expressions.length == 1 && expressions[0] instanceof CountStar) {
             // If a count() is used, it's supported if there's no groupBy - i.e. just doing a count() by itself -
@@ -171,6 +190,9 @@ public boolean supportCompletePushDown(Aggregation aggregation) {
 
     @Override
     public void pruneColumns(StructType requiredSchema) {
+        if (readContext.planAnalysisFoundNoRows()) {
+            return;
+        }
         if (requiredSchema.equals(readContext.getSchema())) {
             if (logger.isDebugEnabled()) {
                 logger.debug("The schema to push down is equal to the existing schema, so not pushing it down.");
```

src/main/java/com/marklogic/spark/reader/ReadContext.java

Lines changed: 28 additions & 42 deletions

```diff
@@ -144,60 +144,48 @@ private PlanBuilder.Plan buildPlanForBucket(RowManager rowManager, PlanAnalysis.
     }
 
     void pushDownFiltersIntoOpticQuery(List<OpticFilter> opticFilters) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            this.opticFilters = opticFilters;
-            addOperatorToPlan(PlanUtil.buildWhere(opticFilters));
-        }
+        this.opticFilters = opticFilters;
+        addOperatorToPlan(PlanUtil.buildWhere(opticFilters));
     }
 
     void pushDownLimit(int limit) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            addOperatorToPlan(PlanUtil.buildLimit(limit));
-        }
+        addOperatorToPlan(PlanUtil.buildLimit(limit));
     }
 
     void pushDownOffset(int offset) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            addOperatorToPlan(PlanUtil.buildOffset(offset));
-        }
+        addOperatorToPlan(PlanUtil.buildOffset(offset));
     }
 
     void pushDownTopN(SortOrder[] orders, int limit) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            for (SortOrder sortOrder : orders) {
-                addOperatorToPlan(PlanUtil.buildOrderBy(sortOrder));
-            }
-            pushDownLimit(limit);
+        for (SortOrder sortOrder : orders) {
+            addOperatorToPlan(PlanUtil.buildOrderBy(sortOrder));
         }
+        pushDownLimit(limit);
     }
 
     void pushDownCount() {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            addOperatorToPlan(PlanUtil.buildGroupByCount());
-            // As will likely be the case for all aggregations, the schema needs to be modified.
-            this.schema = new StructType().add("count", DataTypes.LongType);
-            modifyPlanAnalysisToUseSingleBucket();
-        }
+        addOperatorToPlan(PlanUtil.buildGroupByCount());
+        // As will likely be the case for all aggregations, the schema needs to be modified.
+        this.schema = new StructType().add("count", DataTypes.LongType);
+        modifyPlanAnalysisToUseSingleBucket();
     }
 
     void pushDownGroupByCount(Expression groupBy) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            final String columnName = PlanUtil.expressionToColumnName(groupBy);
-            addOperatorToPlan(PlanUtil.buildGroupByCount(columnName));
-
-            StructField columnField = null;
-            for (StructField field : this.schema.fields()) {
-                if (columnName.equals(field.name())) {
-                    columnField = field;
-                    break;
-                }
-            }
-            if (columnField == null) {
-                throw new IllegalArgumentException("Unable to find groupBy column in schema; groupBy expression: " + groupBy.describe());
+        final String columnName = PlanUtil.expressionToColumnName(groupBy);
+        addOperatorToPlan(PlanUtil.buildGroupByCount(columnName));
+
+        StructField columnField = null;
+        for (StructField field : this.schema.fields()) {
+            if (columnName.equals(field.name())) {
+                columnField = field;
+                break;
             }
-            this.schema = new StructType().add(columnField).add("count", DataTypes.LongType);
-            modifyPlanAnalysisToUseSingleBucket();
         }
+        if (columnField == null) {
+            throw new IllegalArgumentException("Unable to find groupBy column in schema; groupBy expression: " + groupBy.describe());
+        }
+        this.schema = new StructType().add(columnField).add("count", DataTypes.LongType);
+        modifyPlanAnalysisToUseSingleBucket();
     }
 
     /**
@@ -213,16 +201,14 @@ private void modifyPlanAnalysisToUseSingleBucket() {
     }
 
     void pushDownRequiredSchema(StructType requiredSchema) {
-        if (planAnalysisFoundAtLeastOneRow()) {
-            this.schema = requiredSchema;
-            addOperatorToPlan(PlanUtil.buildSelect(requiredSchema));
-        }
+        this.schema = requiredSchema;
+        addOperatorToPlan(PlanUtil.buildSelect(requiredSchema));
     }
 
-    private boolean planAnalysisFoundAtLeastOneRow() {
+    boolean planAnalysisFoundNoRows() {
         // The planAnalysis will be null if no rows were found, which internal/viewinfo unfortunately throws an error
         // on. None of the push down operations need to be applied in this scenario.
-        return planAnalysis != null;
+        return planAnalysis == null;
     }
 
     /**
```

src/test/java/com/marklogic/spark/AbstractIntegrationTest.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -38,6 +38,7 @@ public class AbstractIntegrationTest extends AbstractSpringMarkLogicTest {
     protected final static String TEST_USERNAME = "spark-test-user";
     protected final static String TEST_PASSWORD = "spark";
     protected final static String CONNECTOR_IDENTIFIER = "com.marklogic.spark";
+    protected final static String NO_AUTHORS_QUERY = "op.fromView('Medical', 'NoAuthors', '')";
 
     private static MarkLogicVersion markLogicVersion;
```

src/test/java/com/marklogic/spark/reader/PushDownCountTest.java

Lines changed: 12 additions & 0 deletions

```diff
@@ -38,4 +38,16 @@ void count() {
             "that regardless of the number of matching rows, MarkLogic can efficiently determine a count in a single " +
             "request.");
     }
+
+    @Test
+    void noRowsFound() {
+        long count = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .count();
+
+        assertEquals(0, count);
+        assertEquals(0, countOfRowsReadFromMarkLogic, "When no rows exist, neither the count() operation nor the " +
+            "pruneColumns() operation should be pushed down since there's no optimization to be done.");
+    }
 }
```

src/test/java/com/marklogic/spark/reader/PushDownFilterTest.java

Lines changed: 11 additions & 0 deletions

```diff
@@ -58,6 +58,17 @@ void equalToWithViewQualifier() {
         assertEquals(4, countOfRowsReadFromMarkLogic);
     }
 
+    @Test
+    void noRowsFound() {
+        assertEquals(0, newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .filter("CitationID == 1")
+            .collectAsList()
+            .size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void equalToWithWhere() {
         assertEquals(2, getCountOfRowsWithFilter("CitationID = 5"));
```

src/test/java/com/marklogic/spark/reader/PushDownGroupByCountTest.java

Lines changed: 14 additions & 0 deletions

```diff
@@ -39,6 +39,20 @@ void groupByWithNoQualifier() {
         assertEquals(1l, (long) rows.get(0).getAs("CitationID"));
     }
 
+    @Test
+    void noRowsFound() {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .groupBy("CitationID")
+            .count()
+            .orderBy("CitationID")
+            .collectAsList();
+
+        assertEquals(0, rows.size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void groupByWithView() {
         List<Row> rows = newDefaultReader()
```

src/test/java/com/marklogic/spark/reader/PushDownOffsetTest.java

Lines changed: 13 additions & 0 deletions

```diff
@@ -15,6 +15,7 @@
  */
 package com.marklogic.spark.reader;
 
+import com.marklogic.spark.Options;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
 
@@ -36,6 +37,18 @@ void offset() {
         assertEquals(7, countOfRowsReadFromMarkLogic);
     }
 
+    @Test
+    void noRowsFound() {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .offset(1)
+            .collectAsList();
+
+        assertEquals(0, rows.size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void limitBeforeOffset() {
         List<Row> rows = newDatasetOrderedByCitationIDWithOneBucket()
```

src/test/java/com/marklogic/spark/reader/PushDownRequiredColumnsTest.java

Lines changed: 12 additions & 0 deletions

```diff
@@ -48,6 +48,18 @@ void withNoQualifier() {
         assertEquals("Humbee", rows.get(0).getAs("LastName"));
     }
 
+    @Test
+    void noRowsFound() {
+        List<Row> rows = newDefaultReader()
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
+            .load()
+            .select("CitationID")
+            .collectAsList();
+
+        assertEquals(0, rows.size());
+        assertEquals(0, countOfRowsReadFromMarkLogic);
+    }
+
     @Test
     void withSchemaAndViewQualifiers() {
         List<Row> rows = newDefaultReader()
```

src/test/java/com/marklogic/spark/reader/ReadRowsTest.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -71,7 +71,7 @@ void emptyQualifier() {
     @Test
     void queryReturnsZeroRows() {
         List<Row> rows = newDefaultReader()
-            .option(Options.READ_OPTIC_DSL, "op.fromView('Medical', 'NoAuthors')")
+            .option(Options.READ_OPTIC_DSL, NO_AUTHORS_QUERY)
             .load()
             .collectAsList();
 
```

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 2 additions & 0 deletions

```diff
@@ -24,6 +24,7 @@
 import java.io.IOException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -191,6 +192,7 @@ void dontAbortOnFailure() {
     }
 
     private void verifyFailureIsDueToLackOfPermission(SparkException ex) {
+        assertNotNull(ex.getCause(), "Unexpected exception with no cause: " + ex.getClass() + "; " + ex.getMessage());
         assertTrue(ex.getCause() instanceof IOException, "Unexpected cause: " + ex.getCause().getClass());
         assertTrue(ex.getCause().getMessage().contains("Server Message: You do not have permission to this method and URL"),
             "Unexpected cause message: " + ex.getCause().getMessage());
```
