Commit 3d157e2

Merge pull request #167 from marklogic/feature/uri-fix

Fixing bug with URI generation

2 parents: b2deb97 + fdc10b4

File tree

6 files changed: +36 additions, -8 deletions

build.gradle

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ dependencies {
         exclude module: 'jackson-dataformat-csv'
     }
 
-    testImplementation "ch.qos.logback:logback-classic:1.3.5"
+    testImplementation "ch.qos.logback:logback-classic:1.3.14"
     testImplementation "org.slf4j:jcl-over-slf4j:1.7.36"
     testImplementation "org.skyscreamer:jsonassert:1.5.1"
 }

src/main/java/com/marklogic/spark/writer/MarkLogicWrite.java

Lines changed: 10 additions & 2 deletions

@@ -38,8 +38,16 @@ public class MarkLogicWrite implements BatchWrite, StreamingWrite {
     private WriteContext writeContext;
 
     // Used solely for testing. Will never be populated in a real world scenario.
-    public static Consumer<Integer> successCountConsumer;
-    public static Consumer<Integer> failureCountConsumer;
+    private static Consumer<Integer> successCountConsumer;
+    private static Consumer<Integer> failureCountConsumer;
+
+    public static void setSuccessCountConsumer(Consumer<Integer> consumer) {
+        successCountConsumer = consumer;
+    }
+
+    public static void setFailureCountConsumer(Consumer<Integer> consumer) {
+        failureCountConsumer = consumer;
+    }
 
     MarkLogicWrite(WriteContext writeContext) {
         this.writeContext = writeContext;
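
The pattern in this change is worth noting: the mutable public static fields become private and are exposed only through setters, giving tests a narrower surface for registering hooks. Below is a minimal sketch of the pattern, assuming the consumers are invoked once a write finishes; this diff does not show the actual call site, so onWriteComplete is a hypothetical stand-in.

    import java.util.function.Consumer;

    public class WriteHooks {
        // Used solely for testing; never populated in a real-world scenario.
        private static Consumer<Integer> successCountConsumer;

        public static void setSuccessCountConsumer(Consumer<Integer> consumer) {
            successCountConsumer = consumer;
        }

        // Hypothetical call site; MarkLogicWrite invokes its consumer elsewhere.
        void onWriteComplete(int successCount) {
            // Fire the hook only when a test has registered one.
            if (successCountConsumer != null) {
                successCountConsumer.accept(successCount);
            }
        }
    }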

src/main/java/com/marklogic/spark/writer/WriteContext.java

Lines changed: 2 additions & 1 deletion

@@ -23,6 +23,7 @@
 import com.marklogic.spark.ContextSupport;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.Util;
+import com.marklogic.spark.reader.document.DocumentRowSchema;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Arrays;

@@ -104,7 +105,7 @@ DocBuilder newDocBuilder() {
         String uriSuffix = null;
         if (hasOption(Options.WRITE_URI_SUFFIX)) {
             uriSuffix = getProperties().get(Options.WRITE_URI_SUFFIX);
-        } else if (!isUsingFileSchema()) {
+        } else if (!isUsingFileSchema() && !DocumentRowSchema.SCHEMA.equals(this.schema)) {
             uriSuffix = ".json";
         }
         factory.withUriMaker(new StandardUriMaker(
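
This one-line condition is the URI fix named in the commit message: rows matching DocumentRowSchema.SCHEMA were read as full documents and already carry an initial URI, so the default ".json" suffix should not be appended to them. To make the effect concrete, here is a hypothetical helper mirroring the prefix/suffix composition; makeUri is invented for illustration and is not the connector's StandardUriMaker API, and the example URIs match the new test further below.

    // Hypothetical illustration of URI composition; not the real connector API.
    static String makeUri(String prefix, String initialUri, String suffix) {
        String uri = initialUri;
        if (prefix != null) {
            uri = prefix + uri;
        }
        if (suffix != null) {
            uri = uri + suffix;
        }
        return uri;
    }

    // Before the fix, a document row's URI still received the ".json" default:
    //   makeUri("/backup", "/test/1.xml", ".json") -> "/backup/test/1.xml.json"
    // After the fix, uriSuffix stays null for document rows:
    //   makeUri("/backup", "/test/1.xml", null)    -> "/backup/test/1.xml"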

src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 2 additions & 2 deletions

@@ -180,8 +180,8 @@ void invalidPermissionsConfig() {
     void dontAbortOnFailure() {
         AtomicInteger successCount = new AtomicInteger();
         AtomicInteger failureCount = new AtomicInteger();
-        MarkLogicWrite.successCountConsumer = count -> successCount.set(count);
-        MarkLogicWrite.failureCountConsumer = count -> failureCount.set(count);
+        MarkLogicWrite.setSuccessCountConsumer(count -> successCount.set(count));
+        MarkLogicWrite.setFailureCountConsumer(count -> failureCount.set(count));
 
         newWriterWithDefaultConfig("temporal-data-with-invalid-rows.csv", 1)
             .option(Options.WRITE_TEMPORAL_COLLECTION, TEMPORAL_COLLECTION)

src/test/java/com/marklogic/spark/writer/customcode/ProcessWithCustomCodeTest.java

Lines changed: 2 additions & 2 deletions

@@ -127,8 +127,8 @@ void abortOnFailure() {
     void dontAbortOnFailure() {
         AtomicInteger successCount = new AtomicInteger();
         AtomicInteger failureCount = new AtomicInteger();
-        MarkLogicWrite.successCountConsumer = count -> successCount.set(count);
-        MarkLogicWrite.failureCountConsumer = count -> failureCount.set(count);
+        MarkLogicWrite.setSuccessCountConsumer(count -> successCount.set(count));
+        MarkLogicWrite.setFailureCountConsumer(count -> failureCount.set(count));
 
         // The lack of an error here indicates that the job did not abort. The connector is expected to have logged
         // each error instead.

src/test/java/com/marklogic/spark/writer/document/WriteDocumentRowsToMarkLogicTest.java

Lines changed: 19 additions & 0 deletions

@@ -55,6 +55,25 @@ void retainMetadataFromCopiedDocuments() {
         });
     }
 
+    /**
+     * This ensures that the default suffix of ".json" isn't applied when the incoming row is a "document row" and
+     * thus has an initial URI.
+     */
+    @Test
+    void uriPrefix() {
+        readTheTwoTestDocuments()
+            .write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_URI_PREFIX, "/backup")
+            .option(Options.WRITE_COLLECTIONS, "backup-docs")
+            .mode(SaveMode.Append)
+            .save();
+
+        assertCollectionSize("backup-docs", 2);
+        assertInCollections("/backup/test/1.xml", "backup-docs");
+        assertInCollections("/backup/test/2.xml", "backup-docs");
+    }
+
     @Test
     void overrideMetadataFromCopiedDocuments() {
         readTheTwoTestDocuments()

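For context, here is a minimal end-user sketch of the path this commit fixes, written against a Spark DataFrame rather than the test helpers. The option keys are the connector's documented string forms to the best of my knowledge, and the connection string, source DataFrame, and collection names are placeholders.

    // "docs" is assumed to be a DataFrame of document rows (URI, content,
    // metadata) previously read with the connector.
    docs.write()
        .format("marklogic")
        .option("spark.marklogic.client.uri", "user:password@localhost:8000")
        .option("spark.marklogic.write.uriPrefix", "/backup")
        .option("spark.marklogic.write.collections", "backup-docs")
        .mode(SaveMode.Append)
        .save();

    // A document originally at "/test/1.xml" now lands at "/backup/test/1.xml":
    // the initial URI is preserved and no ".json" suffix is appended, matching
    // the assertions in the uriPrefix test above.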