Commit 2c1270b

MLE-17095 Refactoring; changed row converter to return iterator
This allows the archive streaming feature to return more than one document per Spark row and has no effect on existing behavior. The `Content` class is no longer needed, so it has been removed. Also did some refactoring in `ArchiveFileReader` to shorten methods and make the class more readable.
1 parent 88cfce3 commit 2c1270b
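For context on the central change: `RowConverter.convertRow` now returns an `Iterator` instead of an `Optional`, so a single Spark row can produce zero, one, or many documents, which is what the archive streaming feature needs. A minimal sketch of the pattern, using a simplified stand-in for `DocBuilder.DocumentInputs` rather than the connector's actual types:

    import java.util.Iterator;
    import java.util.stream.Stream;

    // Illustration of the Optional -> Iterator change: a converter that previously
    // returned at most one document per row can now emit several.
    public class IteratorConverterSketch {

        // Stand-in for DocBuilder.DocumentInputs: just a URI and some content.
        record DocumentInputs(String uri, String content) {}

        // One "row" can expand into many documents.
        static Iterator<DocumentInputs> convertRow(String uriPrefix, String... contents) {
            return Stream.of(contents)
                .map(content -> new DocumentInputs(uriPrefix + "/" + content, content))
                .iterator();
        }

        public static void main(String[] args) {
            Iterator<DocumentInputs> docs = convertRow("/archive/row1", "doc-a", "doc-b");
            docs.forEachRemaining(inputs -> System.out.println(inputs.uri()));
        }
    }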

File tree: 7 files changed, +62 -73 lines changed

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 30 additions & 21 deletions
@@ -56,27 +56,8 @@ public boolean next() {
             if (isLegacyFormat == null) {
                 isLegacyFormat = !nextZipEntry.getName().endsWith(".metadata");
             }
-            if (!isLegacyFormat) {
-                return readMetadataFollowedByContentEntry();
-            }
-
-            byte[] content = fileContext.readBytes(currentZipInputStream);
-            if (content == null || content.length == 0) {
-                return openNextFileAndReadNextEntry();
-            }
-            final String zipEntryName = nextZipEntry.getName();
-
-            byte[] metadataBytes = readMetadataEntry(zipEntryName);
-            if (metadataBytes == null || metadataBytes.length == 0) {
-                return openNextFileAndReadNextEntry();
-            }
 
-            DocumentMetadataHandle metadata = new DocumentMetadataHandle();
-            metadata.fromBuffer(metadataBytes);
-            this.nextRowToReturn = new DocumentRowBuilder(this.metadataCategories)
-                .withUri(zipEntryName).withContent(content).withMetadata(metadata)
-                .buildRow();
-            return true;
+            return isLegacyFormat ? readContentFollowedByMetadata(nextZipEntry) : readMetadataFollowedByContent();
         } catch (IOException e) {
             String message = String.format("Unable to read archive file at %s; cause: %s", this.currentFilePath, e.getMessage());
             if (fileContext.isReadAbortOnFailure()) {
@@ -97,7 +78,35 @@ public void close() {
         IOUtils.closeQuietly(this.currentZipInputStream);
     }
 
-    private boolean readMetadataFollowedByContentEntry() throws IOException {
+    /**
+     * This is the Flux 1.0 "legacy" approach, where content was written first, followed by metadata. This does not
+     * support streaming.
+     */
+    private boolean readContentFollowedByMetadata(ZipEntry contentZipEntry) throws IOException {
+        byte[] content = fileContext.readBytes(currentZipInputStream);
+        if (content == null || content.length == 0) {
+            return openNextFileAndReadNextEntry();
+        }
+        final String zipEntryName = contentZipEntry.getName();
+
+        byte[] metadataBytes = readMetadataEntry(zipEntryName);
+        if (metadataBytes == null || metadataBytes.length == 0) {
+            return openNextFileAndReadNextEntry();
+        }
+
+        DocumentMetadataHandle metadata = new DocumentMetadataHandle();
+        metadata.fromBuffer(metadataBytes);
+
+        this.nextRowToReturn = new DocumentRowBuilder(this.metadataCategories)
+            .withUri(zipEntryName).withContent(content).withMetadata(metadata)
+            .buildRow();
+        return true;
+    }
+
+    /**
+     * This is the Flux 1.1+ approach, where the metadata entry is written first. This supports streaming.
+     */
+    private boolean readMetadataFollowedByContent() throws IOException {
         byte[] metadataBytes = fileContext.readBytes(currentZipInputStream);
         if (metadataBytes == null || metadataBytes.length == 0) {
             return openNextFileAndReadNextEntry();
src/main/java/com/marklogic/spark/writer/ArbitraryRowConverter.java

Lines changed: 4 additions & 3 deletions
@@ -20,9 +20,10 @@
 import org.apache.spark.sql.types.StructType;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
  * Handles building a document from an "arbitrary" row - i.e. one with an unknown schema, where the row will be
@@ -53,7 +54,7 @@ class ArbitraryRowConverter implements RowConverter {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         String initialUri = null;
         if (this.filePathIndex > -1) {
             initialUri = row.getString(this.filePathIndex) + "/" + UUID.randomUUID();
@@ -103,7 +104,7 @@ else if (deserializedJson != null) {
             }
         }
 
-        return Optional.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null));
+        return Stream.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null)).iterator();
     }
 
     @Override

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 11 additions & 34 deletions
@@ -10,7 +10,6 @@
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.Format;
 import com.marklogic.client.io.InputStreamHandle;
-import com.marklogic.client.io.marker.AbstractWriteHandle;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.reader.document.DocumentRowSchema;
@@ -21,8 +20,9 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
+import java.util.stream.Stream;
 
 /**
  * Knows how to build a document from a row corresponding to our {@code DocumentRowSchema}.
@@ -42,31 +42,24 @@ class DocumentRowConverter implements RowConverter {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         final String uri = row.getString(0);
 
         final boolean isNakedProperties = row.isNullAt(1);
         if (isNakedProperties) {
             DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-            return Optional.of(new DocBuilder.DocumentInputs(uri, null, null, metadata));
+            return Stream.of(new DocBuilder.DocumentInputs(uri, null, null, metadata)).iterator();
         }
 
-        Content content = this.isStreamingFromFiles ?
-            readContentFromFile(uri, row) :
-            readContentFromRow(uri, row);
-
-        DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-        return Optional.of(new DocBuilder.DocumentInputs(
-            uri, content.contentHandle, content.uriTemplateValues, metadata)
-        );
+        return this.isStreamingFromFiles ? readContentFromFile(uri, row) : readContentFromRow(uri, row);
     }
 
     @Override
     public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
         return new ArrayList<>();
     }
 
-    private Content readContentFromRow(String uri, InternalRow row) {
+    private Iterator<DocBuilder.DocumentInputs> readContentFromRow(String uri, InternalRow row) {
         BytesHandle bytesHandle = new BytesHandle(row.getBinary(1));
         if (this.documentFormat != null) {
             bytesHandle.withFormat(this.documentFormat);
@@ -76,7 +69,8 @@ private Content readContentFromRow(String uri, InternalRow row) {
             String format = row.isNullAt(2) ? null : row.getString(2);
             uriTemplateValues = deserializeContentToJson(uri, bytesHandle, format);
         }
-        return new Content(bytesHandle, uriTemplateValues);
+        DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
+        return Stream.of(new DocBuilder.DocumentInputs(uri, bytesHandle, uriTemplateValues, metadata)).iterator();
     }
 
     private JsonNode deserializeContentToJson(String initialUri, BytesHandle contentHandle, String format) {
@@ -97,7 +91,7 @@ private JsonNode deserializeContentToJson(String initialUri, BytesHandle contentHandle, String format) {
      * In a scenario where the user wants to stream a file into MarkLogic, the content column will contain a serialized
      * instance of {@code FileContext}, which is used to stream the file into a {@code InputStreamHandle}.
      */
-    private Content readContentFromFile(String filePath, InternalRow row) {
+    private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath, InternalRow row) {
         byte[] bytes = row.getBinary(1);
         String filePathInErrorMessage = filePath;
         try {
@@ -109,27 +103,10 @@ private Content readContentFromFile(String filePath, InternalRow row) {
             if (this.documentFormat != null) {
                 streamHandle.withFormat(this.documentFormat);
             }
-            return new Content(streamHandle, null);
+            DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
+            return Stream.of(new DocBuilder.DocumentInputs(filePath, streamHandle, null, metadata)).iterator();
         } catch (Exception e) {
             throw new ConnectorException(String.format("Unable to read from file %s; cause: %s", filePathInErrorMessage, e.getMessage()));
         }
     }
-
-    private static class Content {
-        private final AbstractWriteHandle contentHandle;
-        private final JsonNode uriTemplateValues;
-
-        public Content(AbstractWriteHandle contentHandle, JsonNode uriTemplateValues) {
-            this.contentHandle = contentHandle;
-            this.uriTemplateValues = uriTemplateValues;
-        }
-
-        AbstractWriteHandle getContentHandle() {
-            return contentHandle;
-        }
-
-        JsonNode getUriTemplateValues() {
-            return uriTemplateValues;
-        }
-    }
 }
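The streaming path above relies on plain Java serialization: the row's binary content column carries a serialized object (a `FileContext` in the connector) that is re-hydrated with `ObjectInputStream` before the file is streamed into an `InputStreamHandle`. A minimal sketch of that round trip, using a hypothetical `Payload` record as a stand-in for `FileContext`:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    // Sketch of carrying a Serializable object through a binary column and
    // re-hydrating it on the writer side, as the streaming path does with FileContext.
    public class SerializedColumnSketch {

        // Stand-in for FileContext: just remembers which file to stream later.
        record Payload(String path) implements Serializable {}

        static byte[] serialize(Payload payload) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payload);
            }
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws Exception {
            byte[] column = serialize(new Payload("/data/example.json"));
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(column))) {
                Payload payload = (Payload) in.readObject();
                System.out.println("Would open a stream for " + payload.path());
            }
        }
    }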

src/main/java/com/marklogic/spark/writer/FileRowConverter.java

Lines changed: 4 additions & 2 deletions
@@ -14,8 +14,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 /**
  * Knows how to build a document from a row corresponding to our {@code FileRowSchema}.
@@ -33,12 +35,12 @@ class FileRowConverter implements RowConverter {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         final String path = row.getString(writeContext.getFileSchemaPathPosition());
         BytesHandle contentHandle = new BytesHandle(row.getBinary(writeContext.getFileSchemaContentPosition()));
         forceFormatIfNecessary(contentHandle);
         Optional<JsonNode> uriTemplateValues = deserializeContentToJson(path, contentHandle, row);
-        return Optional.of(new DocBuilder.DocumentInputs(path, contentHandle, uriTemplateValues.orElse(null), null));
+        return Stream.of(new DocBuilder.DocumentInputs(path, contentHandle, uriTemplateValues.orElse(null), null)).iterator();
     }
 
     @Override

src/main/java/com/marklogic/spark/writer/RowConverter.java

Lines changed: 4 additions & 6 deletions
@@ -5,22 +5,20 @@
 
 import org.apache.spark.sql.catalyst.InternalRow;
 
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 
 /**
  * Strategy interface for how a Spark row is converted into a set of inputs for writing a document to MarkLogic.
  */
 public interface RowConverter {
 
     /**
-     * An implementation can return an empty Optional, which will happen when the row will be used with other rows to
-     * form a document.
-     *
      * @param row
-     * @return
+     * @return an iterator of inputs for creating documents to write to MarkLogic. An iterator is used to allow the
+     * implementor to return multiple documents if necessary.
      */
-    Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row);
+    Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row);
 
     /**
      * Called when {@code WriteBatcherDataWriter} has no more rows to send, but the implementation may have one or
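To illustrate how the two interface methods are meant to cooperate, here is a simplified sketch of a converter that aggregates rows, in the spirit of `RdfRowConverter`: most rows yield an empty iterator, a full buffer yields one document, and whatever is still buffered at the end is returned from `getRemainingDocumentInputs()`. The `String`-based types are stand-ins for the connector's `DocumentInputs`:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.stream.Stream;

    // Simplified sketch of a RowConverter-style implementation that aggregates rows:
    // most rows produce an empty iterator, and anything still buffered at the end is
    // returned via getRemainingDocumentInputs().
    public class BufferingConverterSketch {

        private static final int MAX_PER_DOCUMENT = 3;
        private final List<String> buffer = new ArrayList<>();

        public Iterator<String> convertRow(String row) {
            buffer.add(row);
            if (buffer.size() >= MAX_PER_DOCUMENT) {
                String document = String.join(",", buffer);
                buffer.clear();
                return Stream.of(document).iterator();
            }
            return Collections.emptyIterator();
        }

        public List<String> getRemainingDocumentInputs() {
            return buffer.isEmpty() ? List.of() : List.of(String.join(",", buffer));
        }

        public static void main(String[] args) {
            BufferingConverterSketch converter = new BufferingConverterSketch();
            for (String row : new String[]{"a", "b", "c", "d"}) {
                converter.convertRow(row).forEachRemaining(System.out::println); // prints "a,b,c"
            }
            System.out.println(converter.getRemainingDocumentInputs());          // prints [d]
        }
    }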

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 5 additions & 4 deletions
@@ -30,8 +30,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -92,9 +92,10 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {
     @Override
     public void write(InternalRow row) {
         throwWriteFailureIfExists();
-        Optional<DocBuilder.DocumentInputs> document = rowConverter.convertRow(row);
-        if (document.isPresent()) {
-            DocumentWriteOperation writeOp = this.docBuilder.build(document.get());
+
+        Iterator<DocBuilder.DocumentInputs> iterator = rowConverter.convertRow(row);
+        while (iterator.hasNext()) {
+            DocumentWriteOperation writeOp = this.docBuilder.build(iterator.next());
             if (this.isStreamingFiles) {
                 writeDocumentViaPutOperation(writeOp);
             } else {

src/main/java/com/marklogic/spark/writer/rdf/RdfRowConverter.java

Lines changed: 4 additions & 3 deletions
@@ -14,6 +14,7 @@
 
 import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Converts each row into a sem:triple element, which is then added to a sem:triples XML document associated with a
@@ -60,7 +61,7 @@ public RdfRowConverter(WriteContext writeContext) {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         final String graph = determineGraph(row);
         graphs.add(graph);
 
@@ -75,9 +76,9 @@ public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         triplesDocument.addTriple(row);
         if (triplesDocument.hasMaxTriples()) {
             triplesDocuments.remove(graph);
-            return Optional.of(triplesDocument.buildDocument());
+            return Stream.of(triplesDocument.buildDocument()).iterator();
         }
-        return Optional.empty();
+        return Stream.<DocBuilder.DocumentInputs>empty().iterator();
     }
 
     /**
