
Commit 5aaf883

Merge pull request #297 from marklogic/feature/17095-add-iterator
MLE-17095 Refactoring; changed row converter to return iterator
2 parents 88cfce3 + 2c1270b commit 5aaf883
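
In short, RowConverter.convertRow now returns an Iterator<DocBuilder.DocumentInputs> instead of an Optional<DocBuilder.DocumentInputs>, so a converter can emit zero, one, or many documents per Spark row. A minimal, hypothetical sketch of the migration pattern used throughout the diffs below (plain Strings stand in for DocumentInputs; this is not the connector's code):

import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Stream;

public class IteratorMigrationSketch {
    public static void main(String[] args) {
        // Before: at most one result per row.
        Optional<String> single = Optional.of("doc");
        Optional<String> none = Optional.empty();

        // After: zero or more results per row, expressed as an iterator.
        Iterator<String> one = Stream.of("doc").iterator();          // replaces Optional.of(...)
        Iterator<String> empty = Stream.<String>empty().iterator();  // replaces Optional.empty()

        System.out.println(single.isPresent() + " " + one.hasNext()); // true true
        System.out.println(none.isPresent() + " " + empty.hasNext()); // false false
    }
}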

File tree

7 files changed: +62 −73 lines changed

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 30 additions & 21 deletions
@@ -56,27 +56,8 @@ public boolean next() {
             if (isLegacyFormat == null) {
                 isLegacyFormat = !nextZipEntry.getName().endsWith(".metadata");
             }
-            if (!isLegacyFormat) {
-                return readMetadataFollowedByContentEntry();
-            }
-
-            byte[] content = fileContext.readBytes(currentZipInputStream);
-            if (content == null || content.length == 0) {
-                return openNextFileAndReadNextEntry();
-            }
-            final String zipEntryName = nextZipEntry.getName();
-
-            byte[] metadataBytes = readMetadataEntry(zipEntryName);
-            if (metadataBytes == null || metadataBytes.length == 0) {
-                return openNextFileAndReadNextEntry();
-            }
 
-            DocumentMetadataHandle metadata = new DocumentMetadataHandle();
-            metadata.fromBuffer(metadataBytes);
-            this.nextRowToReturn = new DocumentRowBuilder(this.metadataCategories)
-                .withUri(zipEntryName).withContent(content).withMetadata(metadata)
-                .buildRow();
-            return true;
+            return isLegacyFormat ? readContentFollowedByMetadata(nextZipEntry) : readMetadataFollowedByContent();
         } catch (IOException e) {
             String message = String.format("Unable to read archive file at %s; cause: %s", this.currentFilePath, e.getMessage());
             if (fileContext.isReadAbortOnFailure()) {
@@ -97,7 +78,35 @@ public void close() {
         IOUtils.closeQuietly(this.currentZipInputStream);
     }
 
-    private boolean readMetadataFollowedByContentEntry() throws IOException {
+    /**
+     * This is the Flux 1.0 "legacy" approach, where content was written first, followed by metadata. This does not
+     * support streaming.
+     */
+    private boolean readContentFollowedByMetadata(ZipEntry contentZipEntry) throws IOException {
+        byte[] content = fileContext.readBytes(currentZipInputStream);
+        if (content == null || content.length == 0) {
+            return openNextFileAndReadNextEntry();
+        }
+        final String zipEntryName = contentZipEntry.getName();
+
+        byte[] metadataBytes = readMetadataEntry(zipEntryName);
+        if (metadataBytes == null || metadataBytes.length == 0) {
+            return openNextFileAndReadNextEntry();
+        }
+
+        DocumentMetadataHandle metadata = new DocumentMetadataHandle();
+        metadata.fromBuffer(metadataBytes);
+
+        this.nextRowToReturn = new DocumentRowBuilder(this.metadataCategories)
+            .withUri(zipEntryName).withContent(content).withMetadata(metadata)
+            .buildRow();
+        return true;
+    }
+
+    /**
+     * This is the Flux 1.1+ approach, where the metadata entry is written first. This supports streaming.
+     */
+    private boolean readMetadataFollowedByContent() throws IOException {
         byte[] metadataBytes = fileContext.readBytes(currentZipInputStream);
         if (metadataBytes == null || metadataBytes.length == 0) {
             return openNextFileAndReadNextEntry();
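
For context, the reader above distinguishes the two archive layouts by whether the first zip entry name ends with ".metadata". A rough sketch of what a Flux 1.1+ style archive could look like, written with plain java.util.zip; the entry names, the ".metadata" suffix pairing, and the metadata payload are assumptions for illustration, not the connector's writer code:

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ArchiveLayoutSketch {
    public static void main(String[] args) throws IOException {
        try (ZipOutputStream zip = new ZipOutputStream(new FileOutputStream("example-archive.zip"))) {
            // Flux 1.1+ layout: each document's metadata entry precedes its content entry, so the
            // reader can hand the content off without buffering it just to locate the metadata.
            zip.putNextEntry(new ZipEntry("/example/doc1.json.metadata"));
            zip.write("<metadata/>".getBytes(StandardCharsets.UTF_8)); // placeholder for the DocumentMetadataHandle XML
            zip.closeEntry();

            zip.putNextEntry(new ZipEntry("/example/doc1.json"));
            zip.write("{\"hello\":\"world\"}".getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();

            // The Flux 1.0 "legacy" layout reverses the order for each document: content first,
            // then its metadata entry, which is why that path cannot stream content.
        }
    }
}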

src/main/java/com/marklogic/spark/writer/ArbitraryRowConverter.java

Lines changed: 4 additions & 3 deletions
@@ -20,9 +20,10 @@
 import org.apache.spark.sql.types.StructType;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
  * Handles building a document from an "arbitrary" row - i.e. one with an unknown schema, where the row will be
@@ -53,7 +54,7 @@ class ArbitraryRowConverter implements RowConverter {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         String initialUri = null;
         if (this.filePathIndex > -1) {
             initialUri = row.getString(this.filePathIndex) + "/" + UUID.randomUUID();
@@ -103,7 +104,7 @@ else if (deserializedJson != null) {
             }
         }
 
-        return Optional.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null));
+        return Stream.of(new DocBuilder.DocumentInputs(initialUri, contentHandle, uriTemplateValues, null)).iterator();
     }
 
     @Override

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 11 additions & 34 deletions
@@ -10,7 +10,6 @@
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.Format;
 import com.marklogic.client.io.InputStreamHandle;
-import com.marklogic.client.io.marker.AbstractWriteHandle;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.reader.document.DocumentRowSchema;
@@ -21,8 +20,9 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
+import java.util.stream.Stream;
 
 /**
  * Knows how to build a document from a row corresponding to our {@code DocumentRowSchema}.
@@ -42,31 +42,24 @@ class DocumentRowConverter implements RowConverter {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         final String uri = row.getString(0);
 
         final boolean isNakedProperties = row.isNullAt(1);
         if (isNakedProperties) {
             DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-            return Optional.of(new DocBuilder.DocumentInputs(uri, null, null, metadata));
+            return Stream.of(new DocBuilder.DocumentInputs(uri, null, null, metadata)).iterator();
         }
 
-        Content content = this.isStreamingFromFiles ?
-            readContentFromFile(uri, row) :
-            readContentFromRow(uri, row);
-
-        DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
-        return Optional.of(new DocBuilder.DocumentInputs(
-            uri, content.contentHandle, content.uriTemplateValues, metadata)
-        );
+        return this.isStreamingFromFiles ? readContentFromFile(uri, row) : readContentFromRow(uri, row);
     }
 
     @Override
     public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
         return new ArrayList<>();
     }
 
-    private Content readContentFromRow(String uri, InternalRow row) {
+    private Iterator<DocBuilder.DocumentInputs> readContentFromRow(String uri, InternalRow row) {
         BytesHandle bytesHandle = new BytesHandle(row.getBinary(1));
         if (this.documentFormat != null) {
             bytesHandle.withFormat(this.documentFormat);
@@ -76,7 +69,8 @@ private Content readContentFromRow(String uri, InternalRow row) {
             String format = row.isNullAt(2) ? null : row.getString(2);
             uriTemplateValues = deserializeContentToJson(uri, bytesHandle, format);
         }
-        return new Content(bytesHandle, uriTemplateValues);
+        DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
+        return Stream.of(new DocBuilder.DocumentInputs(uri, bytesHandle, uriTemplateValues, metadata)).iterator();
     }
 
     private JsonNode deserializeContentToJson(String initialUri, BytesHandle contentHandle, String format) {
@@ -97,7 +91,7 @@ private JsonNode deserializeContentToJson(String initialUri, BytesHandle content
      * In a scenario where the user wants to stream a file into MarkLogic, the content column will contain a serialized
      * instance of {@code FileContext}, which is used to stream the file into a {@code InputStreamHandle}.
      */
-    private Content readContentFromFile(String filePath, InternalRow row) {
+    private Iterator<DocBuilder.DocumentInputs> readContentFromFile(String filePath, InternalRow row) {
         byte[] bytes = row.getBinary(1);
         String filePathInErrorMessage = filePath;
         try {
@@ -109,27 +103,10 @@ private Content readContentFromFile(String filePath, InternalRow row) {
             if (this.documentFormat != null) {
                 streamHandle.withFormat(this.documentFormat);
             }
-            return new Content(streamHandle, null);
+            DocumentMetadataHandle metadata = DocumentRowSchema.makeDocumentMetadata(row);
+            return Stream.of(new DocBuilder.DocumentInputs(filePath, streamHandle, null, metadata)).iterator();
         } catch (Exception e) {
             throw new ConnectorException(String.format("Unable to read from file %s; cause: %s", filePathInErrorMessage, e.getMessage()));
         }
     }
-
-    private static class Content {
-        private final AbstractWriteHandle contentHandle;
-        private final JsonNode uriTemplateValues;
-
-        public Content(AbstractWriteHandle contentHandle, JsonNode uriTemplateValues) {
-            this.contentHandle = contentHandle;
-            this.uriTemplateValues = uriTemplateValues;
-        }
-
-        AbstractWriteHandle getContentHandle() {
-            return contentHandle;
-        }
-
-        JsonNode getUriTemplateValues() {
-            return uriTemplateValues;
-        }
-    }
 }

src/main/java/com/marklogic/spark/writer/FileRowConverter.java

Lines changed: 4 additions & 2 deletions
@@ -14,8 +14,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 /**
  * Knows how to build a document from a row corresponding to our {@code FileRowSchema}.
@@ -33,12 +35,12 @@ class FileRowConverter implements RowConverter {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         final String path = row.getString(writeContext.getFileSchemaPathPosition());
         BytesHandle contentHandle = new BytesHandle(row.getBinary(writeContext.getFileSchemaContentPosition()));
         forceFormatIfNecessary(contentHandle);
         Optional<JsonNode> uriTemplateValues = deserializeContentToJson(path, contentHandle, row);
-        return Optional.of(new DocBuilder.DocumentInputs(path, contentHandle, uriTemplateValues.orElse(null), null));
+        return Stream.of(new DocBuilder.DocumentInputs(path, contentHandle, uriTemplateValues.orElse(null), null)).iterator();
     }
 
     @Override

src/main/java/com/marklogic/spark/writer/RowConverter.java

Lines changed: 4 additions & 6 deletions
@@ -5,22 +5,20 @@
 
 import org.apache.spark.sql.catalyst.InternalRow;
 
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 
 /**
  * Strategy interface for how a Spark row is converted into a set of inputs for writing a document to MarkLogic.
  */
 public interface RowConverter {
 
     /**
-     * An implementation can return an empty Optional, which will happen when the row will be used with other rows to
-     * form a document.
-     *
      * @param row
-     * @return
+     * @return an iterator of inputs for creating documents to write to MarkLogic. An iterator is used to allow the
+     * implementor to return multiple documents if necessary.
      */
-    Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row);
+    Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row);
 
     /**
      * Called when {@code WriteBatcherDataWriter} has no more rows to send, but the implementation may have one or
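
The updated @return note is the point of the change: an implementation can now emit more than one document for a single row. A hypothetical sketch of such a converter, using stand-in types rather than the connector's DocBuilder.DocumentInputs and Spark's InternalRow:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in for DocBuilder.DocumentInputs.
record Doc(String uri, String content) {}

// Stand-in for the RowConverter contract: a row may yield zero or more documents.
interface SimpleRowConverter {
    Iterator<Doc> convertRow(String row);
    List<Doc> getRemainingDocumentInputs();
}

// Splits a comma-delimited row into one document per value - something the old
// Optional-based contract could not express.
class SplittingConverter implements SimpleRowConverter {
    @Override
    public Iterator<Doc> convertRow(String row) {
        List<Doc> docs = new ArrayList<>();
        for (String value : row.split(",")) {
            docs.add(new Doc("/example/" + value.trim() + ".json", "{\"value\":\"" + value.trim() + "\"}"));
        }
        return docs.iterator();
    }

    @Override
    public List<Doc> getRemainingDocumentInputs() {
        return new ArrayList<>();
    }
}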

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 5 additions & 4 deletions
@@ -30,8 +30,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -92,9 +92,10 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {
     @Override
     public void write(InternalRow row) {
         throwWriteFailureIfExists();
-        Optional<DocBuilder.DocumentInputs> document = rowConverter.convertRow(row);
-        if (document.isPresent()) {
-            DocumentWriteOperation writeOp = this.docBuilder.build(document.get());
+
+        Iterator<DocBuilder.DocumentInputs> iterator = rowConverter.convertRow(row);
+        while (iterator.hasNext()) {
+            DocumentWriteOperation writeOp = this.docBuilder.build(iterator.next());
             if (this.isStreamingFiles) {
                 writeDocumentViaPutOperation(writeOp);
             } else {

src/main/java/com/marklogic/spark/writer/rdf/RdfRowConverter.java

Lines changed: 4 additions & 3 deletions
@@ -14,6 +14,7 @@
 
 import java.util.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Converts each row into a sem:triple element, which is then added to a sem:triples XML document associated with a
@@ -60,7 +61,7 @@ public RdfRowConverter(WriteContext writeContext) {
     }
 
     @Override
-    public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
+    public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         final String graph = determineGraph(row);
         graphs.add(graph);
 
@@ -75,9 +76,9 @@ public Optional<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
         triplesDocument.addTriple(row);
         if (triplesDocument.hasMaxTriples()) {
             triplesDocuments.remove(graph);
-            return Optional.of(triplesDocument.buildDocument());
+            return Stream.of(triplesDocument.buildDocument()).iterator();
         }
-        return Optional.empty();
+        return Stream.<DocBuilder.DocumentInputs>empty().iterator();
     }
 
     /**
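
RdfRowConverter shows the other side of the iterator contract: it buffers triples into a per-graph document, returns an empty iterator until a document reaches its maximum number of triples, and relies on getRemainingDocumentInputs() to flush partial documents at the end. A simplified, hypothetical sketch of that buffering pattern with stand-in types, not the connector's triple-handling code:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Buffers rows and emits one "document" per N rows; leftovers are flushed at the end,
// mirroring how RdfRowConverter accumulates triples per graph. Types are illustrative only.
class BufferingConverter {
    private final int maxPerDocument;
    private final List<String> buffer = new ArrayList<>();

    BufferingConverter(int maxPerDocument) {
        this.maxPerDocument = maxPerDocument;
    }

    Iterator<String> convertRow(String row) {
        buffer.add(row);
        if (buffer.size() >= maxPerDocument) {
            String document = String.join("\n", buffer);
            buffer.clear();
            return Stream.of(document).iterator();
        }
        // Not enough rows yet - the empty iterator replaces the old Optional.empty().
        return Stream.<String>empty().iterator();
    }

    // Analogous to RowConverter.getRemainingDocumentInputs(): called once no more rows will arrive.
    List<String> getRemainingDocumentInputs() {
        if (buffer.isEmpty()) {
            return new ArrayList<>();
        }
        List<String> remaining = List.of(String.join("\n", buffer));
        buffer.clear();
        return remaining;
    }
}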
