Skip to content

Commit 091fa7a

Browse files
authored
Merge pull request #290 from marklogic/feature/streaming-read-fix
Optimized streaming read
2 parents 8e82315 + dfbfb88 commit 091fa7a

File tree

3 files changed

+168
-2
lines changed

3 files changed

+168
-2
lines changed

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class DocumentContext extends ContextSupport {
2828
Set<DocumentManager.Metadata> getRequestedMetadata() {
2929
Set<DocumentManager.Metadata> set = new HashSet<>();
3030
if (hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {
31-
for (String category : getProperties().get(Options.READ_DOCUMENTS_CATEGORIES).split(",")) {
31+
for (String category : getStringOption(Options.READ_DOCUMENTS_CATEGORIES).split(",")) {
3232
if ("content".equalsIgnoreCase(category)) {
3333
continue;
3434
}

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class ForestReader implements PartitionReader<InternalRow> {
4040
private final Set<DocumentManager.Metadata> requestedMetadata;
4141
private final boolean contentWasRequested;
4242
private final Integer limit;
43+
private final boolean isStreamingFiles;
4344

4445
// Only used for logging.
4546
private final ForestPartition forestPartition;
@@ -53,6 +54,7 @@ class ForestReader implements PartitionReader<InternalRow> {
5354
ForestReader(ForestPartition forestPartition, DocumentContext context) {
5455
this.forestPartition = forestPartition;
5556
this.limit = context.getLimit();
57+
this.isStreamingFiles = "true".equalsIgnoreCase(context.getStringOption(Options.STREAM_FILES));
5658

5759
DatabaseClient client = context.isDirectConnection() ?
5860
context.connectToMarkLogic(forestPartition.getHost()) :
@@ -99,7 +101,9 @@ public boolean next() {
99101
}
100102
return false;
101103
}
102-
this.currentDocumentPage = readPage(uris);
104+
105+
// When streaming, we don't want to retrieve the documents yet - they'll be retrieved in the writer phase.
106+
this.currentDocumentPage = this.isStreamingFiles ? new UrisPage(uris.iterator()) : readPage(uris);
103107
}
104108

105109
return currentDocumentPage.hasNext();
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright © 2024 MarkLogic Corporation. All Rights Reserved.
3+
*/
4+
package com.marklogic.spark.reader.document;
5+
6+
import com.marklogic.client.document.DocumentDescriptor;
7+
import com.marklogic.client.document.DocumentPage;
8+
import com.marklogic.client.document.DocumentRecord;
9+
import com.marklogic.client.io.Format;
10+
import com.marklogic.client.io.marker.AbstractReadHandle;
11+
import com.marklogic.client.io.marker.DocumentMetadataReadHandle;
12+
13+
import java.util.Iterator;
14+
15+
/**
16+
* Used for streaming documents from MarkLogic to files. Allows {@code ForestReader} to avoid the additional call to
17+
* MarkLogic to retrieve documents - which we don't want in the reader phase when streaming - while still depending
18+
* on {@code DocumentPage} and {@code DocumentRecord} as abstractions.
19+
*/
20+
class UrisPage implements DocumentPage {
21+
22+
private Iterator<String> uris;
23+
24+
UrisPage(Iterator<String> uris) {
25+
this.uris = uris;
26+
}
27+
28+
@Override
29+
public <T extends AbstractReadHandle> T nextContent(T contentHandle) {
30+
return null;
31+
}
32+
33+
@Override
34+
public void close() {
35+
// Nothing to do here.
36+
}
37+
38+
@Override
39+
public Iterator<DocumentRecord> iterator() {
40+
return null;
41+
}
42+
43+
@Override
44+
public boolean hasNext() {
45+
return uris.hasNext();
46+
}
47+
48+
@Override
49+
public DocumentRecord next() {
50+
// This is the only method that ForestReader will invoke.
51+
return new UriRecord(uris.next());
52+
}
53+
54+
@Override
55+
public long getStart() {
56+
return 0;
57+
}
58+
59+
@Override
60+
public long getPageSize() {
61+
return 0;
62+
}
63+
64+
@Override
65+
public long getTotalSize() {
66+
return 0;
67+
}
68+
69+
@Override
70+
public long size() {
71+
return 0;
72+
}
73+
74+
@Override
75+
public long getTotalPages() {
76+
return 0;
77+
}
78+
79+
@Override
80+
public boolean hasContent() {
81+
return false;
82+
}
83+
84+
@Override
85+
public boolean hasNextPage() {
86+
return false;
87+
}
88+
89+
@Override
90+
public boolean hasPreviousPage() {
91+
return false;
92+
}
93+
94+
@Override
95+
public long getPageNumber() {
96+
return 0;
97+
}
98+
99+
@Override
100+
public boolean isFirstPage() {
101+
return false;
102+
}
103+
104+
@Override
105+
public boolean isLastPage() {
106+
return false;
107+
}
108+
109+
private static class UriRecord implements DocumentRecord {
110+
111+
private String uri;
112+
113+
public UriRecord(String uri) {
114+
this.uri = uri;
115+
}
116+
117+
@Override
118+
public String getUri() {
119+
return uri;
120+
}
121+
122+
@Override
123+
public DocumentDescriptor getDescriptor() {
124+
return null;
125+
}
126+
127+
@Override
128+
public Format getFormat() {
129+
return null;
130+
}
131+
132+
@Override
133+
public String getMimetype() {
134+
return null;
135+
}
136+
137+
@Override
138+
public long getLength() {
139+
return 0;
140+
}
141+
142+
@Override
143+
public <T extends DocumentMetadataReadHandle> T getMetadata(T metadataHandle) {
144+
return null;
145+
}
146+
147+
@Override
148+
public <T> T getMetadataAs(Class<T> as) {
149+
return null;
150+
}
151+
152+
@Override
153+
public <T extends AbstractReadHandle> T getContent(T contentHandle) {
154+
return null;
155+
}
156+
157+
@Override
158+
public <T> T getContentAs(Class<T> as) {
159+
return null;
160+
}
161+
}
162+
}

0 commit comments

Comments
 (0)