Skip to content

Commit f6d76ac

Browse files
authored
Merge pull request #73 from marklogic/feature/orderBy-fix
Fixing issue with multiple orderBy/sort clauses
2 parents 1de1b0e + 74ad40a commit f6d76ac

File tree

3 files changed

+21
-14
lines changed

3 files changed

+21
-14
lines changed

src/main/java/com/marklogic/spark/reader/PlanUtil.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,25 @@ static ObjectNode buildLimit(int limit) {
         return newOperation("limit", args -> args.add(limit));
     }
 
-    static ObjectNode buildOrderBy(SortOrder sortOrder) {
-        final String direction = SortDirection.ASCENDING.equals(sortOrder.direction()) ? "asc" : "desc";
-        final String columnName = expressionToColumnName(sortOrder.expression());
+    static ObjectNode buildOrderBy(SortOrder[] sortOrders) {
         return newOperation("order-by", args -> {
-            ArrayNode orderByArgs = args.addObject().put("ns", "op").put("fn", direction).putArray("args");
-            // This may be a bad hack to account for when the user does a groupBy/count/orderBy/limit, which does not
-            // seem like the correct approach - the Spark ScanBuilder javadocs indicate that it should be limit/orderBy
-            // instead. In the former scenario, we get "COUNT(*)" as the expression to order by, and we know that's not
-            // the column name.
-            if (logger.isDebugEnabled()) {
-                logger.debug("Adjusting `COUNT(*)` column to be `count`");
+            ArrayNode innerArgs = args.addArray();
+            for (SortOrder sortOrder: sortOrders) {
+                final String direction = SortDirection.ASCENDING.equals(sortOrder.direction()) ? "asc" : "desc";
+                ArrayNode orderByArgs = innerArgs.addObject().put("ns", "op").put("fn", direction).putArray("args");
+                String columnName = expressionToColumnName(sortOrder.expression());
+                // This may be a bad hack to account for when the user does a groupBy/count/orderBy/limit, which does not
+                // seem like the correct approach - the Spark ScanBuilder javadocs indicate that it should be limit/orderBy
+                // instead. In the former scenario, we get "COUNT(*)" as the expression to order by, and we know that's not
+                // the column name.
+                if ("COUNT(*)".equals(columnName)) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Adjusting `COUNT(*)` column to be `count`");
+                    }
+                    columnName = "count";
+                }
+                populateSchemaCol(orderByArgs.addObject(), columnName);
             }
-            populateSchemaCol(orderByArgs.addObject(), "COUNT(*)".equals(columnName) ? "count" : columnName);
         });
     }
 
8692

src/main/java/com/marklogic/spark/reader/ReadContext.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,7 @@ void pushDownLimit(int limit) {
     }
 
     void pushDownTopN(SortOrder[] orders, int limit) {
-        for (SortOrder sortOrder : orders) {
-            addOperatorToPlan(PlanUtil.buildOrderBy(sortOrder));
-        }
+        addOperatorToPlan(PlanUtil.buildOrderBy(orders));
         pushDownLimit(limit);
     }
 

src/test/java/com/marklogic/spark/reader/PushDownOrderByAndLimitTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ void sort() {
     void sortByMultiple() {
         List<Row> rows = newDefaultReader()
             .option(Options.READ_OPTIC_QUERY, QUERY_WITH_NO_QUALIFIER)
+            // Force a single request to ensure the orderBy is constructed correctly.
+            .option(Options.READ_NUM_PARTITIONS, 1)
+            .option(Options.READ_BATCH_SIZE, 0)
             .load()
             .sort("CitationID", "LastName")
             .limit(8)

0 commit comments

Comments (0)