Skip to content

Commit d24b463

Browse files
committed
Small tweaks to avoid unnecessary object creation
Was reading through the Spark javadocs after noticing that the Batch objects were being created multiple times and the partitions logic was being called multiple times. That's not necessary for us - the partitions will be the same for each Batch object. So the Scan object now eagerly creates a Batch when possible, and that Batch eagerly calculates its partitions as well. No functional change, but it does make our logging less confusing, because the user won't see what seem like duplicate log entries (such as for the number of partitions being used for reading documents).
1 parent 9093360 commit d24b463

File tree

5 files changed

+33
-28
lines changed

5 files changed

+33
-28
lines changed

src/main/java/com/marklogic/spark/reader/customcode/CustomCodeBatch.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,10 @@
99
class CustomCodeBatch implements Batch {
1010

1111
private CustomCodeContext customCodeContext;
12-
private List<String> partitions;
12+
private InputPartition[] inputPartitions;
1313

14-
public CustomCodeBatch(CustomCodeContext customCodeContext, List<String> partitions) {
14+
CustomCodeBatch(CustomCodeContext customCodeContext, List<String> partitions) {
1515
this.customCodeContext = customCodeContext;
16-
this.partitions = partitions;
17-
}
18-
19-
@Override
20-
public InputPartition[] planInputPartitions() {
21-
InputPartition[] inputPartitions;
2216
if (partitions != null && partitions.size() > 1) {
2317
inputPartitions = new InputPartition[partitions.size()];
2418
for (int i = 0; i < partitions.size(); i++) {
@@ -27,6 +21,10 @@ public InputPartition[] planInputPartitions() {
2721
} else {
2822
inputPartitions = new InputPartition[]{new CustomCodePartition()};
2923
}
24+
}
25+
26+
@Override
27+
public InputPartition[] planInputPartitions() {
3028
return inputPartitions;
3129
}
3230

src/main/java/com/marklogic/spark/reader/customcode/CustomCodeScan.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class CustomCodeScan implements Scan {
1515

1616
private CustomCodeContext customCodeContext;
1717
private final List<String> partitions;
18+
private final CustomCodeBatch batch;
1819

1920
public CustomCodeScan(CustomCodeContext customCodeContext) {
2021
this.customCodeContext = customCodeContext;
@@ -35,6 +36,8 @@ public CustomCodeScan(CustomCodeContext customCodeContext) {
3536
client.release();
3637
}
3738
}
39+
40+
batch = new CustomCodeBatch(customCodeContext, partitions);
3841
}
3942

4043
@Override
@@ -44,7 +47,7 @@ public StructType readSchema() {
4447

4548
@Override
4649
public Batch toBatch() {
47-
return new CustomCodeBatch(customCodeContext, partitions);
50+
return batch;
4851
}
4952

5053
@Override

src/main/java/com/marklogic/spark/reader/document/DocumentBatch.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.marklogic.client.io.SearchHandle;
66
import com.marklogic.client.query.QueryManager;
77
import com.marklogic.client.query.SearchQueryDefinition;
8+
import com.marklogic.spark.Util;
89
import org.apache.spark.sql.connector.read.Batch;
910
import org.apache.spark.sql.connector.read.InputPartition;
1011
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
@@ -16,19 +17,15 @@ class DocumentBatch implements Batch {
1617
private static final Logger logger = LoggerFactory.getLogger(DocumentBatch.class);
1718

1819
private final DocumentContext context;
19-
20-
DocumentBatch(DocumentContext context) {
21-
this.context = context;
22-
}
20+
private final InputPartition[] partitions;
2321

2422
/**
2523
* Reuses the DMSDK support for obtaining a list of all eligible forests. A partition reader will then be
2624
* created for each partition/forest.
27-
*
28-
* @return
2925
*/
30-
@Override
31-
public InputPartition[] planInputPartitions() {
26+
DocumentBatch(DocumentContext context) {
27+
this.context = context;
28+
3229
DatabaseClient client = this.context.connectToMarkLogic();
3330
Forest[] forests = client.newDataMovementManager().readForestConfig().listForests();
3431

@@ -47,11 +44,16 @@ public InputPartition[] planInputPartitions() {
4744
logger.debug("Creating forest partitions; query estimate: {}; server timestamp: {}", estimate, serverTimestamp);
4845
}
4946
ForestPartitionPlanner planner = new ForestPartitionPlanner(context.getPartitionsPerForest());
50-
ForestPartition[] partitions = planner.makePartitions(estimate, serverTimestamp, forests);
51-
if (logger.isDebugEnabled()) {
52-
logger.debug("Created {} forest partitions", partitions.length);
47+
this.partitions = planner.makePartitions(estimate, serverTimestamp, forests);
48+
49+
if (Util.MAIN_LOGGER.isInfoEnabled()) {
50+
logger.info("Created {} partitions; query estimate: {}", partitions.length, estimate);
5351
}
54-
return partitions;
52+
}
53+
54+
@Override
55+
public InputPartition[] planInputPartitions() {
56+
return this.partitions;
5557
}
5658

5759
@Override

src/main/java/com/marklogic/spark/reader/document/DocumentScan.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77
class DocumentScan implements Scan {
88

9-
private final DocumentContext context;
9+
private final DocumentBatch batch;
1010

1111
DocumentScan(DocumentContext context) {
12-
this.context = context;
12+
this.batch = new DocumentBatch(context);
1313
}
1414

1515
@Override
@@ -19,6 +19,6 @@ public StructType readSchema() {
1919

2020
@Override
2121
public Batch toBatch() {
22-
return new DocumentBatch(context);
22+
return this.batch;
2323
}
2424
}

src/main/java/com/marklogic/spark/reader/optic/OpticBatch.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,19 @@ class OpticBatch implements Batch {
2626
private static final Logger logger = LoggerFactory.getLogger(OpticBatch.class);
2727

2828
private final ReadContext readContext;
29+
private final InputPartition[] partitions;
2930

3031
OpticBatch(ReadContext readContext) {
3132
this.readContext = readContext;
33+
PlanAnalysis planAnalysis = readContext.getPlanAnalysis();
34+
partitions = planAnalysis != null ?
35+
planAnalysis.getPartitionArray() :
36+
new InputPartition[]{};
3237
}
3338

3439
@Override
3540
public InputPartition[] planInputPartitions() {
36-
PlanAnalysis planAnalysis = readContext.getPlanAnalysis();
37-
return planAnalysis != null ?
38-
planAnalysis.getPartitionArray() :
39-
new InputPartition[]{};
41+
return partitions;
4042
}
4143

4244
@Override

0 commit comments

Comments
 (0)