Commit 5dae33f
MLE-17429 Added option for an inconsistent snapshot
Also fixed a couple of Sonar warnings introduced by the Sonar upgrade. This change will require manual verification via Flux during a long-running job in which we can force an update and a merge.
1 parent 7c26c76 commit 5dae33f
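
For context, a minimal sketch of how a connector user might opt into the new dirty-read behavior. This mirrors the dirtyRead test added below; sparkSession, CONNECTOR_IDENTIFIER, and makeClientUri() are assumed to come from the project's test utilities rather than this commit.

    // Hedged sketch: read documents without a consistent snapshot.
    // sparkSession, CONNECTOR_IDENTIFIER, and makeClientUri() are assumed helpers.
    Dataset<Row> rows = sparkSession.read().format(CONNECTOR_IDENTIFIER)
        .option(Options.CLIENT_URI, makeClientUri())
        .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
        .option(Options.READ_SNAPSHOT, false)  // default is true (consistent snapshot)
        .load();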

10 files changed: +60 −20 lines

src/main/java/com/marklogic/spark/ContextSupport.java (4 additions, 0 deletions)

@@ -145,6 +145,10 @@ public final String getStringOption(String option) {
         return hasOption(option) ? properties.get(option).trim() : null;
     }
 
+    public final boolean getBooleanOption(String option, boolean defaultValue) {
+        return hasOption(option) ? Boolean.parseBoolean(getStringOption(option)) : defaultValue;
+    }
+
     public final boolean isStreamingFiles() {
         return "true".equalsIgnoreCase(getStringOption(Options.STREAM_FILES));
     }
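
Worth noting about this helper: Boolean.parseBoolean never throws, so any unrecognized value silently becomes false. A quick illustration (not part of this commit):

    Boolean.parseBoolean("true");   // true
    Boolean.parseBoolean("TRUE");   // true; the comparison is case-insensitive
    Boolean.parseBoolean("1");      // false; unrecognized values are not errors
    Boolean.parseBoolean(null);     // false; null is tolerated as well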

src/main/java/com/marklogic/spark/Options.java (8 additions, 0 deletions)

@@ -59,6 +59,14 @@ public abstract class Options {
     public static final String READ_TRIPLES_FILTERED = "spark.marklogic.read.triples.filtered";
     public static final String READ_TRIPLES_BASE_IRI = "spark.marklogic.read.triples.baseIri";
 
+    /**
+     * The connector uses a consistent snapshot by default. Setting this to false results in queries being executed
+     * at multiple points in time, potentially yielding inconsistent results.
+     *
+     * @since 2.4.2
+     */
+    public static final String READ_SNAPSHOT = "spark.marklogic.read.snapshot";
+
     // For logging progress when reading documents, rows, or items via custom code. Defines the interval at which
     // progress should be logged - e.g. a value of 10,000 will result in a message being logged on every 10,000 items
     // being written/processed.

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java (5 additions, 0 deletions)

@@ -99,6 +99,11 @@ int getPartitionsPerForest() {
         return (int) getNumericOption(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, defaultPartitionsPerForest, 1);
     }
 
+    boolean isConsistentSnapshot() {
+        // Starting in 2.2.0 and through 2.4.2, the default is a consistent snapshot. We may change this later.
+        return getBooleanOption(Options.READ_SNAPSHOT, true);
+    }
+
     void setLimit(Integer limit) {
         this.limit = limit;
     }

src/main/java/com/marklogic/spark/reader/document/ForestReader.java (6 additions, 6 deletions)

@@ -61,16 +61,16 @@ class ForestReader implements PartitionReader<InternalRow> {
             context.connectToMarkLogic(forestPartition.getHost()) :
             context.connectToMarkLogic();
 
+        final boolean filtered = context.getBooleanOption(Options.READ_DOCUMENTS_FILTERED, false);
+        final boolean consistentSnapshot = context.isConsistentSnapshot();
+
         if (logger.isDebugEnabled()) {
-            logger.debug("Will read from host {} for partition: {}", client.getHost(), forestPartition);
+            logger.debug("Will read from host {} for partition: {}; filtered: {}; consistent snapshot: {}",
+                client.getHost(), forestPartition, filtered, consistentSnapshot);
         }
 
         SearchQueryDefinition query = context.buildSearchQuery(client);
-        boolean filtered = false;
-        if (context.hasOption(Options.READ_DOCUMENTS_FILTERED)) {
-            filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_DOCUMENTS_FILTERED));
-        }
-        this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered);
+        this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), filtered, consistentSnapshot);
 
         this.documentManager = client.newDocumentManager();
         this.documentManager.setReadTransform(query.getResponseTransform());

src/main/java/com/marklogic/spark/reader/document/OpticTriplesReader.java (11 additions, 4 deletions)

@@ -15,6 +15,8 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -27,6 +29,8 @@
  */
 class OpticTriplesReader implements PartitionReader<InternalRow> {
 
+    private static final Logger logger = LoggerFactory.getLogger(OpticTriplesReader.class);
+
     private static final String DATATYPE_COLUMN = "datatype";
     private static final String GRAPH_COLUMN = "graph";
     private static final String OBJECT_COLUMN = "object";
@@ -54,12 +58,15 @@ public OpticTriplesReader(ForestPartition forestPartition, DocumentContext conte
         this.op = this.rowManager.newPlanBuilder();
 
         final SearchQueryDefinition query = context.buildTriplesSearchQuery(this.databaseClient);
-        boolean filtered = false;
-        if (context.hasOption(Options.READ_TRIPLES_FILTERED)) {
-            filtered = Boolean.parseBoolean(context.getProperties().get(Options.READ_TRIPLES_FILTERED));
+        final boolean filtered = context.getBooleanOption(Options.READ_TRIPLES_FILTERED, false);
+        final boolean consistentSnapshot = context.isConsistentSnapshot();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Will read from host {} for partition: {}; filtered: {}; consistent snapshot: {}",
+                databaseClient.getHost(), forestPartition, filtered, consistentSnapshot);
         }
-        this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered);
 
+        this.uriBatcher = new UriBatcher(this.databaseClient, query, forestPartition, context.getBatchSize(), filtered, consistentSnapshot);
         this.batchSize = context.getBatchSize();
     }

src/main/java/com/marklogic/spark/reader/document/UriBatcher.java (7 additions, 2 deletions)

@@ -25,13 +25,15 @@ class UriBatcher {
     private final ForestPartition partition;
     private final int pageLength;
     private final boolean filtered;
+    private final boolean useConsistentSnapshot;
 
     // These change as batches of URIs are retrieved.
     private String lastUri;
     private long offsetStart = 1;
 
 
-    UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength, boolean filtered) {
+    UriBatcher(DatabaseClient client, SearchQueryDefinition query, ForestPartition partition, int pageLength,
+               boolean filtered, boolean useConsistentSnapshot) {
         this.client = client;
         this.queryManager = (QueryManagerImpl) this.client.newQueryManager();
         this.queryManager.setPageLength(pageLength);
@@ -40,6 +42,7 @@ class UriBatcher {
         this.offsetStart = this.partition.getOffsetStart();
         this.pageLength = pageLength;
         this.filtered = filtered;
+        this.useConsistentSnapshot = useConsistentSnapshot;
     }
 
     /**
@@ -53,7 +56,9 @@ List<String> nextBatchOfUris() {
         }
 
         UrisHandle urisHandle = new UrisHandle();
-        urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
+        if (useConsistentSnapshot) {
+            urisHandle.setPointInTimeQueryTimestamp(partition.getServerTimestamp());
+        }
 
         // If we have an offsetEnd, the page length is adjusted to ensure we do not go past offsetEnd.
         if (partition.getOffsetEnd() != null && (this.offsetStart + this.pageLength > partition.getOffsetEnd())) {
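
The hunk above shows only the guard for that comment; the adjustment itself falls outside the diff. A hedged sketch of the arithmetic the comment describes, assuming the final page is simply truncated at offsetEnd:

    // Assumption, not the actual body from this commit: shrink the last page
    // so it ends exactly at offsetEnd. With offsetStart = 901, pageLength = 100,
    // and offsetEnd = 950, the adjusted length is 950 - 901 + 1 = 50.
    long adjustedPageLength = partition.getOffsetEnd() - this.offsetStart + 1;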

src/main/java/com/marklogic/spark/reader/file/FileContext.java (1 addition, 4 deletions)

@@ -61,10 +61,7 @@ public InputStream openFile(String filePath) {
     }
 
     public boolean isReadAbortOnFailure() {
-        if (hasOption(Options.READ_FILES_ABORT_ON_FAILURE)) {
-            return Boolean.parseBoolean(getStringOption(Options.READ_FILES_ABORT_ON_FAILURE));
-        }
-        return true;
+        return getBooleanOption(Options.READ_FILES_ABORT_ON_FAILURE, true);
     }
 
     byte[] readBytes(InputStream inputStream) throws IOException {

src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java (14 additions, 0 deletions)

@@ -40,6 +40,20 @@ void readByCollection() {
         assertEquals("Vivianne", doc.get("ForeName").asText());
     }
 
+    @Test
+    void dirtyRead() {
+        Dataset<Row> rows = startRead()
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
+            .option(Options.READ_SNAPSHOT, false)
+            .load();
+
+        assertEquals(15, rows.count(), "This test only verifies that the snapshot option can be set to false. " +
+            "We don't yet have a way to verify that the query doesn't use a consistent snapshot, which would entail " +
+            "forcing the read to pause while an update and merge are performed in the database. Verifying the " +
+            "difference between a consistent snapshot and a dirty read will need to be done manually, including " +
+            "by inspecting the debug logs generated by this test.");
+    }
+
     @Test
     void logProgress() {
         newWriter().save();

src/test/java/com/marklogic/spark/writer/file/WriteFilesWithEncodingTest.java (2 additions, 2 deletions)

@@ -142,7 +142,7 @@ void prettyPrintJsonFile(@TempDir Path tempDir) {
     }
 
     @Test
-    void invalidEncoding(@TempDir Path tempDir) {
+    void invalidEncoding() {
         DataFrameWriter writer = newSparkSession().read().format(CONNECTOR_IDENTIFIER)
             .option(Options.CLIENT_URI, makeClientUri())
             .option(Options.READ_DOCUMENTS_URIS, SAMPLE_JSON_DOC_URI)
@@ -151,7 +151,7 @@ void invalidEncoding(@TempDir Path tempDir) {
             .option(Options.WRITE_FILES_ENCODING, "not-valid-encoding")
             .mode(SaveMode.Append);
 
-        ConnectorException ex = assertThrowsConnectorException(() -> writer.save(tempDir.toAbsolutePath().toString()));
+        ConnectorException ex = assertThrowsConnectorException(() -> writer.save("."));
         assertEquals("Unsupported encoding value: not-valid-encoding", ex.getMessage());
     }

src/test/java/com/marklogic/spark/writer/file/WriteRdfGzipFilesTest.java (2 additions, 2 deletions)

@@ -54,7 +54,7 @@ void gzip(@TempDir Path tempDir) {
     }
 
     @Test
-    void zipIsntValidChoice(@TempDir Path tempDir) {
+    void zipIsntValidChoice() {
         DataFrameWriter writer = newSparkSession()
             .read().format(CONNECTOR_IDENTIFIER)
             .option(Options.CLIENT_URI, makeClientUri())
@@ -65,7 +65,7 @@ void zipIsntValidChoice(@TempDir Path tempDir) {
             .option(Options.WRITE_FILES_COMPRESSION, "zip")
             .mode(SaveMode.Append);
 
-        ConnectorException ex = assertThrowsConnectorException(() -> writer.save(tempDir.toFile().getAbsolutePath()));
+        ConnectorException ex = assertThrowsConnectorException(() -> writer.save("."));
         assertEquals("Unsupported compression value; only 'gzip' is supported: zip", ex.getMessage());
     }
 }
