
Commit d6fe65a

Merge pull request #157 from marklogic/feature/12218-forest-partitions
MLE-12218 User can now configure partitions per forest
2 parents: 7636a2f + 7c4ea58 · commit d6fe65a

15 files changed (+310, -71 lines)
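
The headline change: document reads now split each forest into multiple Spark partitions. A minimal usage sketch, assuming the connector's "marklogic" format name and placeholder connection details; only the two "documents" option keys come from this commit's Options.java:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class PartitionsPerForestExample {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder().appName("example").getOrCreate();
            Dataset<Row> docs = session.read()
                .format("marklogic") // assumed format name for this connector
                .option("spark.marklogic.client.uri", "user:password@localhost:8003") // placeholder connection
                .option("spark.marklogic.read.documents.stringQuery", "example") // placeholder query
                // New in this PR: partition readers per forest; defaults to 2, minimum 1.
                .option("spark.marklogic.read.documents.partitionsPerForest", "4")
                .load();
            System.out.println(docs.count());
        }
    }

With 4 partitions per forest, each forest's estimated matches are split into four disjoint offset ranges, all read at the same server timestamp (see DocumentBatch and ForestPartitionPlanner below).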

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 2 additions & 2 deletions
@@ -113,11 +113,11 @@ protected long getNumericOption(String optionName, long defaultValue, long minimumValue
                 Long.parseLong(this.getProperties().get(optionName)) :
                 defaultValue;
             if (value < minimumValue) {
-                throw new IllegalArgumentException(String.format("Value of '%s' option must be %d or greater", optionName, minimumValue));
+                throw new ConnectorException(String.format("Value of '%s' option must be %d or greater.", optionName, minimumValue));
             }
             return value;
         } catch (NumberFormatException ex) {
-            throw new IllegalArgumentException(String.format("Value of '%s' option must be numeric", optionName), ex);
+            throw new ConnectorException(String.format("Value of '%s' option must be numeric.", optionName), ex);
         }
     }

src/main/java/com/marklogic/spark/Options.java

Lines changed: 2 additions & 0 deletions
@@ -48,11 +48,13 @@ public abstract class Options {
     public static final String READ_DOCUMENTS_STRING_QUERY = "spark.marklogic.read.documents.stringQuery";
     // Corresponds to the complex query submitted via the request body at https://docs.marklogic.com/REST/POST/v1/search .
     public static final String READ_DOCUMENTS_QUERY = "spark.marklogic.read.documents.query";
+    public static final String READ_DOCUMENTS_QUERY_FORMAT = "spark.marklogic.read.documents.queryFormat";
     public static final String READ_DOCUMENTS_OPTIONS = "spark.marklogic.read.documents.options";
     public static final String READ_DOCUMENTS_DIRECTORY = "spark.marklogic.read.documents.directory";
     public static final String READ_DOCUMENTS_TRANSFORM = "spark.marklogic.read.documents.transform";
     public static final String READ_DOCUMENTS_TRANSFORM_PARAMS = "spark.marklogic.read.documents.transformParams";
     public static final String READ_DOCUMENTS_TRANSFORM_PARAMS_DELIMITER = "spark.marklogic.read.documents.transformParamsDelimiter";
+    public static final String READ_DOCUMENTS_PARTITIONS_PER_FOREST = "spark.marklogic.read.documents.partitionsPerForest";

     public static final String READ_FILES_COMPRESSION = "spark.marklogic.read.files.compression";

src/main/java/com/marklogic/spark/reader/document/DocumentBatch.java

Lines changed: 26 additions & 3 deletions
@@ -2,13 +2,20 @@

 import com.marklogic.client.DatabaseClient;
 import com.marklogic.client.datamovement.Forest;
+import com.marklogic.client.io.SearchHandle;
+import com.marklogic.client.query.QueryManager;
+import com.marklogic.client.query.SearchQueryDefinition;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 class DocumentBatch implements Batch {

+    private static final Logger logger = LoggerFactory.getLogger(DocumentBatch.class);
+
     private DocumentContext context;

     DocumentBatch(CaseInsensitiveStringMap options) {

@@ -25,9 +32,25 @@ class DocumentBatch implements Batch {
     public InputPartition[] planInputPartitions() {
         DatabaseClient client = this.context.connectToMarkLogic();
         Forest[] forests = client.newDataMovementManager().readForestConfig().listForests();
-        InputPartition[] partitions = new InputPartition[forests.length];
-        for (int i = 0; i < forests.length; i++) {
-            partitions[i] = new ForestPartition(forests[i].getForestName());
+
+        SearchQueryDefinition query = this.context.buildSearchQuery(client);
+        // Must null this out so SearchHandle still works below.
+        query.setResponseTransform(null);
+
+        QueryManager queryManager = client.newQueryManager();
+        queryManager.setPageLength(1);
+
+        SearchHandle handle = queryManager.search(query, new SearchHandle());
+        final long estimate = handle.getTotalResults();
+        final long serverTimestamp = handle.getServerTimestamp();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Creating forest partitions; query estimate: {}; server timestamp: {}", estimate, serverTimestamp);
+        }
+        ForestPartitionPlanner planner = new ForestPartitionPlanner(context.getPartitionsPerForest());
+        ForestPartition[] partitions = planner.makePartitions(estimate, serverTimestamp, forests);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Created {} forest partitions", partitions.length);
         }
         return partitions;
     }

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 8 additions & 11 deletions
@@ -3,7 +3,6 @@
 import com.marklogic.client.DatabaseClient;
 import com.marklogic.client.document.DocumentManager;
 import com.marklogic.client.query.SearchQueryDefinition;
-import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.ContextSupport;
 import com.marklogic.spark.Options;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;

@@ -52,6 +51,7 @@ SearchQueryDefinition buildSearchQuery(DatabaseClient client) {
         return new SearchQueryBuilder()
             .withStringQuery(props.get(Options.READ_DOCUMENTS_STRING_QUERY))
             .withQuery(props.get(Options.READ_DOCUMENTS_QUERY))
+            .withQueryFormat(props.get(Options.READ_DOCUMENTS_QUERY_FORMAT))
             .withCollections(props.get(Options.READ_DOCUMENTS_COLLECTIONS))
             .withDirectory(props.get(Options.READ_DOCUMENTS_DIRECTORY))
             .withOptionsName(props.get(Options.READ_DOCUMENTS_OPTIONS))

@@ -62,18 +62,15 @@ SearchQueryDefinition buildSearchQuery(DatabaseClient client) {
     }

     int getBatchSize() {
-        if (hasOption(Options.READ_BATCH_SIZE)) {
-            String value = getProperties().get(Options.READ_BATCH_SIZE);
-            try {
-                return Integer.parseInt(value);
-            } catch (NumberFormatException e) {
-                String message = String.format("Invalid value for option %s: %s; must be numeric.", Options.READ_BATCH_SIZE, value);
-                throw new ConnectorException(message);
-            }
-        }
         // Testing has shown that at least for smaller documents, 100 or 200 can be significantly slower than something
         // like 1000 or even 10000. 500 is thus used as a default that should still be reasonably performant for larger
         // documents.
-        return 500;
+        int defaultBatchSize = 500;
+        return (int) getNumericOption(Options.READ_BATCH_SIZE, defaultBatchSize, 1);
+    }
+
+    int getPartitionsPerForest() {
+        int defaultPartitionsPerForest = 2;
+        return (int) getNumericOption(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, defaultPartitionsPerForest, 1);
     }
 }
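
With both getters now delegating to getNumericOption, invalid values fail fast with the ConnectorException introduced in ContextSupport above. A hedged sketch of the expected failure, reusing the placeholder read setup from the earlier example:

    // Hypothetical misuse: the minimum allowed partitionsPerForest is 1.
    session.read()
        .format("marklogic") // assumed format name
        .option("spark.marklogic.read.documents.partitionsPerForest", "0")
        .load();
    // Expected error, per the ContextSupport change above:
    // com.marklogic.spark.ConnectorException:
    //     Value of 'spark.marklogic.read.documents.partitionsPerForest' option must be 1 or greater.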

src/main/java/com/marklogic/spark/reader/document/ForestPartition.java

Lines changed: 26 additions & 3 deletions
@@ -6,13 +6,36 @@ class ForestPartition implements InputPartition {

     static final long serialVersionUID = 1;

-    private String forestName;
+    private final String forestName;
+    private final long serverTimestamp;
+    private final Long offsetStart;
+    private final Long offsetEnd;

-    ForestPartition(String forestName) {
+    ForestPartition(String forestName, long serverTimestamp, Long offsetStart, Long offsetEnd) {
         this.forestName = forestName;
+        this.serverTimestamp = serverTimestamp;
+        this.offsetStart = offsetStart;
+        this.offsetEnd = offsetEnd;
     }

-    public String getForestName() {
+    String getForestName() {
         return forestName;
     }
+
+    long getServerTimestamp() {
+        return serverTimestamp;
+    }
+
+    Long getOffsetStart() {
+        return offsetStart;
+    }
+
+    Long getOffsetEnd() {
+        return offsetEnd;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[%s; %d; %d]", forestName, offsetStart, offsetEnd);
+    }
 }

src/main/java/com/marklogic/spark/reader/document/ForestPartitionPlanner.java

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+package com.marklogic.spark.reader.document;
+
+import com.marklogic.client.datamovement.Forest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class ForestPartitionPlanner {
+
+    private final int partitionsPerForest;
+
+    ForestPartitionPlanner(int partitionsPerForest) {
+        this.partitionsPerForest = partitionsPerForest;
+    }
+
+    ForestPartition[] makePartitions(long estimate, long serverTimestamp, Forest... forests) {
+        final long urisPerForest = (long) (Math.ceil((double) estimate / forests.length));
+        final long urisPerPartition = (long) (Math.ceil((double) urisPerForest / partitionsPerForest));
+
+        List<ForestPartition> partitions = new ArrayList<>();
+        for (int i = 0; i < forests.length; i++) {
+            Forest forest = forests[i];
+            long offset = 1;
+            for (int j = 0; j < partitionsPerForest; j++) {
+                // If the offset for this forest exceeds the estimate across all forests, then the user has asked for
+                // too many partitions. Any subsequent partition will not return any results, so we stop creating partitions.
+                if (offset > estimate) {
+                    break;
+                }
+                Long offsetEnd = j < (partitionsPerForest - 1) ? (urisPerPartition + offset - 1) : null;
+                partitions.add(new ForestPartition(forest.getForestName(), serverTimestamp, offset, offsetEnd));
+                offset += urisPerPartition;
+            }
+        }
+        return partitions.toArray(new ForestPartition[]{});
+    }
+}
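
A standalone sketch of the planner's offset arithmetic with hypothetical numbers (not taken from the PR or its tests):

    public class PlannerMathSketch {
        public static void main(String[] args) {
            // 10000 matching URIs spread across 3 forests, 2 partitions per forest.
            long estimate = 10000;
            int forestCount = 3;
            int partitionsPerForest = 2;

            long urisPerForest = (long) Math.ceil((double) estimate / forestCount);                 // 3334
            long urisPerPartition = (long) Math.ceil((double) urisPerForest / partitionsPerForest); // 1667

            // Each forest gets offset ranges [1, 1667] and [1668, null].
            System.out.println(urisPerForest + " URIs per forest, " + urisPerPartition + " per partition");
        }
    }

The null offsetEnd on a forest's last partition is deliberate: the per-forest estimate is only an average, so the final partition runs unbounded and picks up any URIs beyond the estimate for that forest.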

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 11 additions & 10 deletions
@@ -36,32 +36,32 @@
 class ForestReader implements PartitionReader<InternalRow> {

     private static final Logger logger = LoggerFactory.getLogger(ForestReader.class);
+
     private final UriBatcher uriBatcher;
     private final GenericDocumentManager documentManager;
     private final StructuredQueryBuilder queryBuilder;
     private final Set<DocumentManager.Metadata> requestedMetadata;
     private final boolean contentWasRequested;

     // Only used for logging.
-    private final String forestName;
+    private final ForestPartition forestPartition;
+    private long startTime;

     private DocumentPage currentDocumentPage;

-    // Used for logging/debugging.
-    private long startTime;
     private int docCount;

     ForestReader(ForestPartition forestPartition, DocumentContext documentContext) {
-        this.forestName = forestPartition.getForestName();
         if (logger.isDebugEnabled()) {
-            logger.debug("Will read from forest: {}", this.forestName);
+            logger.debug("Will read from partition: {}", forestPartition);
         }
+        this.forestPartition = forestPartition;

         DatabaseClient client = documentContext.connectToMarkLogic();

         SearchQueryDefinition query = documentContext.buildSearchQuery(client);
         int batchSize = documentContext.getBatchSize();
-        this.uriBatcher = new UriBatcher(client, query, forestPartition.getForestName(), batchSize, true);
+        this.uriBatcher = new UriBatcher(client, query, forestPartition, batchSize, false);

         this.documentManager = client.newDocumentManager();
         this.documentManager.setReadTransform(query.getResponseTransform());

@@ -82,7 +82,8 @@ public boolean next() {
         if (uris.isEmpty()) {
             // TBD on whether this should be info/debug.
             if (logger.isInfoEnabled()) {
-                logger.info("Read {} documents from forest {} in {}ms", docCount, forestName, System.currentTimeMillis() - startTime);
+                long duration = System.currentTimeMillis() - startTime;
+                logger.info("Read {} documents from partition {} in {}ms", docCount, forestPartition, duration);
             }
             return false;
         }

@@ -116,8 +117,8 @@ private List<String> getNextBatchOfUris() {
         long start = System.currentTimeMillis();
         List<String> uris = uriBatcher.nextBatchOfUris();
         if (logger.isTraceEnabled()) {
-            logger.trace("Retrieved {} URIs in {}ms from forest {}", uris.size(),
-                (System.currentTimeMillis() - start), this.forestName);
+            logger.trace("Retrieved {} URIs in {}ms from partition {}", uris.size(),
+                (System.currentTimeMillis() - start), this.forestPartition);
         }
         return uris;
     }

@@ -134,7 +135,7 @@ private DocumentPage readPage(List<String> uris) {
         // some inefficiency if the caller only wants metadata and no content.
         DocumentPage page = this.documentManager.search(queryDefinition, 0);
         if (logger.isTraceEnabled()) {
-            logger.trace("Retrieved page of documents in {}ms from forest {}", (System.currentTimeMillis() - start), this.forestName);
+            logger.trace("Retrieved page of documents in {}ms from partition {}", (System.currentTimeMillis() - start), this.forestPartition);
         }
         return page;
     }

src/main/java/com/marklogic/spark/reader/document/SearchQueryBuilder.java

Lines changed: 14 additions & 1 deletion
@@ -2,6 +2,7 @@

 import com.marklogic.client.DatabaseClient;
 import com.marklogic.client.document.ServerTransform;
+import com.marklogic.client.io.Format;
 import com.marklogic.client.io.StringHandle;
 import com.marklogic.client.query.*;

@@ -13,6 +14,7 @@ public class SearchQueryBuilder {

     private String stringQuery;
     private String query;
+    private Format queryFormat;
     private String[] collections;
     private String directory;
     private String optionsName;

@@ -49,6 +51,13 @@ public SearchQueryBuilder withQuery(String query) {
         return this;
     }

+    public SearchQueryBuilder withQueryFormat(String format) {
+        if (format != null) {
+            this.queryFormat = Format.valueOf(format.toUpperCase());
+        }
+        return this;
+    }
+
     public SearchQueryBuilder withCollections(String value) {
         if (value != null) {
             this.collections = value.split(",");

@@ -86,7 +95,11 @@ private QueryDefinition buildQueryDefinition(DatabaseClient client) {
         // The Java Client misleadingly suggests a distinction amongst the 3 complex queries - structured,
         // serialized CTS, and combined - but the REST API does not.
         if (query != null) {
-            RawStructuredQueryDefinition queryDefinition = queryManager.newRawStructuredQueryDefinition(new StringHandle(query));
+            StringHandle queryHandle = new StringHandle(query);
+            if (queryFormat != null) {
+                queryHandle.withFormat(queryFormat);
+            }
+            RawStructuredQueryDefinition queryDefinition = queryManager.newRawStructuredQueryDefinition(queryHandle);
             if (stringQuery != null && stringQuery.length() > 0) {
                 queryDefinition.withCriteria(stringQuery);
             }
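
The new queryFormat hook matters when the format of the raw query string cannot be inferred, such as a serialized CTS query written as JSON. A hedged usage sketch (the query body is illustrative; only the two option keys come from this commit):

    // Hypothetical serialized CTS query expressed as JSON.
    String ctsQuery = "{\"ctsquery\": {\"collectionQuery\": {\"uris\": [\"example\"]}}}";
    session.read()
        .format("marklogic") // assumed format name
        .option("spark.marklogic.read.documents.query", ctsQuery)
        // Mapped to Format.JSON via Format.valueOf("JSON"); "xml" works the same way.
        .option("spark.marklogic.read.documents.queryFormat", "json")
        .load();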

src/main/java/com/marklogic/spark/reader/document/UriBatcher.java

Lines changed: 25 additions & 16 deletions
@@ -17,21 +17,26 @@
 class UriBatcher {

     private final DatabaseClient client;
+    private final QueryManagerImpl queryManager;
     private final SearchQueryDefinition query;
-    private final boolean filtered;
-    private final String forestName;
+    private final ForestPartition partition;
     private final int pageLength;
+    private final boolean filtered;

-    private long serverTimestamp = -1;
+    // These change as batches of URIs are retrieved.
     private String lastUri;
-    private long start = 1;
+    private long offsetStart = 1;
+

-    UriBatcher(DatabaseClient client, SearchQueryDefinition query, String forestName, int pageLength, boolean filtered) {
+    UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength, boolean filtered) {
         this.client = client;
+        this.queryManager = (QueryManagerImpl) this.client.newQueryManager();
+        this.queryManager.setPageLength(pageLength);
         this.query = query;
-        this.filtered = filtered;
-        this.forestName = forestName;
+        this.partition = partition;
+        this.offsetStart = this.partition.getOffsetStart();
         this.pageLength = pageLength;
+        this.filtered = filtered;
     }

     /**

@@ -40,23 +45,27 @@ class UriBatcher {
      * without resorting to pagination.
      */
     List<String> nextBatchOfUris() {
-        QueryManagerImpl queryManager = (QueryManagerImpl) client.newQueryManager();
-        queryManager.setPageLength(this.pageLength);
+        if (partition.getOffsetEnd() != null && this.offsetStart > partition.getOffsetEnd()) {
+            return new ArrayList<>();
+        }
+
         UrisHandle urisHandle = new UrisHandle();
-        if (this.serverTimestamp > -1) {
-            urisHandle.setPointInTimeQueryTimestamp(this.serverTimestamp);
+        urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
+
+        // If we have an offsetEnd, the page length is adjusted to ensure we do not go past offsetEnd.
+        if (partition.getOffsetEnd() != null && (this.offsetStart + this.pageLength > partition.getOffsetEnd())) {
+            // e.g. 9001 to 10000; need a page length of 1000, so 1 is added.
+            int finalPageLength = (int) (partition.getOffsetEnd() - this.offsetStart) + 1;
+            this.queryManager.setPageLength(finalPageLength);
         }

-        try (UrisHandle results = queryManager.uris("POST", query, filtered, urisHandle, start, lastUri, forestName)) {
-            if (serverTimestamp == -1) {
-                this.serverTimestamp = results.getServerTimestamp();
-            }
+        try (UrisHandle results = queryManager.uris("POST", query, filtered, urisHandle, offsetStart, lastUri, partition.getForestName())) {
             List<String> uris = new ArrayList<>();
             results.iterator().forEachRemaining(uris::add);
             if (!uris.isEmpty()) {
                 this.lastUri = uris.get(uris.size() - 1);
             }
-            this.start = this.start + uris.size();
+            this.offsetStart = this.offsetStart + uris.size();
             return uris;
         } catch (ResourceNotFoundException ex) {
             // QueryBatcherImpl notes that this is how internal/uris informs us that there are no results left.
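
A standalone illustration of the page-length clamp, using the same hypothetical numbers as the inline comment above:

    long offsetStart = 9001;  // next offset this batcher will request
    long offsetEnd = 10000;   // partition.getOffsetEnd()
    int pageLength = 5000;    // unclamped, this would read past the partition's end
    // 9001 + 5000 > 10000, so clamp to exactly the remaining offsets 9001..10000:
    int finalPageLength = (int) (offsetEnd - offsetStart) + 1; // 1000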
