Skip to content

Commit e33ac58

Browse files
authored
Merge pull request #277 from marklogic/feature/15791-agg-xml-perf
MLE-15791 Performance improvement for extracting URI element value
2 parents 8e0ac1a + 82f3914 commit e33ac58

File tree

6 files changed

+151
-84
lines changed

6 files changed

+151
-84
lines changed

src/main/java/com/marklogic/spark/reader/file/xml/AggregateXmlSplitter.java

Lines changed: 21 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import javax.xml.stream.XMLInputFactory;
1414
import javax.xml.stream.XMLStreamException;
1515
import javax.xml.stream.XMLStreamReader;
16-
import java.io.ByteArrayInputStream;
1716
import java.io.IOException;
1817
import java.io.InputStream;
1918
import java.util.Iterator;
@@ -61,9 +60,13 @@ class AggregateXmlSplitter {
6160
final String element = fileContext.getStringOption(Options.READ_AGGREGATES_XML_ELEMENT);
6261
final String encoding = fileContext.getStringOption(Options.READ_FILES_ENCODING);
6362

63+
final XMLSplitter<StringHandle> splitter = this.uriElement != null ?
64+
new XMLSplitter<>(new UriElementExtractingVisitor(namespace, element, uriNamespace, uriElement)) :
65+
XMLSplitter.makeSplitter(namespace, element);
66+
6467
try {
6568
XMLStreamReader reader = xmlInputFactory.createXMLStreamReader(inputStream, encoding);
66-
this.contentStream = XMLSplitter.makeSplitter(namespace, element).split(reader).iterator();
69+
this.contentStream = splitter.split(reader).iterator();
6770
} catch (IOException | XMLStreamException e) {
6871
throw new ConnectorException(
6972
String.format("Unable to read XML at %s; cause: %s", this.identifierForErrors, e.getMessage()), e
@@ -81,88 +84,39 @@ boolean hasNext() {
8184
}
8285

8386
/**
84-
* @param pathPrefix used to construct a path if no uriElement was specified
87+
* @param uriPrefix used to construct a URI if no uriElement was specified
8588
* @return a row corresponding to the {@code DocumentRowSchema}
8689
*/
87-
InternalRow nextRow(String pathPrefix) {
88-
String xml;
90+
InternalRow nextRow(String uriPrefix) {
91+
StringHandle stringHandle;
8992
try {
90-
xml = this.contentStream.next().get();
93+
stringHandle = this.contentStream.next();
9194
} catch (RuntimeException ex) {
9295
String message = String.format("Unable to read XML from %s; cause: %s",
9396
this.identifierForErrors, ex.getMessage());
9497
throw new ConnectorException(message, ex);
9598
}
9699

97-
final String path = this.uriElement != null && !this.uriElement.trim().isEmpty() ?
98-
extractUriElementValue(xml) :
99-
pathPrefix + "-" + rowCounter + ".xml";
100-
100+
final String initialUri = determineInitialUri(stringHandle, uriPrefix);
101101
rowCounter++;
102-
103-
byte[] content = xml.getBytes();
104102
return new GenericInternalRow(new Object[]{
105-
UTF8String.fromString(path),
106-
ByteArray.concat(content),
103+
UTF8String.fromString(initialUri),
104+
ByteArray.concat(stringHandle.get().getBytes()),
107105
UTF8String.fromString("xml"),
108106
null, null, null, null, null
109107
});
110108
}
111109

112-
/**
113-
* MLCP has undocumented support for attribute references via "@(attribute-name)". We are not supporting this yet
114-
* as we are using XMLSplitter to find the user-defined element, and XMLSplitter does not support finding
115-
* attributes. Additionally, this feature is still fairly limited in comparison to the "URI template" that the
116-
* connector supports. Ultimately, we'd want to support N path expressions against both Spark columns and against
117-
* a JSON or XML tree in a single Spark column.
118-
*
119-
* @param xml
120-
* @return
121-
*/
122-
private String extractUriElementValue(String xml) {
123-
Iterator<StringHandle> iterator;
124-
XMLSplitter<StringHandle> splitter = XMLSplitter.makeSplitter(this.uriNamespace, this.uriElement);
125-
splitter.setVisitor(new UriElementVisitor(this.uriNamespace, this.uriElement));
126-
try {
127-
iterator = splitter.split(new ByteArrayInputStream(xml.getBytes())).iterator();
128-
} catch (Exception e) {
129-
// We don't expect this to ever occur, as if the XML couldn't be parsed, an error would have been thrown
130-
// when the child element was originally extracted. But still have to catch an exception.
131-
String message = String.format("Unable to parse XML in aggregate element %d in %s; cause: %s",
132-
rowCounter, this.identifierForErrors, e.getMessage());
133-
throw new ConnectorException(message, e);
134-
}
135-
136-
if (!iterator.hasNext()) {
137-
String message = String.format("No occurrence of URI element '%s' found in aggregate element %d in %s",
138-
this.uriElement, rowCounter, this.identifierForErrors);
139-
throw new ConnectorException(message);
140-
}
141-
return iterator.next().get();
142-
}
143-
144-
/**
145-
* Extends the Java Client visitor class so that it can return a handle containing the text of the
146-
* user-defined URI element.
147-
*/
148-
private class UriElementVisitor extends XMLSplitter.BasicElementVisitor {
149-
public UriElementVisitor(String nsUri, String localName) {
150-
super(nsUri, localName);
151-
}
152-
153-
@Override
154-
public StringHandle makeBufferedHandle(XMLStreamReader xmlStreamReader) {
155-
String text;
156-
try {
157-
text = xmlStreamReader.getElementText();
158-
} catch (XMLStreamException e) {
159-
String message = String.format(
160-
"Unable to get text from URI element '%s' found in aggregate element %d in %s; cause: %s",
161-
uriElement, rowCounter, identifierForErrors, e.getMessage()
162-
);
163-
throw new ConnectorException(message, e);
110+
private String determineInitialUri(StringHandle stringHandle, String uriPrefix) {
111+
if (stringHandle instanceof StringHandleWithUriValue) {
112+
String uriValue = ((StringHandleWithUriValue) stringHandle).getUriValue();
113+
if (uriValue == null) {
114+
String message = String.format("No occurrence of URI element '%s' found in aggregate element %d in %s",
115+
this.uriElement, rowCounter, this.identifierForErrors);
116+
throw new ConnectorException(message);
164117
}
165-
return new StringHandle(text);
118+
return uriValue;
166119
}
120+
return String.format("%s-%d.xml", uriPrefix, rowCounter);
167121
}
168122
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.file.xml;
5+
6+
import com.marklogic.client.io.StringHandle;
7+
8+
/**
9+
* Captures a URI value based on the user-defined URI element.
10+
*/
11+
class StringHandleWithUriValue extends StringHandle {
12+
13+
private final String uriValue;
14+
15+
StringHandleWithUriValue(String content, String uriValue) {
16+
super(content);
17+
this.uriValue = uriValue;
18+
}
19+
20+
String getUriValue() {
21+
return uriValue;
22+
}
23+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.file.xml;
5+
6+
import javax.xml.stream.XMLStreamConstants;
7+
import javax.xml.stream.XMLStreamException;
8+
import javax.xml.stream.XMLStreamReader;
9+
import javax.xml.stream.util.StreamReaderDelegate;
10+
11+
/**
 * Knows how to extract a URI element value while the XML for an aggregate element is being read and serialized.
 * <p>
 * The reader wraps another {@code XMLStreamReader} and inspects every event pulled through {@link #next()}.
 * The text of the first element matching the configured URI element is accumulated, including text found in
 * descendant elements (mixed content) and in CDATA sections. Only {@link #next()} is overridden, so events
 * consumed via other cursor-advancing methods of the delegate are not inspected.
 */
class UriElementExtractingReader extends StreamReaderDelegate {

    private final XMLStreamReader source;
    private final String uriNamespace;
    private final String uriElement;

    // Number of currently open elements at or below the URI element; zero when not inside the URI element.
    // Tracking depth instead of matching on the end tag name keeps a same-named descendant from ending the
    // capture prematurely.
    private int uriElementDepth;

    // Null until the first URI element is found; afterwards holds the text collected so far.
    private StringBuilder uriValue;

    /**
     * @param source       the reader that events are pulled from
     * @param uriNamespace namespace the URI element must have, or null to match on local name alone
     * @param uriElement   local name of the URI element
     */
    UriElementExtractingReader(XMLStreamReader source, String uriNamespace, String uriElement) {
        super(source);
        this.source = source;
        this.uriNamespace = uriNamespace;
        this.uriElement = uriElement;
    }

    /**
     * Advances the underlying reader and, as a side effect, captures text belonging to the URI element.
     *
     * @return the event type reported by the underlying reader
     * @throws XMLStreamException if the underlying reader fails to advance
     */
    @Override
    public int next() throws XMLStreamException {
        final int event = source.next();
        switch (event) {
            case XMLStreamConstants.START_ELEMENT:
                if (uriElementDepth > 0) {
                    // A descendant of the URI element; remember it so its end tag is not mistaken for the
                    // end of the URI element.
                    uriElementDepth++;
                } else if (uriValue == null && matchesUriElement()) {
                    // Only use the first instance of the URI element that is found.
                    uriValue = new StringBuilder();
                    uriElementDepth = 1;
                }
                break;
            case XMLStreamConstants.CHARACTERS:
            case XMLStreamConstants.CDATA:
                // Non-coalescing parsers may report CDATA sections as their own event type.
                if (uriElementDepth > 0) {
                    uriValue.append(source.getText());
                }
                break;
            case XMLStreamConstants.END_ELEMENT:
                if (uriElementDepth > 0) {
                    uriElementDepth--;
                }
                break;
            default:
                break;
        }
        return event;
    }

    // True when the element at the current cursor position has the configured local name and, if a namespace
    // was configured, that namespace.
    private boolean matchesUriElement() {
        return source.getLocalName().equals(uriElement) &&
            (this.uriNamespace == null || this.uriNamespace.equals(source.getNamespaceURI()));
    }

    /**
     * @return the text captured from the first URI element seen so far (possibly empty), or null if no URI
     * element has been encountered
     */
    String getUriValue() {
        return uriValue == null ? null : uriValue.toString();
    }
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright © 2024 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.file.xml;
5+
6+
import com.marklogic.client.datamovement.XMLSplitter;
7+
import com.marklogic.client.io.Format;
8+
import com.marklogic.client.io.StringHandle;
9+
10+
import javax.xml.stream.XMLStreamReader;
11+
12+
/**
13+
* Supports extracting a URI element value for each aggregate element.
14+
*/
15+
class UriElementExtractingVisitor extends XMLSplitter.BasicElementVisitor {
16+
17+
private final String uriNamespace;
18+
private final String uriElement;
19+
20+
UriElementExtractingVisitor(String nsUri, String localName, String uriNamespace, String uriElement) {
21+
super(nsUri, localName);
22+
this.uriNamespace = uriNamespace;
23+
this.uriElement = uriElement;
24+
}
25+
26+
@Override
27+
public StringHandle makeBufferedHandle(XMLStreamReader xmlStreamReader) {
28+
UriElementExtractingReader reader = new UriElementExtractingReader(xmlStreamReader, uriNamespace, uriElement);
29+
String content = serialize(reader);
30+
String uriValue = reader.getUriValue();
31+
return new StringHandleWithUriValue(content, uriValue).withFormat(Format.XML);
32+
}
33+
}

src/test/java/com/marklogic/spark/reader/file/ReadAggregateXmlFilesTest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,20 @@ void uriElementHasNamespace() {
100100

101101
@Test
102102
void uriElementHasMixedContent() {
103-
Dataset<Row> dataset = newSparkSession().read()
103+
List<Row> rows = newSparkSession().read()
104104
.format(CONNECTOR_IDENTIFIER)
105105
.option(Options.READ_AGGREGATES_XML_ELEMENT, "Employee")
106106
.option(Options.READ_AGGREGATES_XML_URI_ELEMENT, "mixed")
107-
.load("src/test/resources/aggregates/employees.xml");
107+
.load("src/test/resources/aggregates/employees.xml")
108+
.collectAsList();
108109

109-
ConnectorException ex = assertThrowsConnectorException(() -> dataset.count());
110-
String message = ex.getMessage();
111-
assertTrue(message.startsWith("Unable to get text from URI element 'mixed' found in aggregate element 1 in file"),
112-
"The error should identify the URI element that text could not be retrieved from along with which aggregate " +
113-
"element produced the failure; actual message: " + message);
110+
rows.forEach(row -> {
111+
String uri = row.getString(0);
112+
assertEquals("has mixed content", uri, "We don't have a good reason to throw an exception when the user " +
113+
"specifies a URI element with mixed content. While MLCP carefully reconstructs the XML, and thus may " +
114+
"not want to deal with the complexity of mixed content, our connector plucks the URI element value " +
115+
"while the element is transformed into a string via a standard Java Transformer.");
116+
});
114117
}
115118

116119
@Test

src/test/java/com/marklogic/spark/reader/file/ReadAggregateXmlZipFilesTest.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,17 @@ void twoZipsOnePartition() {
5353

5454
@Test
5555
void uriElementHasMixedContent() {
56-
Dataset<Row> dataset = newSparkSession().read()
56+
List<Row> rows = newSparkSession().read()
5757
.format(CONNECTOR_IDENTIFIER)
5858
.option(Options.READ_AGGREGATES_XML_ELEMENT, "Employee")
5959
.option(Options.READ_AGGREGATES_XML_URI_ELEMENT, "mixed")
6060
.option(Options.READ_FILES_COMPRESSION, "zip")
61-
.load("src/test/resources/aggregate-zips/employee-aggregates.zip");
61+
.load("src/test/resources/aggregate-zips/employee-aggregates.zip")
62+
.collectAsList();
6263

63-
ConnectorException ex = assertThrowsConnectorException(() -> dataset.count());
64-
String message = ex.getMessage();
65-
assertTrue(
66-
message.startsWith(
67-
"Unable to get text from URI element 'mixed' found in aggregate element 1 in entry employees.xml in file:///"
68-
),
69-
"The error should identify the URI element that text could not be retrieved from along with which aggregate " +
70-
"element and which zip entry produced the failure; actual message: " + message
64+
rows.forEach(row ->
65+
assertEquals("has mixed content", row.getString(0),
66+
"Mixed content is supported by the connector in a URI element.")
7167
);
7268
}
7369

0 commit comments

Comments
 (0)