Skip to content

Commit b0f70ea

Browse files
authored
Merge pull request #357 from marklogic/feature/embedder-testing
Added debug logging for the embedder
2 parents e62a8e2 + 6f647d2 commit b0f70ea

File tree

2 files changed

+151
-10
lines changed

2 files changed

+151
-10
lines changed

src/main/java/com/marklogic/spark/Options.java

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,23 +127,131 @@ public abstract class Options {
127127
// Example - "spark.marklogic.write.json.ignoreNullFields=false".
128128
public static final String WRITE_JSON_SERIALIZATION_OPTION_PREFIX = "spark.marklogic.write.json.";
129129

130-
// Add @since annotations before we release.
130+
/**
131+
* Enables the splitter feature by defining an XPath expression for selecting text to split in a document.
132+
*
133+
* @since 2.5.0
134+
*/
131135
public static final String WRITE_SPLITTER_XPATH = "spark.marklogic.write.splitter.xpath";
136+
137+
/**
138+
* Enables the splitter feature by defining one or more newline-delimited JSON Pointer expressions for selecting
139+
* text to split in a document.
140+
*
141+
* @since 2.5.0
142+
*/
143+
public static final String WRITE_SPLITTER_JSON_POINTERS = "spark.marklogic.write.splitter.jsonPointers";
144+
145+
/**
146+
* Enables the splitter feature by declaring that all the text in a document should be split. This is typically for
147+
* text documents, but could be used for JSON and XML as well.
148+
*
149+
* @since 2.5.0
150+
*/
151+
// NOTE(review): value reads "spark.marklogic.writer.splitter.text" — "writer", not "write" —
// which is inconsistent with every other splitter option in this class. The same value exists
// on the pre-change line, so it may be preserved for backward compatibility; confirm whether
// this is intentional or a typo to fix (with a deprecation alias) before release.
public static final String WRITE_SPLITTER_TEXT = "spark.marklogic.writer.splitter.text";
152+
153+
/**
154+
* Defines the maximum chunk size in characters. Defaults to 1000.
155+
*
156+
* @since 2.5.0
157+
*/
132158
public static final String WRITE_SPLITTER_MAX_CHUNK_SIZE = "spark.marklogic.write.splitter.maxChunkSize";
159+
160+
/**
161+
* Defines the maximum overlap size in characters between two chunks. Defaults to 0.
162+
*
163+
* @since 2.5.0
164+
*/
133165
public static final String WRITE_SPLITTER_MAX_OVERLAP_SIZE = "spark.marklogic.write.splitter.maxOverlapSize";
134-
public static final String WRITE_SPLITTER_TEXT = "spark.marklogic.writer.splitter.text";
166+
167+
/**
168+
* Defines a regex for splitting text into chunks. The default strategy is langchain4j's "recursive" strategy that
169+
* splits on paragraphs, sentences, lines, and words.
170+
*
171+
* @since 2.5.0
172+
*/
135173
public static final String WRITE_SPLITTER_REGEX = "spark.marklogic.write.splitter.regex";
174+
175+
/**
176+
* Defines a delimiter for usage with the splitter regex option. The delimiter joins together two or more chunks
177+
* identified via the regex to produce a chunk that is as close as possible to the maximum chunk size.
178+
*
179+
* @since 2.5.0
180+
*/
136181
// NOTE(review): value reads "spark.marklogic.splitter.joinDelimiter" — it is missing the
// "write." segment used by every other splitter option ("spark.marklogic.write.splitter...").
// Confirm whether this is intentional or should be "spark.marklogic.write.splitter.joinDelimiter"
// before the 2.5.0 release, since option names are part of the public contract once shipped.
public static final String WRITE_SPLITTER_JOIN_DELIMITER = "spark.marklogic.splitter.joinDelimiter";
137-
public static final String WRITE_SPLITTER_JSON_POINTERS = "spark.marklogic.write.splitter.jsonPointers";
182+
183+
/**
184+
* Defines the class name of an implementation of langchain4j's {@code dev.langchain4j.data.document.DocumentSplitter}
185+
* interface to be used for splitting the selected text into chunks.
186+
*
187+
* @since 2.5.0
188+
*/
138189
public static final String WRITE_SPLITTER_CUSTOM_CLASS = "spark.marklogic.write.splitter.customClass";
190+
191+
/**
192+
* Defines one or more options to pass in a {@code Map<String, String>} to the constructor of the custom splitter
193+
* class.
194+
*
195+
* @since 2.5.0
196+
*/
139197
public static final String WRITE_SPLITTER_CUSTOM_CLASS_OPTION_PREFIX = "spark.marklogic.write.splitter.customClass.option.";
198+
199+
/**
200+
* Configures the connector to write chunks to separate "sidecar" documents instead of to the source document (the
201+
* default behavior). Defines the maximum number of chunks to write to a sidecar document.
202+
*
203+
* @since 2.5.0
204+
*/
140205
public static final String WRITE_SPLITTER_SIDECAR_MAX_CHUNKS = "spark.marklogic.write.splitter.sidecar.maxChunks";
206+
207+
/**
208+
* Defines the type - either JSON or XML - of each chunk document. Defaults to the type of the source document.
209+
*
210+
* @since 2.5.0
211+
*/
141212
public static final String WRITE_SPLITTER_SIDECAR_DOCUMENT_TYPE = "spark.marklogic.write.splitter.sidecar.documentType";
213+
214+
/**
215+
* Comma-delimited list of collections to assign to each chunk document.
216+
*
217+
* @since 2.5.0
218+
*/
142219
public static final String WRITE_SPLITTER_SIDECAR_COLLECTIONS = "spark.marklogic.write.splitter.sidecar.collections";
220+
221+
/**
222+
* Comma-delimited list of roles and capabilities to assign to each chunk document. If not defined, chunk documents
223+
* will inherit the permissions defined by {@code WRITE_PERMISSIONS}.
224+
*
225+
* @since 2.5.0
226+
*/
143227
public static final String WRITE_SPLITTER_SIDECAR_PERMISSIONS = "spark.marklogic.write.splitter.sidecar.permissions";
228+
229+
/**
230+
* Root name for a JSON or XML sidecar chunk document.
231+
*
232+
* @since 2.5.0
233+
*/
144234
public static final String WRITE_SPLITTER_SIDECAR_ROOT_NAME = "spark.marklogic.write.splitter.sidecar.rootName";
235+
236+
/**
237+
* URI prefix for each sidecar chunk document. If defined, will be followed by a UUID.
238+
*
239+
* @since 2.5.0
240+
*/
145241
public static final String WRITE_SPLITTER_SIDECAR_URI_PREFIX = "spark.marklogic.write.splitter.sidecar.uriPrefix";
242+
243+
/**
244+
* URI suffix for each sidecar chunk document. If defined, will be preceded by a UUID.
245+
*
246+
* @since 2.5.0
247+
*/
146248
public static final String WRITE_SPLITTER_SIDECAR_URI_SUFFIX = "spark.marklogic.write.splitter.sidecar.uriSuffix";
249+
250+
/**
251+
* Namespace for XML sidecar chunk documents.
252+
*
253+
* @since 2.5.0
254+
*/
147255
public static final String WRITE_SPLITTER_SIDECAR_XML_NAMESPACE = "spark.marklogic.write.splitter.sidecar.xmlNamespace";
148256

149257
// For writing RDF

src/main/java/com/marklogic/spark/writer/embedding/EmbeddingGenerator.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.ArrayList;
1414
import java.util.Iterator;
1515
import java.util.List;
16+
import java.util.concurrent.atomic.AtomicLong;
1617
import java.util.stream.Collectors;
1718

1819
class EmbeddingGenerator {
@@ -23,6 +24,10 @@ class EmbeddingGenerator {
2324
private final EmbeddingModel embeddingModel;
2425
private final int batchSize;
2526

27+
// Only used for debug logging.
28+
private static final AtomicLong tokenCount = new AtomicLong(0);
29+
private static final AtomicLong requestCount = new AtomicLong(0);
30+
2631
private List<Chunk> pendingChunks = new ArrayList<>();
2732

2833
EmbeddingGenerator(EmbeddingModel embeddingModel) {
@@ -82,18 +87,46 @@ private void addChunkToPendingChunks(Chunk chunk) {
8287
}
8388

8489
private void addEmbeddingsToChunks(List<Chunk> chunks) {
85-
List<TextSegment> textSegments = chunks.stream()
90+
List<TextSegment> textSegments = makeTextSegments(chunks);
91+
Response<List<Embedding>> response = embeddingModel.embedAll(textSegments);
92+
logResponse(response, textSegments);
93+
94+
if (response.content() == null) {
95+
Util.EMBEDDER_LOGGER.warn("Sent {} chunks; no embeddings were returned; finish reason: {}",
96+
textSegments.size(), response.finishReason());
97+
} else {
98+
List<Embedding> embeddings = response.content();
99+
for (int i = 0; i < embeddings.size(); i++) {
100+
chunks.get(i).addEmbedding(embeddings.get(i));
101+
}
102+
}
103+
}
104+
105+
private List<TextSegment> makeTextSegments(List<Chunk> chunks) {
106+
return chunks.stream()
86107
.map(chunk -> new TextSegment(chunk.getEmbeddingText(), TEXT_SEGMENT_METADATA))
87108
.collect(Collectors.toList());
109+
}
88110

89-
Response<List<Embedding>> response = embeddingModel.embedAll(textSegments);
111+
private void logResponse(Response<List<Embedding>> response, List<TextSegment> textSegments) {
90112
if (Util.EMBEDDER_LOGGER.isInfoEnabled()) {
91-
Util.EMBEDDER_LOGGER.info("Sent {} chunks; token usage: {}", textSegments.size(), response.tokenUsage());
92-
}
113+
// Not every embedding model provides token usage.
114+
if (response.tokenUsage() != null) {
115+
Util.EMBEDDER_LOGGER.info("Sent {} chunks; token usage: {}", textSegments.size(), response.tokenUsage());
116+
} else {
117+
Util.EMBEDDER_LOGGER.info("Sent {} chunks", textSegments.size());
118+
}
93119

94-
List<Embedding> embeddings = response.content();
95-
for (int i = 0; i < embeddings.size(); i++) {
96-
chunks.get(i).addEmbedding(embeddings.get(i));
120+
if (Util.EMBEDDER_LOGGER.isDebugEnabled()) {
121+
long totalRequests = requestCount.incrementAndGet();
122+
if (response.tokenUsage() != null) {
123+
Util.EMBEDDER_LOGGER.debug("Requests: {}; tokens: {}", totalRequests,
124+
tokenCount.addAndGet(response.tokenUsage().inputTokenCount())
125+
);
126+
} else {
127+
Util.EMBEDDER_LOGGER.debug("Requests: {}", totalRequests);
128+
}
129+
}
97130
}
98131
}
99132
}

0 commit comments

Comments
 (0)