
Commit 59dc9e8

Merge pull request #303 from marklogic/feature/17141-write-stream-zip-file
MLE-17141 Verifying normal zip files can be streamed
2 parents: efcbee1 + 6a24785

File tree: 3 files changed (+37 −2 lines changed)


src/main/java/com/marklogic/spark/MarkLogicFileTable.java

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ class MarkLogicFileTable extends FileTable {
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         if ("true".equalsIgnoreCase(options.get(Options.STREAM_FILES)) && Util.MAIN_LOGGER.isInfoEnabled()) {
-            Util.MAIN_LOGGER.info("Will defer reading of file contents so they can be streamed during the writer phase.");
+            Util.MAIN_LOGGER.info("File streaming is enabled; will read files during writer phase.");
         }
         return new FileScanBuilder(options.asCaseSensitiveMap(), super.fileIndex());
     }
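
For context on this change: with STREAM_FILES set, the scan collects file paths up front and defers reading file contents to the writer phase, which is what the reworded log line now says directly. A minimal usage sketch of a read that would trigger this log; the "marklogic" format name and the path are illustrative assumptions (the project's tests use a CONNECTOR_IDENTIFIER constant):

    // Sketch only: enable file streaming on a read so contents are deferred.
    Dataset<Row> files = sparkSession.read()
        .format("marklogic")                  // assumed short name for the connector
        .option(Options.STREAM_FILES, "true") // emits the INFO message above
        .load("path/to/files");               // illustrative path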

src/main/java/com/marklogic/spark/reader/document/DocumentScanBuilder.java

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ class DocumentScanBuilder implements ScanBuilder, SupportsPushDownLimit {
     DocumentScanBuilder(CaseInsensitiveStringMap options, StructType schema) {
         this.context = new DocumentContext(options, schema);
         if (this.context.isStreamingFiles() && Util.MAIN_LOGGER.isInfoEnabled()) {
-            Util.MAIN_LOGGER.info("Will defer reading documents from MarkLogic so they can be streamed to files during the writer phase.");
+            Util.MAIN_LOGGER.info("File streaming is enabled; will read documents from MarkLogic during writer phase.");
         }
     }
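
This is the mirror of the change above: here, streaming defers reading document contents from MarkLogic until the writer phase. The new test below asserts the observable effect, namely that each row carries a URI but a null content column. As an illustrative sketch only, not the connector's actual reader code, a streaming reader can emit rows of this shape:

    // Assumption-labeled sketch of a deferred-content row: URI populated,
    // content column left null for the writer phase to fetch.
    InternalRow row = new GenericInternalRow(new Object[]{
        UTF8String.fromString(uri), // column 0: document URI
        null                        // column 1: content, deferred
    });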

src/test/java/com/marklogic/spark/writer/file/WriteDocumentZipFilesTest.java

Lines changed: 35 additions & 0 deletions
@@ -91,6 +91,41 @@ void opaqueURI(@TempDir @NotNull Path tempDir) throws IOException {
             "'schema-specific part', which is just example/123.xml.");
     }

+    /**
+     * Verifies that streaming documents to a zip file "just works" on account of supporting streaming of archive
+     * files first. The same ZipFileWriter is used for both. The only difference with archive files is that it will
+     * also check for metadata in each Spark row and include a metadata entry in the archive file.
+     *
+     * @param tempDir
+     * @throws Exception
+     */
+    @Test
+    void streamZipFile(@TempDir Path tempDir) throws Exception {
+        Dataset<Row> dataset = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, 1)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
+            .option(Options.STREAM_FILES, true)
+            .load();
+
+        assertEquals(15, dataset.count(), "Should have 1 row per author document.");
+        dataset.collectAsList().forEach(row -> {
+            assertFalse(row.isNullAt(0), "The URI column should be non-null.");
+            assertTrue(row.isNullAt(1), "The content column should be empty. The document will be read during the " +
+                "writer phase instead.");
+        });
+
+        dataset.write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_FILES_COMPRESSION, "zip")
+            .mode(SaveMode.Append)
+            .save(tempDir.toFile().getAbsolutePath());
+
+        verifyZipFilesHaveExpectedFilenames(tempDir);
+        verifyZipFilesContainFifteenAuthors(tempDir);
+    }
+
     private Dataset<Row> readAuthorCollection() {
         return newSparkSession().read()
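
The two verify helpers at the end of the new test are defined elsewhere in the test class and are not part of this diff. To make the intent concrete, here is a hedged sketch of the kind of check such a helper might perform using java.util.zip; the method name and logic are assumptions, not the repo's actual helpers:

    import java.io.File;
    import java.util.zip.ZipFile;

    // Hypothetical verification: count entries across all zip files in the
    // output directory and assert one entry per streamed author document.
    private void assertZipEntryCount(File dir, int expectedCount) throws Exception {
        int total = 0;
        for (File file : dir.listFiles((d, name) -> name.endsWith(".zip"))) {
            try (ZipFile zip = new ZipFile(file)) {
                total += zip.size(); // ZipFile.size() returns the number of entries
            }
        }
        assertEquals(expectedCount, total, "Expecting one zip entry per author document.");
    }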
