
Commit d08b0e1

Merge pull request #353 from marklogic/feature/row-converter-fix
Refactored RowConverter so that an Iterator is always returned
2 parents: d4751c1 + c82e851

File tree

7 files changed: +73 additions, -57 deletions

src/main/java/com/marklogic/spark/writer/ArbitraryRowConverter.java
src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java
src/main/java/com/marklogic/spark/writer/FileRowConverter.java
src/main/java/com/marklogic/spark/writer/RowConverter.java
src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java
src/main/java/com/marklogic/spark/writer/rdf/RdfRowConverter.java
src/test/java/com/marklogic/spark/AbstractIntegrationTest.java
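The central change, condensed from the per-file diffs below: every RowConverter implementation now returns an Iterator from getRemainingDocumentInputs() instead of a List, so the writer consumes "pending" inputs the same way it consumes the inputs produced by convertRow(). A before/after sketch of the interface method, assembled from the diffs rather than copied verbatim from the file:

    // Before: pending document inputs were materialized into a List
    List<DocBuilder.DocumentInputs> getRemainingDocumentInputs();

    // After: pending document inputs are exposed as an Iterator, matching convertRow(InternalRow)
    Iterator<DocBuilder.DocumentInputs> getRemainingDocumentInputs();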

src/main/java/com/marklogic/spark/writer/ArbitraryRowConverter.java

Lines changed: 2 additions & 4 deletions

@@ -19,9 +19,7 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;

-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
 import java.util.stream.Stream;

@@ -108,8 +106,8 @@ else if (deserializedJson != null) {
     }

     @Override
-    public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
-        return new ArrayList<>();
+    public Iterator<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
+        return Stream.<DocBuilder.DocumentInputs>empty().iterator();
     }

     /**
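The non-RDF converters (this one, DocumentRowConverter, and FileRowConverter below) have no pending state, so each now returns an empty Iterator. Not part of the commit, but a minimal, self-contained illustration of the pattern they use; the class name is made up, and java.util.Collections.emptyIterator() would be an equivalent JDK shortcut:

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.stream.Stream;

    public class EmptyIteratorExample {
        public static void main(String[] args) {
            // Pattern used by the converters: the iterator of an empty stream.
            Iterator<String> viaStream = Stream.<String>empty().iterator();
            // Equivalent JDK shortcut.
            Iterator<String> viaCollections = Collections.emptyIterator();
            System.out.println(viaStream.hasNext());       // false
            System.out.println(viaCollections.hasNext());  // false
        }
    }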

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 2 additions & 4 deletions

@@ -24,9 +24,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.stream.Stream;

 /**
@@ -60,8 +58,8 @@ public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
     }

     @Override
-    public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
-        return new ArrayList<>();
+    public Iterator<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
+        return Stream.<DocBuilder.DocumentInputs>empty().iterator();
     }

     private Iterator<DocBuilder.DocumentInputs> readContentFromRow(String uri, InternalRow row) {

src/main/java/com/marklogic/spark/writer/FileRowConverter.java

Lines changed: 2 additions & 4 deletions

@@ -13,9 +13,7 @@
 import org.apache.spark.sql.types.DataTypes;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Optional;
 import java.util.stream.Stream;

@@ -44,8 +42,8 @@ public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
     }

     @Override
-    public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
-        return new ArrayList<>();
+    public Iterator<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
+        return Stream.<DocBuilder.DocumentInputs>empty().iterator();
     }

     // Telling Sonar to not tell us to remove this code, since we can't until 3.0.

src/main/java/com/marklogic/spark/writer/RowConverter.java

Lines changed: 1 addition & 2 deletions

@@ -6,7 +6,6 @@
 import org.apache.spark.sql.catalyst.InternalRow;

 import java.util.Iterator;
-import java.util.List;

 /**
  * Strategy interface for how a Spark row is converted into a set of inputs for writing a document to MarkLogic.
@@ -26,5 +25,5 @@ public interface RowConverter {
     *
     * @return
     */
-    List<DocBuilder.DocumentInputs> getRemainingDocumentInputs();
+    Iterator<DocBuilder.DocumentInputs> getRemainingDocumentInputs();
 }

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 51 additions & 40 deletions

@@ -33,7 +33,6 @@
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -99,9 +98,49 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {

     @Override
     public void write(InternalRow row) {
+        throwWriteFailureIfExists();
+        buildAndWriteDocuments(rowConverter.convertRow(row));
+    }
+
+    @Override
+    public WriterCommitMessage commit() {
+        // The RDF row converter may have "pending" rows as it has not yet reached the max number of triples to include
+        // in a document. Those are retrieved here.
+        buildAndWriteDocuments(rowConverter.getRemainingDocumentInputs());
+
+        this.writeBatcher.flushAndWait();
+
         throwWriteFailureIfExists();

-        Iterator<DocBuilder.DocumentInputs> iterator = rowConverter.convertRow(row);
+        Set<String> graphs = getGraphNames();
+        return new CommitMessage(successItemCount.get(), failedItemCount.get(), graphs);
+    }
+
+    @Override
+    public void abort() {
+        Util.MAIN_LOGGER.warn("Abort called.");
+        stopJobAndRelease();
+        closeArchiveWriter();
+        Util.MAIN_LOGGER.info("Finished abort.");
+    }
+
+    @Override
+    public void close() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Close called.");
+        }
+        stopJobAndRelease();
+        closeArchiveWriter();
+    }
+
+    /**
+     * Processes the document inputs returned by the RowConverter for a single row. A row can return multiple instances
+     * of document inputs. Each instance is run through the document processor if it's not null, which can produce
+     * additional documents.
+     *
+     * @param iterator
+     */
+    private void buildAndWriteDocuments(Iterator<DocBuilder.DocumentInputs> iterator) {
         try {
             iterator.forEachRemaining(documentInputs -> {
                 DocumentWriteOperation sourceDocument = this.docBuilder.build(documentInputs);
@@ -128,44 +167,16 @@ private void writeDocument(DocumentWriteOperation writeOp) {
         }
     }

-    @Override
-    public WriterCommitMessage commit() {
-        List<DocBuilder.DocumentInputs> documentInputs = rowConverter.getRemainingDocumentInputs();
-        if (documentInputs != null) {
-            documentInputs.forEach(inputs -> {
-                DocumentWriteOperation writeOp = this.docBuilder.build(inputs);
-                this.writeBatcher.add(writeOp);
-            });
-        }
-        this.writeBatcher.flushAndWait();
-
-        throwWriteFailureIfExists();
-
-        // Need this hack so that the complete set of graphs can be reported back to MarkLogicWrite, which handles
-        // creating the graphs after all documents have been written.
-        Set<String> graphs = null;
-        if (this.rowConverter instanceof RdfRowConverter) {
-            graphs = ((RdfRowConverter) rowConverter).getGraphs();
-        }
-
-        return new CommitMessage(successItemCount.get(), failedItemCount.get(), graphs);
-    }
-
-    @Override
-    public void abort() {
-        Util.MAIN_LOGGER.warn("Abort called.");
-        stopJobAndRelease();
-        closeArchiveWriter();
-        Util.MAIN_LOGGER.info("Finished abort.");
-    }
-
-    @Override
-    public void close() {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Close called.");
-        }
-        stopJobAndRelease();
-        closeArchiveWriter();
+    /**
+     * This provides a mechanism for capturing the list of graph names detected while processing RDF rows. These need
+     * to be sent back to MarkLogicWrite, where each graph is written to MarkLogic as a graph document.
+     *
+     * @return
+     */
+    private Set<String> getGraphNames() {
+        return this.rowConverter instanceof RdfRowConverter ?
+            ((RdfRowConverter) rowConverter).getGraphs() :
+            null;
     }

     private void addBatchListeners(WriteBatcher writeBatcher) {
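With both write() and commit() delegating to buildAndWriteDocuments(Iterator), per-row inputs and end-of-task pending inputs flow through a single code path. Below is a stripped-down, compilable sketch of that shape only; the type and method names (Converter, SimplifiedWriter, buildAndWrite) are hypothetical stand-ins, and the real class additionally wires in a DocBuilder, WriteBatcher, failure tracking, and graph reporting:

    import java.util.Iterator;

    // Hypothetical stand-in for RowConverter and its document-inputs type.
    interface Converter<R, D> {
        Iterator<D> convertRow(R row);
        Iterator<D> getRemainingDocumentInputs();
    }

    class SimplifiedWriter<R, D> {
        private final Converter<R, D> converter;

        SimplifiedWriter(Converter<R, D> converter) {
            this.converter = converter;
        }

        void write(R row) {
            // Per-row inputs and pending inputs share one consumption path.
            buildAndWrite(converter.convertRow(row));
        }

        void commit() {
            // Converters with buffered state (e.g. the RDF converter) emit it here.
            buildAndWrite(converter.getRemainingDocumentInputs());
        }

        private void buildAndWrite(Iterator<D> inputs) {
            // In the real writer: docBuilder.build(input) followed by writeBatcher.add(...).
            inputs.forEachRemaining(input -> System.out.println("writing " + input));
        }
    }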

src/main/java/com/marklogic/spark/writer/rdf/RdfRowConverter.java

Lines changed: 2 additions & 3 deletions

@@ -13,7 +13,6 @@
 import org.slf4j.LoggerFactory;

 import java.util.*;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;

 /**
@@ -87,10 +86,10 @@ public Iterator<DocBuilder.DocumentInputs> convertRow(InternalRow row) {
     * @return
     */
    @Override
-    public List<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
+    public Iterator<DocBuilder.DocumentInputs> getRemainingDocumentInputs() {
        return this.triplesDocuments.values().stream()
            .map(TriplesDocument::buildDocument)
-            .collect(Collectors.toList());
+            .iterator();
    }

    /**
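One effect of swapping collect(Collectors.toList()) for iterator() here is that Stream.iterator() is a terminal operation whose upstream steps run lazily as the iterator is consumed, so the pending triples documents should be built one at a time as the writer drains them rather than being materialized into a List up front. A small self-contained demonstration of that JDK behavior, unrelated to the MarkLogic types (class name is illustrative):

    import java.util.Iterator;
    import java.util.stream.Stream;

    public class LazyStreamIteratorExample {
        public static void main(String[] args) {
            Iterator<String> it = Stream.of("a", "b", "c")
                .map(value -> {
                    System.out.println("mapping " + value); // runs only while the iterator is consumed
                    return value.toUpperCase();
                })
                .iterator();

            System.out.println("iterator created; nothing mapped yet");
            it.forEachRemaining(System.out::println);
        }
    }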

src/test/java/com/marklogic/spark/AbstractIntegrationTest.java

Lines changed: 13 additions & 0 deletions

@@ -58,6 +58,19 @@ public void closeSparkSession() {
         if (sparkSession != null) {
             sparkSession.close();
         }
+        smallDelayUntilNextTest();
+    }
+
+    // Tell Sonar not to worry about this for now.
+    @SuppressWarnings({"java:S2925"})
+    private void smallDelayUntilNextTest() {
+        // Hopefully a temporary hack to see if we get fewer random failures on Jenkins due to connectivity issues that
+        // are likely due to Docker restarting MarkLogic due to insufficient memory.
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            // No need to handle.
+        }
     }

     @Override
