
Commit 12973f3

Merge pull request #257 from marklogic/feature/write-file-path
Adding support for using the Spark file path for a URI
2 parents: aa15c15 + f28a2b0
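For context, the feature being merged lets a Spark user opt into having each document's URI begin with the path of the source file. Below is a minimal user-facing sketch, adapted from the test added in this commit; the CSV path, connection string, and the "marklogic" format name are illustrative assumptions rather than values taken from this diff.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class FilePathUriExample {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
        session.read()
            .option("header", true)
            .csv("path/to/data.csv") // hypothetical input file
            // Spark exposes the source file path via the _metadata.file_path column;
            // copying it into "marklogic_spark_file_path" opts into the new URI behavior.
            .withColumn("marklogic_spark_file_path", new Column("_metadata.file_path"))
            .write()
            .format("marklogic") // assumed connector format name
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // placeholder connection
            .option("spark.marklogic.write.permissions", "rest-reader,read,rest-writer,update")
            .mode(SaveMode.Append)
            .save();
        // Each document gets an initial URI of "<file path>/<random UUID>", and the
        // marklogic_spark_file_path column is dropped from the document body.
    }
}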

2 files changed: +88 −1 lines changed


src/main/java/com/marklogic/spark/writer/ArbitraryRowConverter.java

Lines changed: 35 additions & 1 deletion
@@ -12,6 +12,7 @@
 import com.marklogic.spark.Util;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.json.JacksonGenerator;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.json.JSONObject;
 import org.json.XML;
@@ -20,13 +21,16 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;

 /**
  * Handles building a document from an "arbitrary" row - i.e. one with an unknown schema, where the row will be
  * serialized by Spark to a JSON object.
  */
 class ArbitraryRowConverter implements RowConverter {

+    private static final String MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME = "marklogic_spark_file_path";
+
     private final ObjectMapper objectMapper;

     private final StructType schema;
@@ -35,8 +39,12 @@ class ArbitraryRowConverter implements RowConverter {
     private final String xmlRootName;
     private final String xmlNamespace;

+    private final int filePathIndex;
+
     ArbitraryRowConverter(WriteContext writeContext) {
         this.schema = writeContext.getSchema();
+        this.filePathIndex = determineFilePathIndex();
+
         this.uriTemplate = writeContext.getStringOption(Options.WRITE_URI_TEMPLATE);
         this.jsonRootName = writeContext.getStringOption(Options.WRITE_JSON_ROOT_NAME);
         this.xmlRootName = writeContext.getStringOption(Options.WRITE_XML_ROOT_NAME);
@@ -46,6 +54,12 @@ class ArbitraryRowConverter implements RowConverter {

     @Override
     public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+        String initialUri = null;
+        if (this.filePathIndex > -1) {
+            initialUri = row.getString(this.filePathIndex) + "/" + UUID.randomUUID();
+            row.setNullAt(this.filePathIndex);
+        }
+
         final String json = convertRowToJSONString(row);
         AbstractWriteHandle contentHandle = this.xmlRootName != null ?
             new StringHandle(convertJsonToXml(json)).withFormat(Format.XML) :
@@ -66,14 +80,34 @@ public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
                 }
             }
         }
-        return Optional.of(new DocBuilder.DocumentInputs(null, contentHandle, uriTemplateValues, null));
+        return Optional.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null));
     }

     @Override
     public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
         return new ArrayList<>();
     }

+    /**
+     * A Spark user can add a column via:
+     * withColumn("marklogic_spark_file_path", new Column("_metadata.file_path"))
+     * <p>
+     * This allows access to the file path when using a Spark data source - e.g. CSV, Parquet - to read a file.
+     * The column will be used to generate an initial URI for the corresponding document, and will then be
+     * removed so that it is not included in the document.
+     *
+     * @return the index of the file path column in the schema, or -1 if the column is not present
+     */
+    private int determineFilePathIndex() {
+        StructField[] fields = schema.fields();
+        for (int i = 0; i < fields.length; i++) {
+            if (MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME.equals(fields[i].name())) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
     private ObjectNode readTree(String json) {
         // We don't ever expect this to fail, as the JSON is produced by Spark's JacksonGenerator and should always
         // be valid JSON. But Jackson throws a checked exception, so gotta handle it.
src/test/java/com/marklogic/spark/writer/WriteRowsWithFilePathTest.java

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.spark.writer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.marklogic.spark.Options;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.SaveMode;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class WriteRowsWithFilePathTest extends AbstractWriteTest {
+
+    /**
+     * Intended to allow for Flux to optionally use the filename for an initial URI. Relevant any time we use Flux with
+     * a Spark data source that produces arbitrary data rows.
+     */
+    @Test
+    void test() {
+        newSparkSession().read()
+            .option("header", true)
+            .format("csv")
+            .csv("src/test/resources/data.csv")
+            .withColumn("marklogic_spark_file_path", new Column("_metadata.file_path"))
+            .limit(10)
+            .write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
+            .option(Options.WRITE_COLLECTIONS, "some-files")
+            .option(Options.WRITE_URI_REPLACE, ".*/src/test,'/test'")
+            .mode(SaveMode.Append)
+            .save();
+
+        List<String> uris = getUrisInCollection("some-files", 10);
+        uris.forEach(uri -> {
+            assertTrue(uri.startsWith("/test/resources/data.csv/"), "When a column named 'marklogic_spark_file_path' is passed " +
+                "to the connector for writing arbitrary rows, it will be used to construct an initial URI that " +
+                "also has a UUID in it. This is useful for the somewhat rare use case of wanting the physical file " +
+                "path to be a part of the URI (as opposed to using a URI template). Actual URI: " + uri);
+
+            JsonNode doc = readJsonDocument(uri);
+            assertEquals(2, doc.size(), "The marklogic_spark_file_path column should not have been used when " +
+                "constructing the JSON document.");
+            assertTrue(doc.has("docNum"));
+            assertTrue(doc.has("docName"));
+        });
+    }
+}
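The WRITE_URI_REPLACE option used in the test pairs a regex with a single-quoted replacement and applies it to the initial URI, which is how the absolute source path becomes the "/test/resources/data.csv/" prefix the assertions expect. A self-contained sketch of the equivalent transformation, using an illustrative absolute path:

public class UriReplaceSketch {
    public static void main(String[] args) {
        // The option value ".*/src/test,'/test'" pairs regex ".*/src/test" with replacement "/test".
        String initialUri = "file:///home/user/project/src/test/resources/data.csv/6f1c2d3e-4a5b-6c7d-8e9f-0a1b2c3d4e5f";
        String replaced = initialUri.replaceAll(".*/src/test", "/test");
        System.out.println(replaced);
        // -> /test/resources/data.csv/6f1c2d3e-4a5b-6c7d-8e9f-0a1b2c3d4e5f
    }
}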
