
Commit 17580b7

Merge pull request #65 from marklogic/feature/spark-3.3
Testing against Spark 3.3.2 now
2 parents: d8eb4da + a3e7824

16 files changed: +88 -146 lines

build.gradle

Lines changed: 2 additions & 2 deletions

@@ -20,7 +20,7 @@ repositories {
 }
 
 dependencies {
-    compileOnly 'org.apache.spark:spark-sql_2.12:3.4.0'
+    compileOnly 'org.apache.spark:spark-sql_2.12:' + sparkVersion
     implementation "com.marklogic:marklogic-client-api:6.2.1"
 
     // Makes it possible to use lambdas in Java 8 to implement Spark's Function1 and Function2 interfaces
@@ -30,7 +30,7 @@ dependencies {
         exclude module: "scala-library"
     }
 
-    testImplementation 'org.apache.spark:spark-sql_2.12:3.4.0'
+    testImplementation 'org.apache.spark:spark-sql_2.12:' + sparkVersion
     testImplementation 'com.marklogic:ml-app-deployer:4.5.2'
     testImplementation 'com.marklogic:marklogic-junit5:1.3.0'
     testImplementation "ch.qos.logback:logback-classic:1.3.5"

docs/index.md

Lines changed: 2 additions & 1 deletion

@@ -9,7 +9,8 @@ reading data from and writing data to MarkLogic.
 
 The connector has the following system requirements:
 
-* Apache Spark 3.4.0 or higher; earlier versions of Spark 3.x may work but have not been tested.
+* Apache Spark 3.3.0 or higher; Spark 3.3.2 and Spark 3.4.0 are recommended to better leverage the connector's support
+  for pushing operations down when reading data.
 * For writing data, MarkLogic 9.0-9 or higher.
 * For reading data, MarkLogic 10.0-9 or higher.

docs/reading.md

Lines changed: 0 additions & 1 deletion

@@ -118,7 +118,6 @@ down the following operations to MarkLogic:
 - `filter` and `where`
 - `groupBy` when followed by `count`
 - `limit`
-- `offset`
 - `orderBy`
 
 For each of the above operations, the user's Optic query is enhanced to include the associated Optic function.
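To illustrate how these push-downs surface in application code, here is a minimal sketch of a read that exercises each of them. The format name and option keys follow the connector's documentation; the host, port, credentials, and view names are hypothetical:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
    Dataset<Row> rows = session.read()
        .format("com.marklogic.spark")
        .option("spark.marklogic.client.uri", "spark-user:password@localhost:8003")
        .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee')")
        .load()
        .filter("State = 'CA'")    // pushed down via Optic's where
        .groupBy("City").count()   // pushed down as groupBy + count
        .orderBy("count")          // pushed down via orderBy
        .limit(10);                // pushed down via limit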

examples/java-dependency/build.gradle

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@ repositories {
 }
 
 dependencies {
-    implementation 'org.apache.spark:spark-sql_2.12:3.4.0'
+    implementation 'org.apache.spark:spark-sql_2.12:3.3.2'
     implementation 'com.marklogic:marklogic-spark-connector:2.0-SNAPSHOT'
 }

gradle.properties

Lines changed: 6 additions & 0 deletions

@@ -1,3 +1,9 @@
+# Testing against 3.3.2 for the 2.0.0 release as 3.3.0 was released in June 2022 and 3.3.2 in February 2023, while
+# 3.4.0 is fairly new - April 2023. And at least AWS Glue and EMR are only on 3.3.0. But 3.3.2 has bug fixes that
+# affect some of our tests - see PushDownGroupByCountTest for an example. So we're choosing to build and test
+# against the latest 3.3.x release so we're not writing assertions based on buggy behavior in Spark 3.3.0.
+sparkVersion=3.3.2
+
 # Only used for the test app and for running tests.
 mlHost=localhost
 mlAppName=spark-test
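Since sparkVersion is an ordinary Gradle project property, it can also be overridden per invocation without editing this file, e.g. ./gradlew test -PsparkVersion=3.4.0, because properties passed with -P take precedence over gradle.properties.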

src/main/java/com/marklogic/spark/reader/MarkLogicScanBuilder.java

Lines changed: 2 additions & 15 deletions

@@ -27,7 +27,6 @@
 import org.apache.spark.sql.connector.read.SupportsPushDownAggregates;
 import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
-import org.apache.spark.sql.connector.read.SupportsPushDownOffset;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.connector.read.SupportsPushDownTopN;
 import org.apache.spark.sql.sources.Filter;
@@ -40,7 +39,7 @@
 import java.util.List;
 
 public class MarkLogicScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit,
-    SupportsPushDownOffset, SupportsPushDownTopN, SupportsPushDownAggregates, SupportsPushDownRequiredColumns {
+    SupportsPushDownTopN, SupportsPushDownAggregates, SupportsPushDownRequiredColumns {
 
     private final static Logger logger = LoggerFactory.getLogger(MarkLogicScanBuilder.class);
 
@@ -139,18 +138,6 @@ public boolean isPartiallyPushed() {
         return readContext.getBucketCount() > 1;
     }
 
-    @Override
-    public boolean pushOffset(int offset) {
-        if (readContext.planAnalysisFoundNoRows()) {
-            return false;
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("Pushing down offset: {}", offset);
-        }
-        readContext.pushDownOffset(offset);
-        return true;
-    }
-
     @Override
     public boolean pushAggregation(Aggregation aggregation) {
         if (readContext.planAnalysisFoundNoRows()) {
@@ -160,7 +147,7 @@ public boolean pushAggregation(Aggregation aggregation) {
         if (aggregation.groupByExpressions().length > 0) {
             Expression expr = aggregation.groupByExpressions()[0];
             if (logger.isDebugEnabled()) {
-                logger.debug("Pushing down by groupBy + count on: {}", expr.describe());
+                logger.debug("Pushing down groupBy + count on: {}", expr.describe());
            }
            readContext.pushDownGroupByCount(expr);
        } else {
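The SupportsPushDownOffset interface only exists as of Spark 3.4.0, which is why it has to be dropped here to compile against Spark 3.3.2. The retained pushLimit method is not shown in this diff, but it presumably mirrors the removed pushOffset above; a hypothetical sketch, assuming the ReadContext.pushDownLimit method shown later in this commit:

    @Override
    public boolean pushLimit(int limit) {
        // Nothing to push down if plan analysis already found zero matching rows.
        if (readContext.planAnalysisFoundNoRows()) {
            return false;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Pushing down limit: {}", limit);
        }
        readContext.pushDownLimit(limit);
        return true;
    }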

src/main/java/com/marklogic/spark/reader/PlanUtil.java

Lines changed: 0 additions & 4 deletions

@@ -67,10 +67,6 @@ static ObjectNode buildLimit(int limit) {
         return newOperation("limit", args -> args.add(limit));
     }
 
-    static ObjectNode buildOffset(int offset) {
-        return newOperation("offset", args -> args.add(offset));
-    }
-
     static ObjectNode buildOrderBy(SortOrder sortOrder) {
         final String direction = SortDirection.ASCENDING.equals(sortOrder.direction()) ? "asc" : "desc";
         final String columnName = expressionToColumnName(sortOrder.expression());
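Each of these build* helpers produces one serialized Optic operator that ReadContext appends to the plan. Below is a minimal sketch of the JSON that buildLimit(10) is assumed to emit; the private newOperation helper is not shown in this diff, so the exact "ns"/"fn"/"args" shape is an assumption based on MarkLogic's exported Optic plan format:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    // Hypothetical reconstruction of newOperation("limit", args -> args.add(10)).
    ObjectNode op = new ObjectMapper().createObjectNode();
    op.put("ns", "op");
    op.put("fn", "limit");
    op.putArray("args").add(10);
    // op.toString() => {"ns":"op","fn":"limit","args":[10]}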

src/main/java/com/marklogic/spark/reader/ReadContext.java

Lines changed: 0 additions & 4 deletions

@@ -152,10 +152,6 @@ void pushDownLimit(int limit) {
         addOperatorToPlan(PlanUtil.buildLimit(limit));
     }
 
-    void pushDownOffset(int offset) {
-        addOperatorToPlan(PlanUtil.buildOffset(offset));
-    }
-
     void pushDownTopN(SortOrder[] orders, int limit) {
         for (SortOrder sortOrder : orders) {
             addOperatorToPlan(PlanUtil.buildOrderBy(sortOrder));

src/test/java/com/marklogic/spark/AbstractIntegrationTest.java

Lines changed: 10 additions & 0 deletions

@@ -19,13 +19,16 @@
 import com.marklogic.junit5.spring.SimpleTestConfig;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.VersionUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.util.FileCopyUtils;
 
 import java.io.IOException;
 
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
 /**
  * Uses marklogic-junit (from marklogic-unit-test) to construct a DatabaseClient
  * based on the properties in gradle.properties and gradle-local.properties.
@@ -120,4 +123,11 @@ protected final boolean isMarkLogic10() {
         return markLogicVersion.getMajor() == 10;
     }
 
+    protected final boolean isSpark340OrHigher() {
+        assertNotNull(sparkSession, "Cannot check Spark version until a Spark Session has been created.");
+        final String version = sparkSession.version();
+        int major = VersionUtils.majorVersion(version);
+        int minor = VersionUtils.minorVersion(version);
+        return major > 3 || (major == 3 && minor >= 4);
+    }
 }

src/test/java/com/marklogic/spark/reader/PushDownGroupByCountTest.java

Lines changed: 14 additions & 1 deletion

@@ -114,12 +114,25 @@ void groupByCountOrderByLimit() {
             .collectAsList();
 
         assertEquals(4, rows.size());
-        assertEquals(4, countOfRowsReadFromMarkLogic);
+        if (isSpark340OrHigher()) {
+            assertEquals(4, countOfRowsReadFromMarkLogic);
+        } else {
+            assertEquals(5, countOfRowsReadFromMarkLogic, "With Spark 3.3.x, the limit is not pushed down, perhaps " +
+                "when groupBy is called as well. Spark 3.4.0 fixes this so that the limit is pushed down. So for 3.3.x, " +
+                "we expect all 5 rows - one per CitationID.");
+        }
         assertEquals(4l, (long) rows.get(0).getAs("CitationID"));
         assertEquals(1l, (long) rows.get(0).getAs("count"));
     }
 
     private void verifyGroupByWasPushedDown(List<Row> rows) {
+        /**
+         * Note that in Spark 3.3.0, there seems to be a bug where groupBy+count are not always pushed down. That's not
+         * an issue in Spark 3.3.2, so the behavior in 3.3.0 seems to be considered buggy and thus fixed in 3.3.2.
+         * While AWS Glue and EMR are both currently using 3.3.0 and not 3.3.2, we'd rather test against the latest
+         * bugfix release to ensure we're in sync with that and not writing test assertions against what's considered
+         * buggy behavior in 3.3.0.
+         */
         assertEquals(5, countOfRowsReadFromMarkLogic, "groupBy should be pushed down to MarkLogic when used with " +
             "count, and since there are 5 CitationID values, 5 rows should be returned.");