import com.marklogic.spark.Util;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.json.JacksonGenerator;
+ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.json.JSONObject;
import org.json.XML;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+ import java.util.UUID;

/**
 * Handles building a document from an "arbitrary" row - i.e. one with an unknown schema, where the row will be
 * serialized by Spark to a JSON object.
 */
class ArbitraryRowConverter implements RowConverter {

+     private static final String MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME = "marklogic_spark_file_path";
+
    private final ObjectMapper objectMapper;

    private final StructType schema;
@@ -35,8 +39,12 @@ class ArbitraryRowConverter implements RowConverter {
    private final String xmlRootName;
    private final String xmlNamespace;

+     private final int filePathIndex;
+
    ArbitraryRowConverter(WriteContext writeContext) {
        this.schema = writeContext.getSchema();
+         this.filePathIndex = determineFilePathIndex();
+
        this.uriTemplate = writeContext.getStringOption(Options.WRITE_URI_TEMPLATE);
        this.jsonRootName = writeContext.getStringOption(Options.WRITE_JSON_ROOT_NAME);
        this.xmlRootName = writeContext.getStringOption(Options.WRITE_XML_ROOT_NAME);
@@ -46,6 +54,12 @@ class ArbitraryRowConverter implements RowConverter {

    @Override
    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+         String initialUri = null;
+         if (this.filePathIndex > -1) {
+             initialUri = row.getString(this.filePathIndex) + "/" + UUID.randomUUID();
+             row.setNullAt(this.filePathIndex);
+         }
+
        final String json = convertRowToJSONString(row);
        AbstractWriteHandle contentHandle = this.xmlRootName != null ?
            new StringHandle(convertJsonToXml(json)).withFormat(Format.XML) :
@@ -66,14 +80,34 @@ public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
                }
            }
        }
-         return Optional.of(new DocBuilder.DocumentInputs(null, contentHandle, uriTemplateValues, null));
+         return Optional.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null));
    }

    @Override
    public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
        return new ArrayList<>();
    }

+     /**
+      * A Spark user can add a column via:
+      * withColumn("marklogic_spark_file_path", new Column("_metadata.file_path"))
+      * <p>
+      * This allows access to the file path when using a Spark data source - e.g. CSV, Parquet - to read a file.
+      * The column will be used to generate an initial URI for the corresponding document, and the column will then
+      * be removed so that it is not included in the document.
+      *
+      * @return the index of the file path column in the schema, or -1 if the column is not present
+      */
+     private int determineFilePathIndex() {
+         StructField[] fields = schema.fields();
+         for (int i = 0; i < fields.length; i++) {
+             if (MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME.equals(fields[i].name())) {
+                 return i;
+             }
+         }
+         return -1;
+     }
+
    private ObjectNode readTree(String json) {
        // We don't ever expect this to fail, as the JSON is produced by Spark's JacksonGenerator and should always
        // be valid JSON. But Jackson throws a checked exception, so we have to handle it.
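
As the new Javadoc above describes, a caller opts into this behavior by projecting Spark's hidden
_metadata.file_path value into a column named marklogic_spark_file_path. The following sketch shows
what that could look like end-to-end; the input path and connection string are hypothetical, and it
assumes the connector is registered under the "marklogic" format name:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FilePathColumnExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Read CSV files; Spark exposes the source file path via the hidden _metadata struct.
        Dataset<Row> rows = spark.read()
            .option("header", "true")
            .csv("path/to/csv-files") // hypothetical input directory
            .withColumn("marklogic_spark_file_path", new Column("_metadata.file_path"));

        // ArbitraryRowConverter finds this column, builds an initial URI of the form
        // "<file path>/<random UUID>", and nulls the column out so the value does not
        // end up in the serialized JSON document.
        rows.write()
            .format("marklogic")
            .option("spark.marklogic.client.uri", "user:password@localhost:8000") // hypothetical connection string
            .mode("append")
            .save();
    }
}

Deriving the initial URI from the source file path keeps documents from the same input file grouped
under a common URI prefix, while the appended random UUID keeps URIs unique across rows.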