
Commit ea415a2

Merge pull request #307 from marklogic/feature/closeable-fix

MLE-17142 Ensuring input streams are closed

2 parents: 7338f54 + 21d880a

File tree: 10 files changed, +197 −64 lines changed


CONTRIBUTING.md

Lines changed: 4 additions & 0 deletions

```diff
@@ -83,6 +83,10 @@ you've introduced on the feature branch you're working on. You can then click on
 Note that if you only need results on code smells and vulnerabilities, you can repeatedly run `./gradlew sonar`
 without having to re-run the tests.
 
+You can also force Gradle to run `sonar` if any tests fail:
+
+    ./gradlew clean test sonar --continue
+
 ## Accessing MarkLogic logs in Grafana
 
 This project's `docker-compose-3nodes.yaml` file includes
```
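Gradle's `--continue` flag keeps executing as many tasks as possible after a failure instead of stopping at the first failed task, which is why `sonar` still runs even when some tests fail.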

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 6 additions & 10 deletions

```diff
@@ -15,6 +15,8 @@
 import com.marklogic.spark.reader.document.DocumentRowSchema;
 import com.marklogic.spark.reader.file.*;
 import com.marklogic.spark.writer.file.ArchiveFileIterator;
+import com.marklogic.spark.writer.file.FileIterator;
+import com.marklogic.spark.writer.file.GzipFileIterator;
 import com.marklogic.spark.writer.file.ZipFileIterator;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -114,12 +116,12 @@ private Iterator<DocBuilder.DocumentInputs> streamContentFromFile(String filePat
 
     private Iterator<DocBuilder.DocumentInputs> buildIteratorForGenericFile(InternalRow row, String filePath, FileContext fileContext) {
         final String decodedPath = fileContext.decodeFilePath(filePath);
-        InputStreamHandle streamHandle = new InputStreamHandle(fileContext.openFile(decodedPath));
+        InputStreamHandle contentHandle = new InputStreamHandle(fileContext.openFile(decodedPath));
         if (this.documentFormat != null) {
-            streamHandle.withFormat(this.documentFormat);
+            contentHandle.withFormat(this.documentFormat);
         }
         DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-        return Stream.of(new DocBuilder.DocumentInputs(filePath, streamHandle, null, metadata)).iterator();
+        return new FileIterator(contentHandle, new DocBuilder.DocumentInputs(filePath, contentHandle, null, metadata));
     }
 
     private Iterator<DocBuilder.DocumentInputs> buildIteratorForArchiveFile(String filePath, FileContext fileContext) {
@@ -138,12 +140,6 @@ private Iterator<DocBuilder.DocumentInputs> buildIteratorForZipFile(String fileP
 
     private Iterator<DocBuilder.DocumentInputs> buildIteratorForGzipFile(String filePath, FileContext fileContext) {
         GzipFileReader reader = new GzipFileReader(new FilePartition(filePath), fileContext, StreamingMode.STREAM_DURING_WRITER_PHASE);
-        reader.next();
-        String uri = reader.get().getString(0);
-        InputStreamHandle contentHandle = reader.getStreamingContentHandle();
-        if (this.documentFormat != null) {
-            contentHandle.withFormat(this.documentFormat);
-        }
-        return Stream.of(new DocBuilder.DocumentInputs(uri, contentHandle, null, null)).iterator();
+        return new GzipFileIterator(reader, this.documentFormat);
     }
 }
```
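Both methods now return iterators that also implement `Closeable` (the new `FileIterator` and `GzipFileIterator` classes later in this commit), giving the writer a hook to close the underlying `InputStream` once the document is written; the previous `Stream.of(...).iterator()` approach provided no such hook.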

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 16 additions & 6 deletions

```diff
@@ -20,6 +20,7 @@
 import com.marklogic.spark.reader.file.TripleRowSchema;
 import com.marklogic.spark.writer.file.ZipFileWriter;
 import com.marklogic.spark.writer.rdf.RdfRowConverter;
+import org.apache.commons.io.IOUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.write.DataWriter;
@@ -29,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -94,12 +96,20 @@ public void write(InternalRow row) {
         throwWriteFailureIfExists();
 
         Iterator<DocBuilder.DocumentInputs> iterator = rowConverter.convertRow(row);
-        while (iterator.hasNext()) {
-            DocumentWriteOperation writeOp = this.docBuilder.build(iterator.next());
-            if (this.isStreamingFiles) {
-                writeDocumentViaPutOperation(writeOp);
-            } else {
-                this.writeBatcher.add(writeOp);
+        try {
+            while (iterator.hasNext()) {
+                DocumentWriteOperation writeOp = this.docBuilder.build(iterator.next());
+                if (this.isStreamingFiles) {
+                    writeDocumentViaPutOperation(writeOp);
+                } else {
+                    this.writeBatcher.add(writeOp);
+                }
+            }
+        } finally {
+            // This is needed for when files are being streamed into MarkLogic; gives a chance for the file reader to
+            // close the associated InputStream.
+            if (iterator instanceof Closeable) {
+                IOUtils.closeQuietly((Closeable) iterator);
             }
         }
     }
```
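The `try`/`finally` shape above is the heart of the fix. Below is a minimal sketch of the same pattern in isolation; the class and method names are hypothetical and not part of the connector:

```java
import org.apache.commons.io.IOUtils;

import java.io.Closeable;
import java.util.Iterator;
import java.util.function.Consumer;

public final class ClosingIteratorSketch {

    // Drains the iterator, then closes it if it holds an open resource.
    // Because the close happens in a finally block, a failure while
    // processing an element cannot leak the iterator's underlying stream.
    public static <T> void drainAndClose(Iterator<T> iterator, Consumer<T> processor) {
        try {
            while (iterator.hasNext()) {
                processor.accept(iterator.next());
            }
        } finally {
            if (iterator instanceof Closeable) {
                // closeQuietly swallows any IOException thrown on close, so a
                // close-time failure never masks an exception from the try block.
                IOUtils.closeQuietly((Closeable) iterator);
            }
        }
    }

    private ClosingIteratorSketch() {
    }
}
```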

src/main/java/com/marklogic/spark/writer/file/ArchiveFileIterator.java

Lines changed: 8 additions & 1 deletion

```diff
@@ -10,16 +10,18 @@
 import com.marklogic.spark.reader.document.DocumentRowSchema;
 import com.marklogic.spark.reader.file.ArchiveFileReader;
 import com.marklogic.spark.writer.DocBuilder;
+import org.apache.commons.io.IOUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 
+import java.io.Closeable;
 import java.util.Iterator;
 
 /**
  * Provides an {@code Iterator} interface on top of an {@code ArchiveFileReader}, thereby allowing a
  * {@code DocumentRowConverter} to build sets of document inputs from an archive file without reading any content entry
  * into memory - thus supporting streaming of an archive.
  */
-public class ArchiveFileIterator implements Iterator<DocBuilder.DocumentInputs> {
+public class ArchiveFileIterator implements Iterator<DocBuilder.DocumentInputs>, Closeable {
 
     private final ArchiveFileReader archiveFileReader;
     private final Format documentFormat;
@@ -51,4 +53,9 @@ public DocBuilder.DocumentInputs next() {
         }
         return new DocBuilder.DocumentInputs(uri, contentHandle, null, metadata);
     }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(archiveFileReader);
+    }
 }
```
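Only the `Closeable` declaration and the `close()` method are new here; the iteration logic is unchanged, and closing delegates to the `ArchiveFileReader`, which owns the archive's open stream.
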
src/main/java/com/marklogic/spark/writer/file/FileIterator.java

Lines changed: 42 additions & 0 deletions

```diff
@@ -0,0 +1,42 @@
+/*
+ * Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark.writer.file;
+
+import com.marklogic.client.io.InputStreamHandle;
+import com.marklogic.spark.writer.DocBuilder;
+import org.apache.commons.io.IOUtils;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+/**
+ * Exists solely to provide an implementation of {@code Closeable} so that the {@code InputStreamHandle} can be closed
+ * after the corresponding document is written to MarkLogic.
+ */
+public class FileIterator implements Iterator<DocBuilder.DocumentInputs>, Closeable {
+
+    private final InputStreamHandle contentHandle;
+    private final Iterator<DocBuilder.DocumentInputs> iterator;
+
+    public FileIterator(InputStreamHandle contentHandle, DocBuilder.DocumentInputs inputs) {
+        this.contentHandle = contentHandle;
+        this.iterator = Stream.of(inputs).iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return this.iterator.hasNext();
+    }
+
+    @Override
+    public DocBuilder.DocumentInputs next() {
+        return this.iterator.next();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(contentHandle);
+    }
+}
```
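For illustration only, here is a hedged sketch of driving a `FileIterator` by hand, outside Spark. The `example.json` path and the sketch class are hypothetical; the four-argument `DocumentInputs` constructor call mirrors its usage in `DocumentRowConverter` above:

```java
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.spark.writer.DocBuilder;
import com.marklogic.spark.writer.file.FileIterator;

import java.io.FileInputStream;
import java.io.IOException;

public class FileIteratorSketch {

    public static void main(String[] args) throws IOException {
        // Hypothetical local file; any InputStream-backed handle works here.
        InputStreamHandle handle = new InputStreamHandle(new FileInputStream("example.json"));
        FileIterator iterator = new FileIterator(handle,
            new DocBuilder.DocumentInputs("/example.json", handle, null, null));
        try {
            while (iterator.hasNext()) {
                DocBuilder.DocumentInputs inputs = iterator.next();
                // A real writer would turn inputs into a DocumentWriteOperation here.
            }
        } finally {
            iterator.close(); // releases the InputStreamHandle and its underlying stream
        }
    }
}
```
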
src/main/java/com/marklogic/spark/writer/file/GzipFileIterator.java

Lines changed: 50 additions & 0 deletions

```diff
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark.writer.file;
+
+import com.marklogic.client.io.Format;
+import com.marklogic.client.io.InputStreamHandle;
+import com.marklogic.spark.reader.file.GzipFileReader;
+import com.marklogic.spark.writer.DocBuilder;
+import org.apache.commons.io.IOUtils;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+/**
+ * Exists solely to provide an implementation of {@code Closeable} so that the {@code GzipFileReader} can be closed
+ * after the corresponding document is written to MarkLogic.
+ */
+public class GzipFileIterator implements Iterator<DocBuilder.DocumentInputs>, Closeable {
+
+    private final GzipFileReader gzipFileReader;
+    private Iterator<DocBuilder.DocumentInputs> iterator;
+
+    public GzipFileIterator(GzipFileReader reader, Format documentFormat) {
+        this.gzipFileReader = reader;
+        reader.next();
+        String uri = reader.get().getString(0);
+        InputStreamHandle contentHandle = reader.getStreamingContentHandle();
+        if (documentFormat != null) {
+            contentHandle.withFormat(documentFormat);
+        }
+        this.iterator = Stream.of(new DocBuilder.DocumentInputs(uri, contentHandle, null, null)).iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return this.iterator.hasNext();
+    }
+
+    @Override
+    public DocBuilder.DocumentInputs next() {
+        return this.iterator.next();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(gzipFileReader);
+    }
+}
```
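The constructor takes over the logic removed from `DocumentRowConverter`: it eagerly advances the reader, extracts the URI, and configures the content handle. Holding on to the `GzipFileReader` is what allows `close()` to release it after the document is written.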

src/main/java/com/marklogic/spark/writer/file/ZipFileIterator.java

Lines changed: 8 additions & 1 deletion

```diff
@@ -8,11 +8,13 @@
 import com.marklogic.spark.Util;
 import com.marklogic.spark.reader.file.ZipFileReader;
 import com.marklogic.spark.writer.DocBuilder;
+import org.apache.commons.crypto.utils.IoUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 
+import java.io.Closeable;
 import java.util.Iterator;
 
-public class ZipFileIterator implements Iterator<DocBuilder.DocumentInputs> {
+public class ZipFileIterator implements Iterator<DocBuilder.DocumentInputs>, Closeable {
 
     private final ZipFileReader zipFileReader;
     private final Format documentFormat;
@@ -43,4 +45,9 @@ public DocBuilder.DocumentInputs next() {
         }
         return new DocBuilder.DocumentInputs(uri, contentHandle, null, null);
     }
+
+    @Override
+    public void close() {
+        IoUtils.closeQuietly(zipFileReader);
+    }
 }
```
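Unlike its siblings, this class closes through `org.apache.commons.crypto.utils.IoUtils` rather than `org.apache.commons.io.IOUtils`, possibly an accidental import; both provide a `closeQuietly(Closeable)` that ignores close-time errors, so the effect is the same.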

src/test/java/com/marklogic/spark/reader/file/ReadGzipFilesTest.java

Lines changed: 0 additions & 45 deletions

```diff
@@ -3,19 +3,14 @@
  */
 package com.marklogic.spark.reader.file;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.marklogic.junit5.XmlNode;
 import com.marklogic.spark.AbstractIntegrationTest;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;
 
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -78,46 +73,6 @@ void dontAbortOnFailure() {
             "error for the non-gzipped mixed-files.zip file being logged as a warning but not causing a failure.");
     }
 
-    @Test
-    void streamThreeGZIPFiles() throws Exception {
-        Dataset<Row> dataset = newSparkSession().read()
-            .format(CONNECTOR_IDENTIFIER)
-            .option(Options.READ_FILES_COMPRESSION, "gzip")
-            .option("recursiveFileLookup", "true")
-            .option(Options.STREAM_FILES, true)
-            .load("src/test/resources/gzip-files");
-
-        List<Row> rows = dataset.collectAsList();
-        assertEquals(3, rows.size());
-        for (Row row : rows) {
-            assertFalse(row.isNullAt(0), "The URI column should be populated.");
-            byte[] content = (byte[]) row.get(1);
-            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content))) {
-                FileContext fileContext = (FileContext) ois.readObject();
-                assertNotNull(fileContext);
-            }
-        }
-
-        // Write the streaming files to MarkLogic.
-        dataset.write().format(CONNECTOR_IDENTIFIER)
-            .option(Options.STREAM_FILES, true)
-            .option(Options.CLIENT_URI, makeClientUri())
-            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
-            .option(Options.WRITE_COLLECTIONS, "streamed-files")
-            .option(Options.WRITE_URI_REPLACE, ".*gzip-files,'/gzip-files'")
-            .mode(SaveMode.Append)
-            .save();
-
-        assertCollectionSize("streamed-files", 3);
-        XmlNode doc = readXmlDocument("/gzip-files/hello.xml");
-        doc.assertElementValue("/hello", "world");
-
-        // Because each streamed file has to be sent via a PUT request, and the PUT endpoint does not allow spaces -
-        // see MLE-17088 - the URI will be encoded.
-        JsonNode node = readJsonDocument("/gzip-files/level1/level2/hello%20world.json");
-        assertEquals("world", node.get("hello").asText());
-    }
-
     private void verifyRow(Row row, String expectedUriSuffix, String expectedContent) {
         String uri = row.getString(0);
         assertTrue(uri.endsWith(expectedUriSuffix), "Unexpected URI: " + uri);
```
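The `streamThreeGZIPFiles` test removed above is not lost; it moves essentially verbatim into the new `StreamGzipFilesTest` class shown below.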

src/test/java/com/marklogic/spark/reader/file/ReadGenericFilesStreamingTest.java renamed to src/test/java/com/marklogic/spark/reader/file/StreamGenericFilesTest.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -22,7 +22,7 @@
  * into memory by postponing reading of the file until the writer phase, where it can then be streamed from disk into
  * MarkLogic.
  */
-class ReadGenericFilesStreamingTest extends AbstractIntegrationTest {
+class StreamGenericFilesTest extends AbstractIntegrationTest {
 
     @Test
     void stream() throws Exception {
```
src/test/java/com/marklogic/spark/reader/file/StreamGzipFilesTest.java

Lines changed: 62 additions & 0 deletions

```diff
@@ -0,0 +1,62 @@
+/*
+ * Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
+ */
+package com.marklogic.spark.reader.file;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.marklogic.junit5.XmlNode;
+import com.marklogic.spark.AbstractIntegrationTest;
+import com.marklogic.spark.Options;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class StreamGzipFilesTest extends AbstractIntegrationTest {
+
+    @Test
+    void streamThreeGZIPFiles() throws Exception {
+        Dataset<Row> dataset = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.READ_FILES_COMPRESSION, "gzip")
+            .option("recursiveFileLookup", "true")
+            .option(Options.STREAM_FILES, true)
+            .load("src/test/resources/gzip-files");
+
+        List<Row> rows = dataset.collectAsList();
+        assertEquals(3, rows.size());
+        for (Row row : rows) {
+            assertFalse(row.isNullAt(0), "The URI column should be populated.");
+            byte[] content = (byte[]) row.get(1);
+            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content))) {
+                FileContext fileContext = (FileContext) ois.readObject();
+                assertNotNull(fileContext);
+            }
+        }
+
+        // Write the streaming files to MarkLogic.
+        dataset.write().format(CONNECTOR_IDENTIFIER)
+            .option(Options.STREAM_FILES, true)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
+            .option(Options.WRITE_COLLECTIONS, "streamed-files")
+            .option(Options.WRITE_URI_REPLACE, ".*gzip-files,'/gzip-files'")
+            .mode(SaveMode.Append)
+            .save();
+
+        assertCollectionSize("streamed-files", 3);
+        XmlNode doc = readXmlDocument("/gzip-files/hello.xml");
+        doc.assertElementValue("/hello", "world");
+
+        // Because each streamed file has to be sent via a PUT request, and the PUT endpoint does not allow spaces -
+        // see MLE-17088 - the URI will be encoded.
+        JsonNode node = readJsonDocument("/gzip-files/level1/level2/hello%20world.json");
+        assertEquals("world", node.get("hello").asText());
+    }
+}
```
