Skip to content

Commit efcbee1

Browse files
authored
Merge pull request #302 from marklogic/feature/stream-normal-zip-file
MLE-17141 Can now stream normal zip files into MarkLogic
2 parents 759ca71 + a71b460 commit efcbee1

File tree

10 files changed

+205
-53
lines changed

10 files changed

+205
-53
lines changed

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
import org.apache.spark.sql.catalyst.InternalRow;
1414
import org.apache.spark.sql.connector.read.PartitionReader;
1515

16-
import java.io.ByteArrayOutputStream;
1716
import java.io.IOException;
18-
import java.io.ObjectOutputStream;
1917
import java.util.ArrayList;
2018
import java.util.List;
2119
import java.util.zip.ZipEntry;
@@ -36,11 +34,6 @@ public class ArchiveFileReader implements PartitionReader<InternalRow> {
3634
// Legacy = content first, then metadata.
3735
private Boolean isLegacyFormat;
3836

39-
public enum StreamingMode {
40-
STREAM_DURING_READER_PHASE,
41-
STREAM_DURING_WRITER_PHASE
42-
}
43-
4437
ArchiveFileReader(FilePartition filePartition, FileContext fileContext) {
4538
this(
4639
filePartition, fileContext,
@@ -229,22 +222,13 @@ private byte[] readMetadataEntry(String zipEntryName) throws IOException {
229222
}
230223

231224
/**
232-
* Builds a row containing the file path, the serialized FileContext, and the metadata.
225+
* Builds a row to represent the archive file so that it can be opened during the writer phase.
233226
*/
234227
private InternalRow buildSingleRowForArchiveFile() {
235-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
236-
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
237-
oos.writeObject(this.fileContext);
238-
oos.flush();
239-
} catch (Exception ex) {
240-
String message = String.format("Unable to build row for archive file at %s; cause: %s",
241-
this.currentFilePath, ex.getMessage());
242-
throw new ConnectorException(message, ex);
243-
}
244-
228+
byte[] serializedFileContext = FileUtil.serializeFileContext(fileContext, currentFilePath);
245229
InternalRow row = new DocumentRowBuilder(this.metadataCategories)
246230
.withUri(this.currentFilePath)
247-
.withContent(baos.toByteArray())
231+
.withContent(serializedFileContext)
248232
.buildRow();
249233
this.currentFilePath = null;
250234
return row;

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public FileContext(Map<String, String> properties, SerializableConfiguration had
4040
}
4141
}
4242

43-
boolean isZip() {
43+
public boolean isZip() {
4444
return "zip".equalsIgnoreCase(getStringOption(Options.READ_FILES_COMPRESSION));
4545
}
4646

src/main/java/com/marklogic/spark/reader/file/FileUtil.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
*/
44
package com.marklogic.spark.reader.file;
55

6+
import com.marklogic.spark.ConnectorException;
7+
68
import java.io.ByteArrayOutputStream;
79
import java.io.IOException;
810
import java.io.InputStream;
11+
import java.io.ObjectOutputStream;
912
import java.util.ArrayList;
1013
import java.util.List;
1114
import java.util.zip.ZipEntry;
@@ -64,4 +67,17 @@ static FilePartition[] makeFilePartitions(String[] files, int numPartitions) {
6467
}
6568
return partitions;
6669
}
70+
71+
static byte[] serializeFileContext(FileContext fileContext, String currentFilePath) {
72+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
73+
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
74+
oos.writeObject(fileContext);
75+
oos.flush();
76+
return baos.toByteArray();
77+
} catch (Exception ex) {
78+
String message = String.format("Unable to build row for file at %s; cause: %s",
79+
currentFilePath, ex.getMessage());
80+
throw new ConnectorException(message, ex);
81+
}
82+
}
6783
}

src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import org.apache.spark.unsafe.types.ByteArray;
1212
import org.apache.spark.unsafe.types.UTF8String;
1313

14-
import java.io.ByteArrayOutputStream;
1514
import java.io.IOException;
1615
import java.io.InputStream;
17-
import java.io.ObjectOutputStream;
1816

1917
/**
2018
* "Generic" = read each file as-is with no special processing.
@@ -47,7 +45,9 @@ public boolean next() {
4745

4846
filePathIndex++;
4947
try {
50-
byte[] content = this.isStreaming ? serializeFileContext() : readFileIntoByteArray(path);
48+
byte[] content = this.isStreaming ?
49+
FileUtil.serializeFileContext(fileContext, path) :
50+
readFileIntoByteArray(path);
5151

5252
nextRowToReturn = new GenericInternalRow(new Object[]{
5353
UTF8String.fromString(path),
@@ -80,12 +80,4 @@ private byte[] readFileIntoByteArray(String path) throws IOException {
8080
return fileContext.readBytes(inputStream);
8181
}
8282
}
83-
84-
private byte[] serializeFileContext() throws IOException {
85-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
86-
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
87-
oos.writeObject(fileContext);
88-
}
89-
return baos.toByteArray();
90-
}
9183
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.file;
5+
6+
/**
7+
* Used when streaming from a zip file or archive file.
8+
*/
9+
public enum StreamingMode {
10+
11+
STREAM_DURING_READER_PHASE,
12+
STREAM_DURING_WRITER_PHASE
13+
14+
}

src/main/java/com/marklogic/spark/reader/file/ZipFileReader.java

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
*/
44
package com.marklogic.spark.reader.file;
55

6+
import com.marklogic.client.io.InputStreamHandle;
67
import com.marklogic.spark.ConnectorException;
8+
import com.marklogic.spark.reader.document.DocumentRowBuilder;
79
import org.apache.commons.crypto.utils.IoUtils;
810
import org.apache.spark.sql.catalyst.InternalRow;
911
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -14,30 +16,47 @@
1416
import org.slf4j.LoggerFactory;
1517

1618
import java.io.IOException;
19+
import java.util.ArrayList;
1720
import java.util.zip.ZipEntry;
1821
import java.util.zip.ZipInputStream;
1922

2023

21-
class ZipFileReader implements PartitionReader<InternalRow> {
24+
public class ZipFileReader implements PartitionReader<InternalRow> {
2225

2326
private static final Logger logger = LoggerFactory.getLogger(ZipFileReader.class);
2427

2528
private final FilePartition filePartition;
2629
private final FileContext fileContext;
30+
private final StreamingMode streamingMode;
31+
2732
private int nextFilePathIndex;
2833
private String currentFilePath;
2934
private ZipInputStream currentZipInputStream;
3035
private ZipEntry currentZipEntry;
3136

3237
ZipFileReader(FilePartition filePartition, FileContext fileContext) {
38+
this(filePartition, fileContext, fileContext.isStreamingFiles() ? StreamingMode.STREAM_DURING_READER_PHASE : null);
39+
}
40+
41+
public ZipFileReader(FilePartition filePartition, FileContext fileContext, StreamingMode streamingMode) {
3342
this.filePartition = filePartition;
3443
this.fileContext = fileContext;
44+
this.streamingMode = streamingMode;
3545
openNextFile();
3646
}
3747

3848
@Override
39-
public boolean next() throws IOException {
40-
currentZipEntry = FileUtil.findNextFileEntry(currentZipInputStream);
49+
public boolean next() {
50+
if (StreamingMode.STREAM_DURING_READER_PHASE.equals(this.streamingMode)) {
51+
return nextWhileStreamingDuringReaderPhase();
52+
}
53+
54+
try {
55+
currentZipEntry = FileUtil.findNextFileEntry(currentZipInputStream);
56+
} catch (IOException e) {
57+
throw new ConnectorException(String.format(
58+
"Unable to read from zip file %s; cause: %s", currentFilePath, e.getMessage()), e);
59+
}
4160
if (currentZipEntry != null) {
4261
return true;
4362
}
@@ -51,25 +70,56 @@ public boolean next() throws IOException {
5170

5271
@Override
5372
public InternalRow get() {
73+
if (StreamingMode.STREAM_DURING_READER_PHASE.equals(this.streamingMode)) {
74+
return buildRowForZipFile();
75+
}
76+
5477
String zipEntryName = currentZipEntry.getName();
5578
if (logger.isTraceEnabled()) {
5679
logger.trace("Reading zip entry {} from zip file {}.", zipEntryName, this.currentFilePath);
5780
}
5881
String uri = zipEntryName.startsWith("/") ?
5982
this.currentFilePath + zipEntryName :
6083
this.currentFilePath + "/" + zipEntryName;
61-
byte[] content = readZipEntry();
84+
85+
Object content = StreamingMode.STREAM_DURING_WRITER_PHASE.equals(this.streamingMode) ? null
86+
: ByteArray.concat(readZipEntry());
87+
6288
return new GenericInternalRow(new Object[]{
63-
UTF8String.fromString(uri), ByteArray.concat(content),
64-
null, null, null, null, null, null
89+
UTF8String.fromString(uri), content, null, null, null, null, null, null
6590
});
6691
}
6792

93+
/**
94+
* Exposed for {@code ZipFileIterator} to be able to read from the zip stream when it produces a set of
95+
* document inputs.
96+
*
97+
* @return a {@code InputStreamHandle} to avoid reading a content zip entry into memory.
98+
*/
99+
public InputStreamHandle getContentHandleForCurrentZipEntry() {
100+
return new InputStreamHandle(currentZipInputStream);
101+
}
102+
68103
@Override
69104
public void close() {
70105
IoUtils.closeQuietly(this.currentZipInputStream);
71106
}
72107

108+
/**
109+
* Implementation of {@code next()} while streaming during the reader phase, where we don't want to actually read
110+
* anything from a zip file. We just want to build a row per zip file.
111+
*/
112+
private boolean nextWhileStreamingDuringReaderPhase() {
113+
if (currentFilePath != null) {
114+
return true;
115+
}
116+
if (nextFilePathIndex >= filePartition.getPaths().size()) {
117+
return false;
118+
}
119+
openNextFile();
120+
return true;
121+
}
122+
73123
private void openNextFile() {
74124
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
75125
nextFilePathIndex++;
@@ -84,4 +134,14 @@ private byte[] readZipEntry() {
84134
this.currentFilePath, e.getMessage()), e);
85135
}
86136
}
137+
138+
private InternalRow buildRowForZipFile() {
139+
byte[] serializedFileContext = FileUtil.serializeFileContext(fileContext, currentFilePath);
140+
InternalRow row = new DocumentRowBuilder(new ArrayList<>())
141+
.withUri(this.currentFilePath)
142+
.withContent(serializedFileContext)
143+
.buildRow();
144+
this.currentFilePath = null;
145+
return row;
146+
}
87147
}

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313
import com.marklogic.spark.ConnectorException;
1414
import com.marklogic.spark.Options;
1515
import com.marklogic.spark.reader.document.DocumentRowSchema;
16-
import com.marklogic.spark.reader.file.ArchiveFileReader;
17-
import com.marklogic.spark.reader.file.FileContext;
18-
import com.marklogic.spark.reader.file.FilePartition;
16+
import com.marklogic.spark.reader.file.*;
1917
import com.marklogic.spark.writer.file.ArchiveFileIterator;
18+
import com.marklogic.spark.writer.file.ZipFileIterator;
2019
import org.apache.spark.sql.catalyst.InternalRow;
2120

2221
import java.io.ByteArrayInputStream;
@@ -106,8 +105,11 @@ private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath,
106105

107106
if ("archive".equalsIgnoreCase(fileContext.getStringOption(Options.READ_FILES_TYPE))) {
108107
return buildIteratorForArchiveFile(filePath, fileContext);
108+
} else if (fileContext.isZip()) {
109+
return buildIteratorForZipFile(filePath, fileContext);
109110
}
110111

112+
// If it's not an archive or normal zip file, we just have generic files that the user wants to stream.
111113
final String decodedPath = fileContext.decodeFilePath(filePath);
112114
InputStreamHandle streamHandle = new InputStreamHandle(fileContext.openFile(decodedPath));
113115
if (this.documentFormat != null) {
@@ -120,8 +122,14 @@ private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath,
120122
private Iterator<DocBuilder.DocumentInputs> buildIteratorForArchiveFile(String filePath, FileContext fileContext) {
121123
FilePartition filePartition = new FilePartition(Arrays.asList(filePath));
122124
ArchiveFileReader archiveFileReader = new ArchiveFileReader(
123-
filePartition, fileContext, ArchiveFileReader.StreamingMode.STREAM_DURING_WRITER_PHASE
125+
filePartition, fileContext, StreamingMode.STREAM_DURING_WRITER_PHASE
124126
);
125127
return new ArchiveFileIterator(archiveFileReader, this.documentFormat);
126128
}
129+
130+
private Iterator<DocBuilder.DocumentInputs> buildIteratorForZipFile(String filePath, FileContext fileContext) {
131+
FilePartition filePartition = new FilePartition(Arrays.asList(filePath));
132+
ZipFileReader reader = new ZipFileReader(filePartition, fileContext, StreamingMode.STREAM_DURING_WRITER_PHASE);
133+
return new ZipFileIterator(reader, this.documentFormat);
134+
}
127135
}

src/main/java/com/marklogic/spark/writer/file/ArchiveFileIterator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.marklogic.client.io.DocumentMetadataHandle;
77
import com.marklogic.client.io.Format;
88
import com.marklogic.client.io.InputStreamHandle;
9+
import com.marklogic.spark.Util;
910
import com.marklogic.spark.reader.document.DocumentRowSchema;
1011
import com.marklogic.spark.reader.file.ArchiveFileReader;
1112
import com.marklogic.spark.writer.DocBuilder;
@@ -20,8 +21,8 @@
2021
*/
2122
public class ArchiveFileIterator implements Iterator<DocBuilder.DocumentInputs> {
2223

23-
private ArchiveFileReader archiveFileReader;
24-
private Format documentFormat;
24+
private final ArchiveFileReader archiveFileReader;
25+
private final Format documentFormat;
2526

2627
public ArchiveFileIterator(ArchiveFileReader archiveFileReader, Format documentFormat) {
2728
this.archiveFileReader = archiveFileReader;
@@ -39,9 +40,12 @@ public boolean hasNext() {
3940
@SuppressWarnings("java:S2272")
4041
public DocBuilder.DocumentInputs next() {
4142
InternalRow row = archiveFileReader.get();
43+
String uri = row.getString(0);
44+
if (Util.MAIN_LOGGER.isDebugEnabled()) {
45+
Util.MAIN_LOGGER.debug("Creating input stream for entry {}", uri);
46+
}
4247
InputStreamHandle contentHandle = archiveFileReader.getContentHandleForCurrentZipEntry();
4348
DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
44-
String uri = row.getString(0);
4549
if (this.documentFormat != null) {
4650
contentHandle.withFormat(this.documentFormat);
4751
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.writer.file;
5+
6+
import com.marklogic.client.io.Format;
7+
import com.marklogic.client.io.InputStreamHandle;
8+
import com.marklogic.spark.Util;
9+
import com.marklogic.spark.reader.file.ZipFileReader;
10+
import com.marklogic.spark.writer.DocBuilder;
11+
import org.apache.spark.sql.catalyst.InternalRow;
12+
13+
import java.util.Iterator;
14+
15+
public class ZipFileIterator implements Iterator<DocBuilder.DocumentInputs> {
16+
17+
private final ZipFileReader zipFileReader;
18+
private final Format documentFormat;
19+
20+
public ZipFileIterator(ZipFileReader zipFileReader, Format documentFormat) {
21+
this.zipFileReader = zipFileReader;
22+
this.documentFormat = documentFormat;
23+
}
24+
25+
@Override
26+
public boolean hasNext() {
27+
return zipFileReader.next();
28+
}
29+
30+
@Override
31+
// Suppressing sonar warning about throwing a NoSuchElementException. We know this is only used by
32+
// DocumentRowConverter, which properly calls hasNext() before calling next().
33+
@SuppressWarnings("java:S2272")
34+
public DocBuilder.DocumentInputs next() {
35+
InternalRow row = zipFileReader.get();
36+
String uri = row.getString(0);
37+
if (Util.MAIN_LOGGER.isDebugEnabled()) {
38+
Util.MAIN_LOGGER.debug("Creating input stream for entry {}", uri);
39+
}
40+
InputStreamHandle contentHandle = zipFileReader.getContentHandleForCurrentZipEntry();
41+
if (this.documentFormat != null) {
42+
contentHandle.withFormat(this.documentFormat);
43+
}
44+
return new DocBuilder.DocumentInputs(uri, contentHandle, null, null);
45+
}
46+
}

0 commit comments

Comments
 (0)