Skip to content

Commit 291bc5e

Browse files
authored
Merge pull request #348 from marklogic/feature/switch-to-dom
Switching to DOM for splitting and embedding
2 parents 97d9c83 + 0a86a11 commit 291bc5e

21 files changed

+370
-340
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ dependencies {
8181
// Supports testing the embedder feature.
8282
testImplementation "dev.langchain4j:langchain4j-embeddings-all-minilm-l6-v2:0.35.0"
8383

84-
// Needed for constructing XML documents and for splitting them.
84+
// Needed for some XML operations that are far easier with JDOM2 than with DOM.
8585
shadowDependencies "org.jdom:jdom2:2.0.6.1"
8686

8787
// Needed for splitting XML documents via XPath.

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@
66
import com.marklogic.client.DatabaseClient;
77
import com.marklogic.client.DatabaseClientFactory;
88
import com.marklogic.client.extra.okhttpclient.OkHttpClientConfigurator;
9-
import org.jdom2.Namespace;
109
import org.slf4j.Logger;
1110
import org.slf4j.LoggerFactory;
1211

1312
import java.io.Serializable;
1413
import java.util.HashMap;
15-
import java.util.List;
1614
import java.util.Map;
1715
import java.util.concurrent.TimeUnit;
18-
import java.util.stream.Collectors;
1916

2017
public class ContextSupport implements Serializable {
2118

@@ -164,16 +161,6 @@ public final boolean isStreamingFiles() {
164161
return "true".equalsIgnoreCase(getStringOption(Options.STREAM_FILES));
165162
}
166163

167-
public List<Namespace> getGlobalNamespaces() {
168-
return getProperties().keySet().stream()
169-
.filter(key -> key.startsWith(Options.XPATH_NAMESPACE_PREFIX))
170-
.map(key -> {
171-
String prefix = key.substring(Options.XPATH_NAMESPACE_PREFIX.length());
172-
return Namespace.getNamespace(prefix, getStringOption(key));
173-
})
174-
.collect(Collectors.toList());
175-
}
176-
177164
public Map<String, String> getProperties() {
178165
return properties;
179166
}

src/main/java/com/marklogic/spark/writer/dom/DOMHelper.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.writer.dom;
5+
6+
import com.marklogic.client.document.DocumentWriteOperation;
7+
import com.marklogic.client.impl.HandleAccessor;
8+
import com.marklogic.client.io.DOMHandle;
9+
import com.marklogic.client.io.marker.AbstractWriteHandle;
10+
import com.marklogic.spark.ConnectorException;
11+
import org.w3c.dom.Document;
12+
import org.xml.sax.InputSource;
13+
14+
import javax.xml.namespace.NamespaceContext;
15+
import javax.xml.parsers.DocumentBuilder;
16+
import javax.xml.parsers.DocumentBuilderFactory;
17+
import javax.xml.parsers.ParserConfigurationException;
18+
import javax.xml.xpath.XPath;
19+
import javax.xml.xpath.XPathExpression;
20+
import javax.xml.xpath.XPathExpressionException;
21+
import javax.xml.xpath.XPathFactory;
22+
import java.io.StringReader;
23+
24+
/**
25+
* Simplifies operations with the Java DOM API.
26+
*/
27+
public class DOMHelper {
28+
29+
private final DocumentBuilderFactory documentBuilderFactory;
30+
private final XPathFactory xPathFactory = XPathFactory.newInstance();
31+
private final NamespaceContext namespaceContext;
32+
private DocumentBuilder documentBuilder;
33+
34+
public DOMHelper(NamespaceContext namespaceContext) {
35+
// This can be reused for multiple calls, which will only be in the context of a single partition writer and
36+
// thus we don't need to worry about thread safety for it.
37+
this.documentBuilderFactory = DocumentBuilderFactory.newInstance();
38+
this.documentBuilderFactory.setNamespaceAware(true);
39+
this.namespaceContext = namespaceContext;
40+
}
41+
42+
public Document extractDocument(DocumentWriteOperation sourceDocument) {
43+
AbstractWriteHandle handle = sourceDocument.getContent();
44+
if (handle instanceof DOMHandle) {
45+
return ((DOMHandle) handle).get();
46+
}
47+
48+
String xml = HandleAccessor.contentAsString(handle);
49+
50+
try {
51+
return getDocumentBuilder().parse(new InputSource(new StringReader(xml)));
52+
} catch (Exception e) {
53+
throw new ConnectorException(String.format("Unable to parse XML for document with URI: %s; cause: %s",
54+
sourceDocument.getUri(), e.getMessage()), e);
55+
}
56+
}
57+
58+
public Document newDocument() {
59+
return getDocumentBuilder().newDocument();
60+
}
61+
62+
public XPathExpression compileXPath(String xpathExpression, String purposeForErrorMessage) {
63+
XPath xpath = this.xPathFactory.newXPath();
64+
if (namespaceContext != null) {
65+
xpath.setNamespaceContext(namespaceContext);
66+
}
67+
68+
try {
69+
return xpath.compile(xpathExpression);
70+
} catch (XPathExpressionException e) {
71+
String message = massageXPathCompilationError(e.getMessage());
72+
throw new ConnectorException(String.format(
73+
"Unable to compile XPath expression for %s: %s; cause: %s",
74+
purposeForErrorMessage, xpathExpression, message), e
75+
);
76+
}
77+
}
78+
79+
private String massageXPathCompilationError(String message) {
80+
String unnecessaryPart = "javax.xml.transform.TransformerException: ";
81+
if (message.startsWith(unnecessaryPart)) {
82+
return message.substring(unnecessaryPart.length());
83+
}
84+
return message;
85+
}
86+
87+
private DocumentBuilder getDocumentBuilder() {
88+
if (this.documentBuilder == null) {
89+
try {
90+
this.documentBuilder = this.documentBuilderFactory.newDocumentBuilder();
91+
} catch (ParserConfigurationException e) {
92+
throw new ConnectorException(String.format("Unable to create XML document; cause: %s", e.getMessage()), e);
93+
}
94+
}
95+
return this.documentBuilder;
96+
}
97+
}

src/main/java/com/marklogic/spark/writer/dom/XPathNamespaceContext.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.writer.dom;
5+
6+
import com.marklogic.spark.Options;
7+
8+
import javax.xml.XMLConstants;
9+
import javax.xml.namespace.NamespaceContext;
10+
import java.util.HashMap;
11+
import java.util.Iterator;
12+
import java.util.Map;
13+
14+
public class XPathNamespaceContext implements NamespaceContext {
15+
16+
private final Map<String, String> prefixesToNamespaces;
17+
18+
public XPathNamespaceContext(Map<String, String> properties) {
19+
prefixesToNamespaces = new HashMap<>();
20+
properties.keySet().stream()
21+
.filter(key -> key.startsWith(Options.XPATH_NAMESPACE_PREFIX))
22+
.forEach(key -> {
23+
String prefix = key.substring(Options.XPATH_NAMESPACE_PREFIX.length());
24+
String namespace = properties.get(key);
25+
prefixesToNamespaces.put(prefix, namespace);
26+
});
27+
}
28+
29+
@Override
30+
public String getNamespaceURI(String prefix) {
31+
return prefixesToNamespaces.containsKey(prefix) ?
32+
prefixesToNamespaces.get(prefix) :
33+
XMLConstants.NULL_NS_URI;
34+
}
35+
36+
@Override
37+
public String getPrefix(String namespaceURI) {
38+
// Does not have to be implemented for resolving XPath expressions.
39+
return null;
40+
}
41+
42+
@Override
43+
public Iterator<String> getPrefixes(String namespaceURI) {
44+
// Does not have to be implemented for resolving XPath expressions.
45+
return null;
46+
}
47+
}

src/main/java/com/marklogic/spark/writer/embedding/DOMChunk.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.w3c.dom.Element;
1010
import org.w3c.dom.NodeList;
1111

12+
import javax.xml.xpath.XPath;
1213
import javax.xml.xpath.XPathConstants;
1314
import javax.xml.xpath.XPathExpressionException;
1415
import javax.xml.xpath.XPathFactory;
@@ -18,14 +19,14 @@ public class DOMChunk implements Chunk {
1819
private final String documentUri;
1920
private final Document document;
2021
private final Element chunkElement;
21-
private final String textExpression;
22+
private final XmlChunkConfig xmlChunkConfig;
2223
private final XPathFactory xpathFactory;
2324

24-
public DOMChunk(String documentUri, Document document, Element chunkElement, String textExpression, XPathFactory xpathFactory) {
25+
public DOMChunk(String documentUri, Document document, Element chunkElement, XmlChunkConfig xmlChunkConfig, XPathFactory xpathFactory) {
2526
this.documentUri = documentUri;
2627
this.document = document;
2728
this.chunkElement = chunkElement;
28-
this.textExpression = textExpression;
29+
this.xmlChunkConfig = xmlChunkConfig;
2930
this.xpathFactory = xpathFactory;
3031
}
3132

@@ -37,8 +38,15 @@ public String getDocumentUri() {
3738
@Override
3839
public String getEmbeddingText() {
3940
NodeList embeddingTextNodes;
41+
String textExpression = xmlChunkConfig.getTextExpression();
42+
43+
XPath xpath = xpathFactory.newXPath();
44+
if (xmlChunkConfig.getNamespaceContext() != null) {
45+
xpath.setNamespaceContext(xmlChunkConfig.getNamespaceContext());
46+
}
47+
4048
try {
41-
embeddingTextNodes = (NodeList) xpathFactory.newXPath().evaluate(textExpression, chunkElement, XPathConstants.NODESET);
49+
embeddingTextNodes = (NodeList) xpath.evaluate(textExpression, chunkElement, XPathConstants.NODESET);
4250
} catch (XPathExpressionException e) {
4351
throw new ConnectorException(String.format("Unable to evaluate XPath expression: %s; cause: %s",
4452
textExpression, e.getMessage()), e);
@@ -49,7 +57,10 @@ public String getEmbeddingText() {
4957

5058
@Override
5159
public void addEmbedding(Embedding embedding) {
52-
this.document.createElement("embedding").setTextContent(embedding.vectorAsList().toString());
60+
// DOM is fine with null as a value for the namespace.
61+
Element embeddingElement = document.createElementNS(xmlChunkConfig.getEmbeddingNamespace(), xmlChunkConfig.getEmbeddingName());
62+
embeddingElement.setTextContent(embedding.vectorAsList().toString());
63+
chunkElement.appendChild(embeddingElement);
5364
}
5465

5566
private String concatenateNodesIntoString(NodeList embeddingTextNodes) {

src/main/java/com/marklogic/spark/writer/embedding/DOMChunkSelector.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,38 @@
55

66
import com.marklogic.client.document.DocumentWriteOperation;
77
import com.marklogic.client.impl.DocumentWriteOperationImpl;
8-
import com.marklogic.client.impl.HandleAccessor;
98
import com.marklogic.client.io.DOMHandle;
10-
import com.marklogic.client.io.marker.AbstractWriteHandle;
119
import com.marklogic.spark.ConnectorException;
10+
import com.marklogic.spark.writer.dom.DOMHelper;
1211
import org.w3c.dom.Document;
1312
import org.w3c.dom.Element;
1413
import org.w3c.dom.Node;
1514
import org.w3c.dom.NodeList;
16-
import org.xml.sax.InputSource;
1715

18-
import javax.xml.parsers.DocumentBuilderFactory;
1916
import javax.xml.xpath.XPathConstants;
2017
import javax.xml.xpath.XPathExpression;
2118
import javax.xml.xpath.XPathExpressionException;
2219
import javax.xml.xpath.XPathFactory;
23-
import java.io.StringReader;
2420
import java.util.ArrayList;
2521
import java.util.List;
2622

2723
public class DOMChunkSelector implements ChunkSelector {
2824

2925
private final XPathFactory xpathFactory;
3026
private final XPathExpression chunksExpression;
31-
private final String chunkTextExpression;
32-
private final DocumentBuilderFactory documentBuilderFactory;
27+
private final XmlChunkConfig xmlChunkConfig;
28+
private final DOMHelper domHelper;
3329

34-
public DOMChunkSelector(String chunksExpression, String chunkTextExpression) {
30+
public DOMChunkSelector(String chunksExpression, XmlChunkConfig xmlChunkConfig) {
3531
this.xpathFactory = XPathFactory.newInstance();
36-
try {
37-
this.chunksExpression = this.xpathFactory.newXPath().compile(chunksExpression);
38-
} catch (XPathExpressionException e) {
39-
throw new ConnectorException(String.format(
40-
"Unable to compile XPath expression for selecting chunks: %s; cause: %s", chunksExpression, e.getMessage()), e);
41-
}
42-
this.chunkTextExpression = chunkTextExpression;
43-
this.documentBuilderFactory = DocumentBuilderFactory.newInstance();
32+
this.xmlChunkConfig = xmlChunkConfig;
33+
this.domHelper = new DOMHelper(xmlChunkConfig.getNamespaceContext());
34+
this.chunksExpression = domHelper.compileXPath(chunksExpression, "selecting chunks");
4435
}
4536

4637
@Override
4738
public DocumentAndChunks selectChunks(DocumentWriteOperation sourceDocument) {
48-
Document doc = extractDocument(sourceDocument);
39+
Document doc = domHelper.extractDocument(sourceDocument);
4940

5041
NodeList chunkNodes = selectChunkNodes(doc);
5142
if (chunkNodes.getLength() == 0) {
@@ -58,20 +49,6 @@ public DocumentAndChunks selectChunks(DocumentWriteOperation sourceDocument) {
5849
return new DocumentAndChunks(docToWrite, chunks);
5950
}
6051

61-
private Document extractDocument(DocumentWriteOperation sourceDocument) {
62-
AbstractWriteHandle handle = sourceDocument.getContent();
63-
if (handle instanceof DOMHandle) {
64-
return ((DOMHandle) handle).get();
65-
}
66-
String xml = HandleAccessor.contentAsString(handle);
67-
try {
68-
return documentBuilderFactory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
69-
} catch (Exception e) {
70-
throw new ConnectorException(String.format("Unable to parse XML for document with URI: %s; cause: %s",
71-
sourceDocument.getUri(), e.getMessage()), e);
72-
}
73-
}
74-
7552
private NodeList selectChunkNodes(Document doc) {
7653
try {
7754
return (NodeList) chunksExpression.evaluate(doc, XPathConstants.NODESET);
@@ -89,7 +66,7 @@ private List<Chunk> makeChunks(DocumentWriteOperation sourceDocument, Document d
8966
throw new ConnectorException(String.format("XPath expression for selecting chunks must only " +
9067
"select elements; XPath: %s; document URI: %s", chunksExpression, sourceDocument.getUri()));
9168
}
92-
chunks.add(new DOMChunk(sourceDocument.getUri(), document, (Element) node, chunkTextExpression, xpathFactory));
69+
chunks.add(new DOMChunk(sourceDocument.getUri(), document, (Element) node, xmlChunkConfig, xpathFactory));
9370
}
9471
return chunks;
9572
}

src/main/java/com/marklogic/spark/writer/embedding/EmbedderDocumentProcessorFactory.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.marklogic.spark.ConnectorException;
77
import com.marklogic.spark.ContextSupport;
88
import com.marklogic.spark.Options;
9+
import com.marklogic.spark.writer.dom.XPathNamespaceContext;
910
import com.marklogic.spark.writer.DocumentProcessor;
1011
import dev.langchain4j.model.embedding.EmbeddingModel;
1112

@@ -61,13 +62,16 @@ private static ChunkSelector makeJsonChunkSelector(ContextSupport context) {
6162
}
6263

6364
private static ChunkSelector makeXmlChunkSelector(ContextSupport context) {
64-
return new XmlChunkSelector.Builder()
65-
.withChunksXPathExpression(context.getStringOption(Options.WRITE_EMBEDDER_CHUNKS_XPATH))
66-
.withTextXPathExpression(context.getStringOption(Options.WRITE_EMBEDDER_TEXT_XPATH))
67-
.withEmbeddingName(context.getStringOption(Options.WRITE_EMBEDDER_EMBEDDING_NAME))
68-
.withEmbeddingNamespace(context.getStringOption(Options.WRITE_EMBEDDER_EMBEDDING_NAMESPACE))
69-
.withXPathNamespaces(context.getGlobalNamespaces())
70-
.build();
65+
XmlChunkConfig xmlChunkConfig = new XmlChunkConfig(
66+
context.getStringOption(Options.WRITE_EMBEDDER_TEXT_XPATH),
67+
context.getStringOption(Options.WRITE_EMBEDDER_EMBEDDING_NAME),
68+
context.getStringOption(Options.WRITE_EMBEDDER_EMBEDDING_NAMESPACE),
69+
new XPathNamespaceContext(context.getProperties())
70+
);
71+
return new DOMChunkSelector(
72+
context.getStringOption(Options.WRITE_EMBEDDER_CHUNKS_XPATH),
73+
xmlChunkConfig
74+
);
7175
}
7276

7377
public static Optional<EmbeddingModel> makeEmbeddingModel(ContextSupport context) {

0 commit comments

Comments
 (0)