6
6
import com .fasterxml .jackson .databind .JsonNode ;
7
7
import com .fasterxml .jackson .databind .ObjectMapper ;
8
8
import com .marklogic .client .document .GenericDocumentManager ;
9
- import com .marklogic .client .io .BytesHandle ;
9
+ import com .marklogic .client .io .InputStreamHandle ;
10
10
import com .marklogic .spark .ConnectorException ;
11
11
import com .marklogic .spark .ContextSupport ;
12
12
import com .marklogic .spark .Options ;
13
13
import com .marklogic .spark .reader .document .DocumentRowSchema ;
14
+ import org .apache .commons .io .IOUtils ;
14
15
import org .apache .spark .sql .catalyst .InternalRow ;
15
16
16
17
import javax .xml .XMLConstants ;
19
20
import javax .xml .transform .stream .StreamSource ;
20
21
import java .io .ByteArrayInputStream ;
21
22
import java .io .IOException ;
23
+ import java .io .InputStream ;
22
24
import java .io .OutputStream ;
23
25
import java .nio .charset .Charset ;
24
26
import java .util .Map ;
@@ -35,6 +37,7 @@ class ContentWriter {
35
37
private final boolean prettyPrint ;
36
38
private final Charset encoding ;
37
39
40
+ private final boolean isStreamingFiles ;
38
41
// Only set when streaming.
39
42
private final GenericDocumentManager documentManager ;
40
43
@@ -49,22 +52,22 @@ class ContentWriter {
49
52
this .objectMapper = null ;
50
53
}
51
54
52
- this .documentManager = "true" .equalsIgnoreCase (properties .get (Options .STREAM_FILES )) ?
55
+ this .isStreamingFiles = "true" .equalsIgnoreCase (properties .get (Options .STREAM_FILES ));
56
+ this .documentManager = this .isStreamingFiles ?
53
57
new ContextSupport (properties ).connectToMarkLogic ().newDocumentManager () : null ;
54
58
}
55
59
56
60
void writeContent (InternalRow row , OutputStream outputStream ) throws IOException {
57
- if (this .prettyPrint ) {
61
+ if (this .isStreamingFiles ) {
62
+ streamDocumentToFile (row , outputStream );
63
+ } else if (this .prettyPrint ) {
58
64
prettyPrintContent (row , outputStream );
65
+ } else if (this .encoding != null ) {
66
+ // We know the string from MarkLogic is UTF-8, so we use getBytes to convert it to the user's
67
+ // specified encoding (as opposed to new String(bytes, encoding)).
68
+ outputStream .write (new String (row .getBinary (1 )).getBytes (this .encoding ));
59
69
} else {
60
- byte [] bytes = getContentBytes (row );
61
- if (this .encoding != null ) {
62
- // We know the string from MarkLogic is UTF-8, so we use getBytes to convert it to the user's
63
- // specified encoding (as opposed to new String(bytes, encoding)).
64
- outputStream .write (new String (bytes ).getBytes (this .encoding ));
65
- } else {
66
- outputStream .write (bytes );
67
- }
70
+ outputStream .write (row .getBinary (1 ));
68
71
}
69
72
}
70
73
@@ -116,7 +119,7 @@ private Transformer newTransformer() {
116
119
}
117
120
118
121
private void prettyPrintContent (InternalRow row , OutputStream outputStream ) throws IOException {
119
- final byte [] content = getContentBytes ( row );
122
+ final byte [] content = row . getBinary ( 1 );
120
123
final String format = row .isNullAt (2 ) ? null : row .getString (2 );
121
124
if ("JSON" .equalsIgnoreCase (format )) {
122
125
prettyPrintJson (content , outputStream );
@@ -151,11 +154,10 @@ private void prettyPrintXml(byte[] content, OutputStream outputStream) {
151
154
}
152
155
}
153
156
154
- private byte [] getContentBytes (InternalRow row ) {
155
- if (this .documentManager != null ) {
156
- String uri = row .getString (0 );
157
- return documentManager .read (uri , new BytesHandle ()).get ();
158
- }
159
- return row .getBinary (1 );
157
+ private void streamDocumentToFile (InternalRow row , OutputStream outputStream ) throws IOException {
158
+ String uri = row .getString (0 );
159
+ InputStream inputStream = documentManager .read (uri , new InputStreamHandle ()).get ();
160
+ // commons-io is a dependency of Spark and a common utility for copying between two steams.
161
+ IOUtils .copy (inputStream , outputStream );
160
162
}
161
163
}
0 commit comments