
Commit bf13ef7

Merge pull request #261 from marklogic/feature/json-ignore-null

Can now include null fields

2 parents da6f02b + 7ec7c10
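
By default, Spark drops null-valued fields when serializing rows to JSON; this change lets a user opt into keeping them. A minimal usage sketch of the new behavior (the dataset, format name, and option values here are illustrative, not part of this commit; the option name comes from the Options.java change below):

    // Hypothetical write that keeps null fields in the serialized JSON documents.
    dataset.write()
        .format("marklogic")
        .option("spark.marklogic.write.json.ignoreNullFields", "false")
        .mode(SaveMode.Append)
        .save();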

File tree

10 files changed: +272 −83 lines


build.gradle

Lines changed: 2 additions & 3 deletions

@@ -50,9 +50,8 @@ dependencies {
         exclude group: "com.fasterxml.jackson.dataformat"
     }

-    // For XML support; supports converting a string of JSON into a string of XML.
-    // See ArbitraryRowConverter for more information.
-    shadowDependencies "org.json:json:20240303"
+    // Required for converting JSON to XML. Using 2.14.2 to align with Spark 3.4.1.
+    shadowDependencies "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2"

     // Need this so that an OkHttpClientConfigurator can be created.
     shadowDependencies 'com.squareup.okhttp3:okhttp:4.12.0'
src/main/java/com/marklogic/spark/JsonRowSerializer.java

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+/*
+ * Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.spark;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.json.JSONOptions;
+import org.apache.spark.sql.catalyst.json.JacksonGenerator;
+import org.apache.spark.sql.types.StructType;
+import scala.Predef;
+import scala.collection.JavaConverters;
+
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Handles serializing a Spark row into a JSON string. Includes support for all the options defined in Spark's
+ * JSONOptions.scala class.
+ */
+public class JsonRowSerializer {
+
+    private final StructType schema;
+    private final JSONOptions jsonOptions;
+    private final boolean includeNullFields;
+
+    public JsonRowSerializer(StructType schema, Map<String, String> connectorProperties) {
+        this.schema = schema;
+
+        final Map<String, String> options = buildOptionsForJsonOptions(connectorProperties);
+        this.includeNullFields = "false".equalsIgnoreCase(options.get("ignoreNullFields"));
+
+        this.jsonOptions = new JSONOptions(
+            // Funky code to convert a Java map into a Scala immutable Map.
+            JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(Predef.$conforms()),
+
+            // As verified via tests, this default timezone ID is overridden by a user via
+            // the spark.sql.session.timeZone option.
+            "Z",
+
+            // We don't expect corrupted records - i.e. corrupted values - to be present in the index. But Spark
+            // requires this to be set. See
+            // https://medium.com/@sasidharan-r/how-to-handle-corrupt-or-bad-record-in-apache-spark-custom-logic-pyspark-aws-430ddec9bb41
+            // for more information.
+            "_corrupt_record"
+        );
+    }
+
+    public String serializeRowToJson(InternalRow row) {
+        StringWriter writer = new StringWriter();
+        JacksonGenerator jacksonGenerator = new JacksonGenerator(this.schema, writer, this.jsonOptions);
+        jacksonGenerator.write(row);
+        jacksonGenerator.flush();
+        return writer.toString();
+    }
+
+    /**
+     * A user can specify any of the options found in the JSONOptions.scala class - though it's not yet clear where
+     * a user finds out about these except via the Spark source code. "ignoreNullFields", however, is expected to be
+     * the primary one that is configured.
+     */
+    private Map<String, String> buildOptionsForJsonOptions(Map<String, String> connectorProperties) {
+        Map<String, String> options = new HashMap<>();
+        connectorProperties.forEach((key, value) -> {
+            if (key.startsWith(Options.WRITE_JSON_SERIALIZATION_OPTION_PREFIX)) {
+                String optionName = key.substring(Options.WRITE_JSON_SERIALIZATION_OPTION_PREFIX.length());
+                options.put(optionName, value);
+            }
+        });
+        return options;
+    }
+
+    public JSONOptions getJsonOptions() {
+        return jsonOptions;
+    }
+
+    public boolean isIncludeNullFields() {
+        return this.includeNullFields;
+    }
+}
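
A rough sketch of how the new class behaves (the schema, row, and expected output below are assumptions for illustration; only the class itself is part of this commit):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.unsafe.types.UTF8String;

    import java.util.HashMap;
    import java.util.Map;

    StructType schema = new StructType()
        .add("name", DataTypes.StringType)
        .add("age", DataTypes.IntegerType);

    Map<String, String> props = new HashMap<>();
    // The prefix is stripped, so "ignoreNullFields=false" reaches Spark's JSONOptions.
    props.put("spark.marklogic.write.json.ignoreNullFields", "false");

    JsonRowSerializer serializer = new JsonRowSerializer(schema, props);
    InternalRow row = new GenericInternalRow(new Object[]{UTF8String.fromString("jane"), null});

    // Expected to produce {"name":"jane","age":null}; with Spark's default of
    // ignoreNullFields=true, the null "age" field would be omitted instead.
    String json = serializer.serializeRowToJson(row);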

src/main/java/com/marklogic/spark/Options.java

Lines changed: 6 additions & 0 deletions

@@ -119,6 +119,12 @@ public abstract class Options {
     public static final String WRITE_XML_ROOT_NAME = "spark.marklogic.write.xmlRootName";
     public static final String WRITE_XML_NAMESPACE = "spark.marklogic.write.xmlNamespace";

+    // For serializing a row into JSON. Intent is to allow for other constants defined in the Spark
+    // JSONOptions.scala class to be used after "spark.marklogic.write.json."
+    // Example - "spark.marklogic.write.json.ignoreNullFields=false".
+    public static final String WRITE_JSON_SERIALIZATION_OPTION_PREFIX = "spark.marklogic.write.json.";
+
     // For writing RDF
     public static final String WRITE_GRAPH = "spark.marklogic.write.graph";
     public static final String WRITE_GRAPH_OVERRIDE = "spark.marklogic.write.graphOverride";
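
Because the prefix is stripped before the remaining key is handed to Spark, any JSONOptions key should pass through the same way. A hedged sketch (ignoreNullFields is the case this commit targets; dateFormat is a standard Spark JSONOptions key, but its use with this connector is an untested assumption):

    Map<String, String> connectorProperties = new HashMap<>();
    connectorProperties.put(Options.WRITE_JSON_SERIALIZATION_OPTION_PREFIX + "ignoreNullFields", "false");
    connectorProperties.put(Options.WRITE_JSON_SERIALIZATION_OPTION_PREFIX + "dateFormat", "yyyy-MM-dd");
    // JsonRowSerializer.buildOptionsForJsonOptions reduces these to
    // {"ignoreNullFields": "false", "dateFormat": "yyyy-MM-dd"} for JSONOptions.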

src/main/java/com/marklogic/spark/Util.java

Lines changed: 0 additions & 15 deletions

@@ -15,10 +15,8 @@
  */
 package com.marklogic.spark;

-import org.apache.spark.sql.catalyst.json.JSONOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.immutable.HashMap;

 import java.util.*;
 import java.util.stream.Stream;

@@ -31,19 +29,6 @@ public interface Util {
      */
     Logger MAIN_LOGGER = LoggerFactory.getLogger("com.marklogic.spark");

-    JSONOptions DEFAULT_JSON_OPTIONS = new JSONOptions(
-        new HashMap<>(),
-
-        // As verified via tests, this default timezone ID is overridden by a user via the spark.sql.session.timeZone option.
-        "Z",
-
-        // We don't expect corrupted records - i.e. corrupted values - to be present in the index. But Spark
-        // requires this to be set. See
-        // https://medium.com/@sasidharan-r/how-to-handle-corrupt-or-bad-record-in-apache-spark-custom-logic-pyspark-aws-430ddec9bb41
-        // for more information.
-        "_corrupt_record"
-    );
-
     static boolean hasOption(Map<String, String> properties, String... options) {
         return Stream.of(options)
             .anyMatch(option -> properties.get(option) != null && properties.get(option).trim().length() > 0);

src/main/java/com/marklogic/spark/reader/JsonRowDeserializer.java

Lines changed: 5 additions & 2 deletions

@@ -2,9 +2,10 @@

 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
-import com.marklogic.spark.Util;
+import com.marklogic.spark.JsonRowSerializer;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.json.CreateJacksonParser;
+import org.apache.spark.sql.catalyst.json.JSONOptions;
 import org.apache.spark.sql.catalyst.json.JacksonParser;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;

@@ -15,6 +16,7 @@
 import scala.collection.Seq;

 import java.util.ArrayList;
+import java.util.HashMap;

 /**
  * Handles deserializing a JSON object into a Spark InternalRow. This is accomplished via Spark's JacksonParser.

@@ -45,6 +47,7 @@ public InternalRow deserializeJson(String json) {
     private JacksonParser newJacksonParser(StructType schema) {
         final boolean allowArraysAsStructs = true;
         final Seq<Filter> filters = JavaConverters.asScalaIterator(new ArrayList<Filter>().iterator()).toSeq();
-        return new JacksonParser(schema, Util.DEFAULT_JSON_OPTIONS, allowArraysAsStructs, filters);
+        JSONOptions jsonOptions = new JsonRowSerializer(schema, new HashMap<>()).getJsonOptions();
+        return new JacksonParser(schema, jsonOptions, allowArraysAsStructs, filters);
     }
 }
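
The practical effect is that reads and writes now share a single source of default JSONOptions. A sketch of the round trip this supports (the JsonRowDeserializer constructor signature shown here is assumed from this diff's context, not confirmed by it):

    // Hypothetical: deserialize a JSON document back into a Spark InternalRow
    // using the same default JSONOptions that JsonRowSerializer produces.
    JsonRowDeserializer deserializer = new JsonRowDeserializer(schema);
    InternalRow row = deserializer.deserializeJson("{\"name\":\"jane\",\"age\":null}");
    // Serializing that row back out through JsonRowSerializer with
    // ignoreNullFields=false should reproduce the null-valued field.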
src/main/java/com/marklogic/spark/writer/ArbitraryRowConverter.java

Lines changed: 63 additions & 47 deletions

@@ -1,23 +1,21 @@
 package com.marklogic.spark.writer;

 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
 import com.marklogic.client.io.Format;
 import com.marklogic.client.io.JacksonHandle;
 import com.marklogic.client.io.StringHandle;
 import com.marklogic.client.io.marker.AbstractWriteHandle;
 import com.marklogic.spark.ConnectorException;
+import com.marklogic.spark.JsonRowSerializer;
 import com.marklogic.spark.Options;
-import com.marklogic.spark.Util;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.json.JacksonGenerator;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.json.JSONObject;
-import org.json.XML;

-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;

@@ -32,24 +30,23 @@ class ArbitraryRowConverter implements RowConverter {
     private static final String MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME = "marklogic_spark_file_path";

     private final ObjectMapper objectMapper;
-
-    private final StructType schema;
+    private final XmlMapper xmlMapper;
+    private final JsonRowSerializer jsonRowSerializer;
     private final String uriTemplate;
     private final String jsonRootName;
     private final String xmlRootName;
     private final String xmlNamespace;
-
     private final int filePathIndex;

     ArbitraryRowConverter(WriteContext writeContext) {
-        this.schema = writeContext.getSchema();
-        this.filePathIndex = determineFilePathIndex();
-
+        this.filePathIndex = determineFilePathIndex(writeContext.getSchema());
         this.uriTemplate = writeContext.getStringOption(Options.WRITE_URI_TEMPLATE);
         this.jsonRootName = writeContext.getStringOption(Options.WRITE_JSON_ROOT_NAME);
         this.xmlRootName = writeContext.getStringOption(Options.WRITE_XML_ROOT_NAME);
         this.xmlNamespace = writeContext.getStringOption(Options.WRITE_XML_NAMESPACE);
         this.objectMapper = new ObjectMapper();
+        this.xmlMapper = this.xmlRootName != null ? new XmlMapper() : null;
+        this.jsonRowSerializer = new JsonRowSerializer(writeContext.getSchema(), writeContext.getProperties());
     }

     @Override

@@ -60,26 +57,49 @@ public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
             row.setNullAt(this.filePathIndex);
         }

-        final String json = convertRowToJSONString(row);
-        AbstractWriteHandle contentHandle = this.xmlRootName != null ?
-            new StringHandle(convertJsonToXml(json)).withFormat(Format.XML) :
-            new StringHandle(json).withFormat(Format.JSON);
+        final String json = this.jsonRowSerializer.serializeRowToJson(row);

+        AbstractWriteHandle contentHandle = null;
+        ObjectNode deserializedJson = null;
         ObjectNode uriTemplateValues = null;
-        if (this.uriTemplate != null || this.jsonRootName != null) {
-            ObjectNode jsonObject = readTree(json);
+        final boolean mustRemoveFilePathField = this.filePathIndex > -1 && jsonRowSerializer.isIncludeNullFields();
+
+        if (this.jsonRootName != null || this.xmlRootName != null || this.uriTemplate != null || mustRemoveFilePathField) {
+            deserializedJson = readTree(json);
+            if (mustRemoveFilePathField) {
+                deserializedJson.remove(MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME);
+            }
+        }
+
+        if (this.uriTemplate != null) {
+            uriTemplateValues = deserializedJson;
+        }
+
+        if (this.jsonRootName != null) {
+            ObjectNode jsonObjectWithRootName = objectMapper.createObjectNode();
+            jsonObjectWithRootName.set(jsonRootName, deserializedJson);
+            contentHandle = new JacksonHandle(jsonObjectWithRootName);
             if (this.uriTemplate != null) {
-                uriTemplateValues = jsonObject;
+                uriTemplateValues = jsonObjectWithRootName;
             }
-            if (this.jsonRootName != null) {
-                ObjectNode root = objectMapper.createObjectNode();
-                root.set(jsonRootName, jsonObject);
-                contentHandle = new JacksonHandle(root);
-                if (this.uriTemplate != null) {
-                    uriTemplateValues = root;
-                }
+        }
+
+        if (contentHandle == null) {
+            // If the user wants XML, then we've definitely deserialized the JSON and removed the file path if
+            // needed. So use that JsonNode to produce an XML string.
+            if (xmlRootName != null) {
+                contentHandle = new StringHandle(convertJsonToXml(deserializedJson)).withFormat(Format.XML);
+            }
+            // If we've already gone to the effort of creating deserializedJson, use it for the content.
+            else if (deserializedJson != null) {
+                contentHandle = new JacksonHandle(deserializedJson);
+            } else {
+                // Simplest scenario where we never have a reason to incur the expense of deserializing the JSON
+                // string, so we can just use StringHandle.
+                contentHandle = new StringHandle(json).withFormat(Format.JSON);
             }
         }
+
         return Optional.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null));
     }

@@ -98,7 +118,7 @@ public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
      *
      * @return
      */
-    private int determineFilePathIndex() {
+    private int determineFilePathIndex(StructType schema) {
         StructField[] fields = schema.fields();
         for (int i = 0; i < fields.length; i++) {
             if (MARKLOGIC_SPARK_FILE_PATH_COLUMN_NAME.equals(fields[i].name())) {

@@ -118,33 +138,29 @@ private ObjectNode readTree(String json) {
         }
     }

-    private String convertRowToJSONString(InternalRow row) {
-        StringWriter writer = new StringWriter();
-        JacksonGenerator jacksonGenerator = new JacksonGenerator(this.schema, writer, Util.DEFAULT_JSON_OPTIONS);
-        jacksonGenerator.write(row);
-        jacksonGenerator.flush();
-        return writer.toString();
-    }
-
     /**
      * jackson-xml-mapper unfortunately does not yet support a root namespace. Nor does it allow for the root element
      * to be omitted. So we always end up with "ObjectNode" as a root element. See
-     * https://github.com/FasterXML/jackson-dataformat-xml/issues/541 for more information.
-     * <p>
-     * While JSON-Java does not support a root namespace, it does allow for the root element to be omitted. That is
-     * sufficient for us, as we can then generate our own root element - albeit via string concatenation - that
-     * includes a user-defined namespace.
+     * https://github.com/FasterXML/jackson-dataformat-xml/issues/541 for more information. So this method does some
+     * work to replace that root element with one based on user inputs.
      *
-     * @param json
+     * @param doc
      * @return
      */
-    private String convertJsonToXml(String json) {
-        JSONObject jsonObject = new JSONObject(json);
-        if (this.xmlNamespace != null) {
-            StringBuilder xml = new StringBuilder(String.format("<%s xmlns='%s'>", this.xmlRootName, this.xmlNamespace));
-            xml.append(XML.toString(jsonObject, null));
-            return xml.append(String.format("</%s>", this.xmlRootName)).toString();
+    private String convertJsonToXml(JsonNode doc) {
+        try {
+            String xml = xmlMapper.writer().writeValueAsString(doc);
+            String startTag = this.xmlNamespace != null ?
+                String.format("<%s xmlns='%s'>", this.xmlRootName, this.xmlNamespace) :
+                String.format("<%s>", this.xmlRootName);
+            return new StringBuilder(startTag)
+                .append(xml.substring("<ObjectNode>".length(), xml.length() - "</ObjectNode>".length()))
+                .append(String.format("</%s>", this.xmlRootName))
+                .toString();
+        } catch (JsonProcessingException e) {
+            // We don't expect this to occur; Jackson should be able to convert any JSON object that it created into
+            // a valid XML document.
+            throw new ConnectorException(String.format("Unable to convert JSON to XML for doc: %s", doc), e);
         }
-        return XML.toString(jsonObject, this.xmlRootName);
     }
 }
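
To make the root-element replacement concrete, a standalone sketch of the technique convertJsonToXml relies on (XmlRootRename and the root/namespace values are illustrative; the <ObjectNode> wrapper behavior is per the linked jackson-dataformat-xml issue):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import com.fasterxml.jackson.dataformat.xml.XmlMapper;

    public class XmlRootRename {
        public static void main(String[] args) throws Exception {
            ObjectNode doc = new ObjectMapper().createObjectNode().put("hello", "world");

            // XmlMapper names the root element after the node's class: "ObjectNode".
            String xml = new XmlMapper().writer().writeValueAsString(doc);
            // xml == "<ObjectNode><hello>world</hello></ObjectNode>"

            // Strip the generated root element and wrap the body in a user-defined
            // root that can carry a namespace - the same string surgery performed above.
            String body = xml.substring("<ObjectNode>".length(), xml.length() - "</ObjectNode>".length());
            System.out.println("<myRoot xmlns='org:example'>" + body + "</myRoot>");
            // Prints: <myRoot xmlns='org:example'><hello>world</hello></myRoot>
        }
    }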
