Commit cc2d859
MLE-17095 Refactor, removed some duplication
1 parent 9ea7ce0 commit cc2d859
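
This commit extracts the repeated streaming check — "true".equalsIgnoreCase(getStringOption(Options.STREAM_FILES)) — into a new ContextSupport.isStreamingFiles() helper, then updates each reader and writer class that had inlined its own variant of the comparison.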

File tree: 9 files changed (+19 -17 lines changed)

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 4 additions & 0 deletions

@@ -145,6 +145,10 @@ public final String getStringOption(String option) {
         return hasOption(option) ? properties.get(option).trim() : null;
     }
 
+    public final boolean isStreamingFiles() {
+        return "true".equalsIgnoreCase(getStringOption(Options.STREAM_FILES));
+    }
+
     public Map<String, String> getProperties() {
         return properties;
     }
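
The helper itself is a one-liner over existing ContextSupport plumbing. Below is a minimal, self-contained sketch of the pattern — the class name, option key, and hasOption() body are simplified stand-ins rather than the connector's real code; only getStringOption() and isStreamingFiles() mirror the diff above.

    // Minimal sketch of the option-helper pattern (simplified names; the real
    // class is com.marklogic.spark.ContextSupport and the key comes from Options).
    import java.util.HashMap;
    import java.util.Map;

    public class ContextSketch {

        // Hypothetical stand-in for Options.STREAM_FILES; the real constant's
        // value is not shown in this diff.
        static final String STREAM_FILES = "spark.marklogic.streamFiles";

        private final Map<String, String> properties;

        ContextSketch(Map<String, String> properties) {
            this.properties = properties;
        }

        // Assumed semantics: an option "exists" only if present and non-blank,
        // consistent with how getStringOption() and the ContentWriter change
        // below use it.
        public final boolean hasOption(String option) {
            String value = properties.get(option);
            return value != null && !value.trim().isEmpty();
        }

        // Matches the method shown in the diff above.
        public final String getStringOption(String option) {
            return hasOption(option) ? properties.get(option).trim() : null;
        }

        // The new single home for the streaming check: trimmed, null-safe,
        // case-insensitive comparison against "true".
        public final boolean isStreamingFiles() {
            return "true".equalsIgnoreCase(getStringOption(STREAM_FILES));
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(STREAM_FILES, " TRUE ");
            System.out.println(new ContextSketch(props).isStreamingFiles()); // true

            props.put(STREAM_FILES, "false");
            System.out.println(new ContextSketch(props).isStreamingFiles()); // false

            props.remove(STREAM_FILES);
            System.out.println(new ContextSketch(props).isStreamingFiles()); // false (null-safe)
        }
    }

Centralizing the comparison also centralizes its quirks — trimming, null-safety, and case-insensitivity — so every call site below now agrees on them.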

src/main/java/com/marklogic/spark/reader/document/DocumentContext.java

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ Set<DocumentManager.Metadata> getRequestedMetadata() {
     }
 
     boolean contentWasRequested() {
-        if ("true".equals(getStringOption(Options.STREAM_FILES))) {
+        if (isStreamingFiles()) {
             return false;
         }
         if (!hasOption(Options.READ_DOCUMENTS_CATEGORIES)) {

src/main/java/com/marklogic/spark/reader/document/DocumentScanBuilder.java

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@ class DocumentScanBuilder implements ScanBuilder, SupportsPushDownLimit {
 
     DocumentScanBuilder(CaseInsensitiveStringMap options, StructType schema) {
         this.context = new DocumentContext(options, schema);
-        if ("true".equalsIgnoreCase(this.context.getStringOption(Options.STREAM_FILES)) && Util.MAIN_LOGGER.isInfoEnabled()) {
+        if (this.context.isStreamingFiles() && Util.MAIN_LOGGER.isInfoEnabled()) {
             Util.MAIN_LOGGER.info("Will defer reading documents from MarkLogic so they can be streamed to files during the writer phase.");
         }
     }

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ class ForestReader implements PartitionReader<InternalRow> {
     ForestReader(ForestPartition forestPartition, DocumentContext context) {
         this.forestPartition = forestPartition;
         this.limit = context.getLimit();
-        this.isStreamingFiles = "true".equalsIgnoreCase(context.getStringOption(Options.STREAM_FILES));
+        this.isStreamingFiles = context.isStreamingFiles();
 
         DatabaseClient client = context.isDirectConnection() ?
             context.connectToMarkLogic(forestPartition.getHost()) :

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 1 addition & 3 deletions

@@ -44,9 +44,7 @@ public enum StreamingMode {
     ArchiveFileReader(FilePartition filePartition, FileContext fileContext) {
         this(
             filePartition, fileContext,
-            // Will refactor this later to avoid duplication of this comparison.
-            // Should be a nice little method in FileContext.
-            "true".equalsIgnoreCase(fileContext.getStringOption(Options.STREAM_FILES)) ? StreamingMode.STREAM_DURING_READER_PHASE : null
+            fileContext.isStreamingFiles() ? StreamingMode.STREAM_DURING_READER_PHASE : null
         );
     }
 
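The two comment lines deleted here ("Will refactor this later... Should be a nice little method in FileContext") were the TODO this commit resolves — the method evidently landed on the shared ContextSupport base rather than on FileContext itself, which is why fileContext can call isStreamingFiles() directly.
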
src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ class GenericFileReader implements PartitionReader<InternalRow> {
     GenericFileReader(FilePartition filePartition, FileContext fileContext) {
         this.filePartition = filePartition;
         this.fileContext = fileContext;
-        this.isStreaming = "true".equalsIgnoreCase(fileContext.getStringOption(Options.STREAM_FILES));
+        this.isStreaming = fileContext.isStreamingFiles();
     }
 
     @Override

src/main/java/com/marklogic/spark/writer/DocumentRowConverter.java

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@ class DocumentRowConverter implements RowConverter {
         this.uriTemplate = writeContext.getStringOption(Options.WRITE_URI_TEMPLATE);
         this.documentFormat = writeContext.getDocumentFormat();
         this.objectMapper = new ObjectMapper();
-        this.isStreamingFromFiles = writeContext.hasOption(Options.STREAM_FILES);
+        this.isStreamingFromFiles = writeContext.isStreamingFiles();
     }
 
     @Override
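
This call site is slightly more than a mechanical swap: assuming hasOption() only checks that the option is set to a non-blank value, the old check was true for any value of STREAM_FILES, while isStreamingFiles() requires the value to be "true" (case-insensitively). Setting STREAM_FILES to e.g. "false" now disables streaming here instead of enabling it.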

src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ class WriteBatcherDataWriter implements DataWriter<InternalRow> {
         this.docBuilder = this.writeContext.newDocBuilder();
         this.databaseClient = writeContext.connectToMarkLogic();
         this.rowConverter = determineRowConverter();
-        this.isStreamingFiles = "true".equals(writeContext.getStringOption(Options.STREAM_FILES));
+        this.isStreamingFiles = writeContext.isStreamingFiles();
         this.documentManager = this.isStreamingFiles ? databaseClient.newDocumentManager() : null;
 
         if (writeContext.isAbortOnFailure()) {
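
Another small unification: the old check here used the case-sensitive "true".equals(...), whereas the shared helper compares with equalsIgnoreCase, so a value such as "TRUE" now enables streaming in this writer just as it already did in the readers.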

src/main/java/com/marklogic/spark/writer/file/ContentWriter.java

Lines changed: 8 additions & 8 deletions

@@ -42,8 +42,9 @@ class ContentWriter {
     private final GenericDocumentManager documentManager;
 
     ContentWriter(Map<String, String> properties) {
-        this.encoding = determineEncoding(properties);
-        this.prettyPrint = "true".equalsIgnoreCase(properties.get(Options.WRITE_FILES_PRETTY_PRINT));
+        ContextSupport context = new ContextSupport(properties);
+        this.encoding = determineEncoding(context);
+        this.prettyPrint = "true".equalsIgnoreCase(context.getStringOption(Options.WRITE_FILES_PRETTY_PRINT));
         if (this.prettyPrint) {
             this.objectMapper = new ObjectMapper();
             this.transformer = newTransformer();
@@ -52,9 +53,8 @@ class ContentWriter {
             this.objectMapper = null;
         }
 
-        this.isStreamingFiles = "true".equalsIgnoreCase(properties.get(Options.STREAM_FILES));
-        this.documentManager = this.isStreamingFiles ?
-            new ContextSupport(properties).connectToMarkLogic().newDocumentManager() : null;
+        this.isStreamingFiles = context.isStreamingFiles();
+        this.documentManager = this.isStreamingFiles ? context.connectToMarkLogic().newDocumentManager() : null;
     }
 
     void writeContent(InternalRow row, OutputStream outputStream) throws IOException {
@@ -82,9 +82,9 @@ void writeMetadata(InternalRow row, OutputStream outputStream) throws IOException {
         }
     }
 
-    private Charset determineEncoding(Map<String, String> properties) {
-        String encodingValue = properties.get(Options.WRITE_FILES_ENCODING);
-        if (encodingValue != null && encodingValue.trim().length() > 0) {
+    private Charset determineEncoding(ContextSupport context) {
+        if (context.hasOption(Options.WRITE_FILES_ENCODING)) {
+            String encodingValue = context.getStringOption(Options.WRITE_FILES_ENCODING);
             try {
                 return Charset.forName(encodingValue);
             } catch (Exception ex) {
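
Two things worth noting in this file: the constructor now builds a single ContextSupport up front and reuses it (previously one was constructed inline only when streaming), and determineEncoding() leans on hasOption() instead of an explicit null/blank check — behavior-preserving on the assumption that hasOption() treats missing and blank values the same way the old encodingValue != null && encodingValue.trim().length() > 0 test did.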
