Commit 9ea7ce0

Merge pull request #298 from marklogic/feature/17095-add-iterator-after-swap

MLE-17095 Can now stream an archive into MarkLogic

2 parents: 5aaf883 + 0c2d09b

7 files changed: +340 additions, −26 deletions
src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 112 additions & 12 deletions
@@ -4,6 +4,7 @@
 package com.marklogic.spark.reader.file;
 
 import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.InputStreamHandle;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.Util;
@@ -12,17 +13,20 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
-class ArchiveFileReader implements PartitionReader<InternalRow> {
+public class ArchiveFileReader implements PartitionReader<InternalRow> {
 
     private final FilePartition filePartition;
     private final FileContext fileContext;
     private final List<String> metadataCategories;
+    private final StreamingMode streamingMode;
 
     private String currentFilePath;
     private ZipInputStream currentZipInputStream;
@@ -32,9 +36,25 @@ class ArchiveFileReader implements PartitionReader<InternalRow> {
     // Legacy = content first, then metadata.
     private Boolean isLegacyFormat;
 
+    public enum StreamingMode {
+        STREAM_DURING_READER_PHASE,
+        STREAM_DURING_WRITER_PHASE
+    }
+
     ArchiveFileReader(FilePartition filePartition, FileContext fileContext) {
+        this(
+            filePartition, fileContext,
+            // Will refactor this later to avoid duplication of this comparison.
+            // Should be a nice little method in FileContext.
+            "true".equalsIgnoreCase(fileContext.getStringOption(Options.STREAM_FILES)) ? StreamingMode.STREAM_DURING_READER_PHASE : null
+        );
+    }
+
+    public ArchiveFileReader(FilePartition filePartition, FileContext fileContext, StreamingMode streamingMode) {
         this.filePartition = filePartition;
         this.fileContext = fileContext;
+        this.streamingMode = streamingMode;
+
         this.metadataCategories = new ArrayList<>();
         if (fileContext.hasOption(Options.READ_ARCHIVES_CATEGORIES)) {
             for (String category : fileContext.getStringOption(Options.READ_ARCHIVES_CATEGORIES).split(",")) {
@@ -47,6 +67,10 @@ class ArchiveFileReader implements PartitionReader<InternalRow> {
 
     @Override
     public boolean next() {
+        if (StreamingMode.STREAM_DURING_READER_PHASE.equals(this.streamingMode)) {
+            return nextWhileStreamingDuringReaderPhase();
+        }
+
         try {
             ZipEntry nextZipEntry = FileUtil.findNextFileEntry(currentZipInputStream);
             if (nextZipEntry == null) {
@@ -55,6 +79,7 @@ public boolean next() {
 
             if (isLegacyFormat == null) {
                 isLegacyFormat = !nextZipEntry.getName().endsWith(".metadata");
+                logArchiveFormat();
             }
 
             return isLegacyFormat ? readContentFollowedByMetadata(nextZipEntry) : readMetadataFollowedByContent();
@@ -70,14 +95,52 @@ public boolean next() {
 
     @Override
     public InternalRow get() {
-        return nextRowToReturn;
+        return StreamingMode.STREAM_DURING_READER_PHASE.equals(this.streamingMode) ?
+            buildSingleRowForArchiveFile() :
+            nextRowToReturn;
     }
 
     @Override
     public void close() {
         IOUtils.closeQuietly(this.currentZipInputStream);
     }
 
+    /**
+     * Exposed for {@code ArchiveFileIterator} to be able to read from the zip stream when it produces a set of
+     * document inputs.
+     *
+     * @return a {@code InputStreamHandle} to avoid reading a content zip entry into memory.
+     */
+    public InputStreamHandle getContentHandleForCurrentZipEntry() {
+        return new InputStreamHandle(currentZipInputStream);
+    }
+
+    private void logArchiveFormat() {
+        if (Util.MAIN_LOGGER.isInfoEnabled() && isLegacyFormat) {
+            Util.MAIN_LOGGER.info("Archive {} uses Flux 1.0 format, will read content and then metadata.", this.currentFilePath);
+        }
+        if (Util.MAIN_LOGGER.isDebugEnabled() && !isLegacyFormat.booleanValue()) {
+            Util.MAIN_LOGGER.debug("Archive {} uses Flux 1.1+ format, will read metadata and then content.", this.currentFilePath);
+        }
+    }
+
+    /**
+     * Implementation of {@code next()} while streaming during the reader phase, where we don't want to actually read
+     * anything from a zip file. We just want to build a row per zip file.
+     *
+     * @return
+     */
+    private boolean nextWhileStreamingDuringReaderPhase() {
+        if (currentFilePath != null) {
+            return true;
+        }
+        if (nextFilePathIndex >= filePartition.getPaths().size()) {
+            return false;
+        }
+        openNextFile();
+        return true;
+    }
+
     /**
      * This is the Flux 1.0 "legacy" approach, where content was written first, followed by metadata. This does not
      * support streaming.
@@ -87,16 +150,15 @@ private boolean readContentFollowedByMetadata(ZipEntry contentZipEntry) throws I
         if (content == null || content.length == 0) {
             return openNextFileAndReadNextEntry();
         }
-        final String zipEntryName = contentZipEntry.getName();
 
+        final String zipEntryName = contentZipEntry.getName();
         byte[] metadataBytes = readMetadataEntry(zipEntryName);
         if (metadataBytes == null || metadataBytes.length == 0) {
             return openNextFileAndReadNextEntry();
         }
 
         DocumentMetadataHandle metadata = new DocumentMetadataHandle();
         metadata.fromBuffer(metadataBytes);
-
         this.nextRowToReturn = new DocumentRowBuilder(this.metadataCategories)
             .withUri(zipEntryName).withContent(content).withMetadata(metadata)
             .buildRow();
@@ -105,29 +167,45 @@ private boolean readContentFollowedByMetadata(ZipEntry contentZipEntry) throws I
 
     /**
      * This is the Flux 1.1+ approach, where the metadata entry is written first. This supports streaming.
+     * <p>
+     * This is where we implement streaming-during-write-to-MarkLogic. We read the metadata entry as normal - good.
+     * Then we build everything in our row except the content.
      */
     private boolean readMetadataFollowedByContent() throws IOException {
         byte[] metadataBytes = fileContext.readBytes(currentZipInputStream);
         if (metadataBytes == null || metadataBytes.length == 0) {
             return openNextFileAndReadNextEntry();
         }
 
-        ZipEntry contentZipEntry = FileUtil.findNextFileEntry(currentZipInputStream);
-        byte[] content = fileContext.readBytes(currentZipInputStream);
-
         DocumentMetadataHandle metadata = new DocumentMetadataHandle();
         metadata.fromBuffer(metadataBytes);
-        this.nextRowToReturn = new DocumentRowBuilder(this.metadataCategories)
+
+        // We still do this to get the stream ready to read the next entry.
+        ZipEntry contentZipEntry = FileUtil.findNextFileEntry(currentZipInputStream);
+
+        DocumentRowBuilder rowBuilder = new DocumentRowBuilder(this.metadataCategories)
             .withUri(contentZipEntry.getName())
-            .withContent(content).withMetadata(metadata)
-            .buildRow();
+            .withMetadata(metadata);
+
+        if (!StreamingMode.STREAM_DURING_WRITER_PHASE.equals(this.streamingMode)) {
+            byte[] content = fileContext.readBytes(currentZipInputStream);
+            rowBuilder = rowBuilder.withContent(content);
+        }
+
+        this.nextRowToReturn = rowBuilder.buildRow();
        return true;
     }
 
     private void openNextFile() {
-        this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
+        final boolean isStreamingDuringRead = StreamingMode.STREAM_DURING_READER_PHASE.equals(this.streamingMode);
+        final String nextFilePath = filePartition.getPaths().get(nextFilePathIndex);
+
+        this.currentFilePath = isStreamingDuringRead ? nextFilePath : fileContext.decodeFilePath(nextFilePath);
         nextFilePathIndex++;
-        this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
+
+        if (!isStreamingDuringRead) {
+            this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
+        }
     }
 
     private boolean openNextFileAndReadNextEntry() {
@@ -151,4 +229,26 @@ private byte[] readMetadataEntry(String zipEntryName) throws IOException {
         }
         return fileContext.readBytes(currentZipInputStream);
     }
+
+    /**
+     * Builds a row containing the file path, the serialized FileContext, and the metadata.
+     */
+    private InternalRow buildSingleRowForArchiveFile() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(this.fileContext);
+            oos.flush();
+        } catch (Exception ex) {
+            String message = String.format("Unable to build row for archive file at %s; cause: %s",
+                this.currentFilePath, ex.getMessage());
+            throw new ConnectorException(message, ex);
+        }
+
+        InternalRow row = new DocumentRowBuilder(this.metadataCategories)
+            .withUri(this.currentFilePath)
+            .withContent(baos.toByteArray())
+            .buildRow();
+        this.currentFilePath = null;
+        return row;
+    }
 }

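The hand-off that makes the reader phase cheap is that buildSingleRowForArchiveFile() never touches the zip: the row's URI column holds the file path and its content column holds a serialized FileContext. A minimal sketch of that serialization step, assuming only that FileContext is java.io.Serializable (implied by the ObjectOutputStream usage above); the class and method names here are illustrative:

import com.marklogic.spark.reader.file.FileContext;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class ReaderPhaseRowSketch {

    // Mirrors what buildSingleRowForArchiveFile() puts in the row's content column: the
    // serialized FileContext, not any bytes from the archive itself. The URI column holds
    // the (still encoded) file path, and no zip entry is read during the reader phase.
    static byte[] serializeFileContext(FileContext fileContext) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(fileContext);
        }
        return baos.toByteArray();
    }
}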
src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 28 additions & 13 deletions
@@ -13,13 +13,17 @@
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.reader.document.DocumentRowSchema;
+import com.marklogic.spark.reader.file.ArchiveFileReader;
 import com.marklogic.spark.reader.file.FileContext;
+import com.marklogic.spark.reader.file.FilePartition;
+import com.marklogic.spark.writer.file.ArchiveFileIterator;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Stream;
@@ -93,20 +97,31 @@ private JsonNode deserializeContentToJson(String initialUri, BytesHandle content
      */
     private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath, InternalRow row) {
         byte[] bytes = row.getBinary(1);
-        String filePathInErrorMessage = filePath;
-        try {
-            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
-            FileContext fileContext = (FileContext) ois.readObject();
-            final String decodedPath = fileContext.decodeFilePath(filePath);
-            filePathInErrorMessage = decodedPath;
-            InputStreamHandle streamHandle = new InputStreamHandle(fileContext.openFile(decodedPath));
-            if (this.documentFormat != null) {
-                streamHandle.withFormat(this.documentFormat);
-            }
-            DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-            return Stream.of(new DocBuilder.DocumentInputs(filePath, streamHandle, null, metadata)).iterator();
+        FileContext fileContext;
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            fileContext = (FileContext) ois.readObject();
         } catch (Exception e) {
-            throw new ConnectorException(String.format("Unable to read from file %s; cause: %s", filePathInErrorMessage, e.getMessage()));
+            throw new ConnectorException(String.format("Unable to read from file %s; cause: %s", filePath, e.getMessage()));
         }
+
+        if ("archive".equalsIgnoreCase(fileContext.getStringOption(Options.READ_FILES_TYPE))) {
+            return buildIteratorForArchiveFile(filePath, fileContext);
+        }
+
+        final String decodedPath = fileContext.decodeFilePath(filePath);
+        InputStreamHandle streamHandle = new InputStreamHandle(fileContext.openFile(decodedPath));
+        if (this.documentFormat != null) {
+            streamHandle.withFormat(this.documentFormat);
+        }
+        DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
+        return Stream.of(new DocBuilder.DocumentInputs(filePath, streamHandle, null, metadata)).iterator();
+    }
+
+    private Iterator<DocBuilder.DocumentInputs> buildIteratorForArchiveFile(String filePath, FileContext fileContext) {
+        FilePartition filePartition = new FilePartition(Arrays.asList(filePath));
+        ArchiveFileReader archiveFileReader = new ArchiveFileReader(
+            filePartition, fileContext, ArchiveFileReader.StreamingMode.STREAM_DURING_WRITER_PHASE
+        );
+        return new ArchiveFileIterator(archiveFileReader, this.documentFormat);
     }
 }

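On the writer side, readContentFromFile() now restores the FileContext from the row's binary content column and branches: rows that originated from an archive are expanded into many documents via ArchiveFileIterator, while other rows keep the single-document path. A small sketch of that restore-and-dispatch step; the helper names are illustrative, and the "archive" value and Options.READ_FILES_TYPE key come from the diff above:

import com.marklogic.spark.Options;
import com.marklogic.spark.reader.file.FileContext;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

public class WriterPhaseDispatchSketch {

    // Restores the FileContext that the reader phase serialized into the row's content column.
    static FileContext restoreFileContext(byte[] rowContent) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(rowContent))) {
            return (FileContext) ois.readObject();
        }
    }

    // Mirrors the new branch in readContentFromFile: archive rows are expanded into many
    // documents via ArchiveFileIterator, other rows keep the one-document-per-row path.
    static boolean isArchiveRow(FileContext fileContext) {
        return "archive".equalsIgnoreCase(fileContext.getStringOption(Options.READ_FILES_TYPE));
    }
}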
src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 10 additions & 1 deletion
@@ -257,7 +257,7 @@ private void closeArchiveWriter() {
      * @param writeOp
      */
     private void writeDocumentViaPutOperation(DocumentWriteOperation writeOp) {
-        final String uri = writeOp.getUri();
+        final String uri = replaceSpacesInUriForPutEndpoint(writeOp.getUri());
         try {
             this.documentManager.write(uri, writeOp.getMetadata(), (GenericWriteHandle) writeOp.getContent());
             this.successItemCount.incrementAndGet();
@@ -267,6 +267,15 @@ private void writeDocumentViaPutOperation(DocumentWriteOperation writeOp) {
         }
     }
 
+    /**
+     * Sigh. Using URLEncoder.encode will convert forward slashes into "%2F", which a user almost certainly does not
+     * want, since those are meaningful in MarkLogic URIs. The main problem to address with the PUT endpoint is that it
+     * erroneously does not accept spaces (see MLE-17088). So this simply replaces spaces.
+     */
+    private String replaceSpacesInUriForPutEndpoint(String uri) {
+        return uri.replace(" ", "%20");
+    }
+
     private void captureFailure(String message, String documentUri) {
         Util.MAIN_LOGGER.error("Unable to write document with URI: {}; cause: {}", documentUri, message);
         failedItemCount.incrementAndGet();
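The WriteBatcherDataWriter change is narrower: before calling the PUT endpoint, spaces in the URI are replaced with "%20". A quick illustration, not part of the commit, of why the helper avoids URLEncoder.encode: full encoding would also turn the slashes that are meaningful in MarkLogic URIs into "%2F" and would encode spaces as "+":

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UriEncodingSketch {
    public static void main(String[] args) {
        String uri = "/my dir/doc 1.json";

        // What writeDocumentViaPutOperation now does before calling the PUT endpoint:
        String spacesOnly = uri.replace(" ", "%20");
        System.out.println(spacesOnly);   // /my%20dir/doc%201.json

        // What full URL-encoding would do: slashes become %2F and spaces become "+".
        String fullyEncoded = URLEncoder.encode(uri, StandardCharsets.UTF_8);
        System.out.println(fullyEncoded); // %2Fmy+dir%2Fdoc+1.json
    }
}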
src/main/java/com/marklogic/spark/writer/file/ArchiveFileIterator.java

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark.writer.file;
+
+import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.Format;
+import com.marklogic.client.io.InputStreamHandle;
+import com.marklogic.spark.reader.document.DocumentRowSchema;
+import com.marklogic.spark.reader.file.ArchiveFileReader;
+import com.marklogic.spark.writer.DocBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+import java.util.Iterator;
+
+/**
+ * Provides an {@code Iterator} interface on top of an {@code ArchiveFileReader}, thereby allowing a
+ * {@code DocumentRowConverter} to build sets of document inputs from an archive file without reading any content entry
+ * into memory - thus supporting streaming of an archive.
+ */
+public class ArchiveFileIterator implements Iterator<DocBuilder.DocumentInputs> {
+
+    private ArchiveFileReader archiveFileReader;
+    private Format documentFormat;
+
+    public ArchiveFileIterator(ArchiveFileReader archiveFileReader, Format documentFormat) {
+        this.archiveFileReader = archiveFileReader;
+        this.documentFormat = documentFormat;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return archiveFileReader.next();
+    }
+
+    @Override
+    // Suppressing sonar warning about throwing a NoSuchElementException. We know this is only used by
+    // DocumentRowConverter, which properly calls hasNext() before calling next().
+    @SuppressWarnings("java:S2272")
+    public DocBuilder.DocumentInputs next() {
+        InternalRow row = archiveFileReader.get();
+        InputStreamHandle contentHandle = archiveFileReader.getContentHandleForCurrentZipEntry();
+        DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
+        String uri = row.getString(0);
+        if (this.documentFormat != null) {
+            contentHandle.withFormat(this.documentFormat);
+        }
+        return new DocBuilder.DocumentInputs(uri, contentHandle, null, metadata);
+    }
+}

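Tying the pieces together, buildIteratorForArchiveFile() in DocumentRowConverter constructs exactly this iterator over an ArchiveFileReader in STREAM_DURING_WRITER_PHASE mode. A hedged sketch of how the iterator is wired up and drained — in the connector the draining is done by the write pipeline, so the loop and the method name here are illustrative only:

import com.marklogic.client.io.Format;
import com.marklogic.spark.reader.file.ArchiveFileReader;
import com.marklogic.spark.reader.file.FileContext;
import com.marklogic.spark.reader.file.FilePartition;
import com.marklogic.spark.writer.DocBuilder;
import com.marklogic.spark.writer.file.ArchiveFileIterator;

import java.util.Arrays;
import java.util.Iterator;

public class ArchiveIteratorSketch {

    // One archive file path becomes a single-path FilePartition; the reader is put in
    // writer-phase streaming mode so content entries stay on the zip stream.
    static void streamEachEntry(String archiveFilePath, FileContext fileContext, Format format) {
        FilePartition partition = new FilePartition(Arrays.asList(archiveFilePath));
        ArchiveFileReader reader = new ArchiveFileReader(
            partition, fileContext, ArchiveFileReader.StreamingMode.STREAM_DURING_WRITER_PHASE
        );
        Iterator<DocBuilder.DocumentInputs> documents = new ArchiveFileIterator(reader, format);
        while (documents.hasNext()) {
            DocBuilder.DocumentInputs doc = documents.next();
            // Each DocumentInputs holds an InputStreamHandle over the current zip entry,
            // so the entry's content is never buffered in memory before being written.
        }
    }
}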