Skip to content

Commit 9a4ac2d

Browse files
authored
Merge pull request #295 from marklogic/feature/17084-part-3
MLE-17084 Fix for streaming files with spaces in filename
2 parents ca1c503 + ada6aa9 commit 9a4ac2d

12 files changed

+24
-18
lines changed

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@ public void close() {
8787
}
8888

8989
private void openNextFile() {
90-
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
90+
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
9191
nextFilePathIndex++;
9292
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
9393
}

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -72,8 +72,7 @@ byte[] readBytes(InputStream inputStream) throws IOException {
7272
return this.encoding != null ? new String(bytes, this.encoding).getBytes() : bytes;
7373
}
7474

75-
public String getDecodedFilePath(FilePartition filePartition, int index) {
76-
String path = filePartition.getPaths().get(index);
75+
public String decodeFilePath(String path) {
7776
try {
7877
if (this.encoding != null) {
7978
return URLDecoder.decode(path, this.encoding);

src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,11 @@ public boolean next() {
4141
return false;
4242
}
4343

44-
final String path = fileContext.getDecodedFilePath(filePartition, filePathIndex);
44+
// If streaming, we want to put the unaltered file path in the row. The writer can then decode it and also use
45+
// its original value as the URI, as the PUT v1/documents endpoint does not allow e.g. spaces.
46+
final String originalFilePath = filePartition.getPaths().get(filePathIndex);
47+
final String path = this.isStreaming ? originalFilePath : fileContext.decodeFilePath(originalFilePath);
48+
4549
filePathIndex++;
4650
try {
4751
byte[] content = this.isStreaming ? serializeFileContext() : readFileIntoByteArray(path);

src/main/java/com/marklogic/spark/reader/file/GzipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ public boolean next() {
3838
return false;
3939
}
4040

41-
String currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
41+
String currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
4242
nextFilePathIndex++;
4343
InputStream gzipInputStream = null;
4444
try {

src/main/java/com/marklogic/spark/reader/file/MlcpArchiveFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ public void close() {
9999
}
100100

101101
private void openNextFile() {
102-
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
102+
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
103103
nextFilePathIndex++;
104104
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
105105
}

src/main/java/com/marklogic/spark/reader/file/RdfFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ public void close() throws IOException {
7676
}
7777

7878
private boolean initializeRdfStreamReader() {
79-
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
79+
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
8080
if (logger.isDebugEnabled()) {
8181
logger.debug("Reading file {}", this.currentFilePath);
8282
}

src/main/java/com/marklogic/spark/reader/file/RdfZipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ public boolean next() {
6868
}
6969

7070
// Open up the next zip.
71-
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
71+
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
7272
nextFilePathIndex++;
7373
this.currentZipInputStream = new CustomZipInputStream(fileContext.openFile(currentFilePath));
7474
return next();

src/main/java/com/marklogic/spark/reader/file/ZipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ public void close() {
7171
}
7272

7373
private void openNextFile() {
74-
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
74+
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
7575
nextFilePathIndex++;
7676
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
7777
}

src/main/java/com/marklogic/spark/reader/file/xml/AggregateXmlFileReader.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ public boolean next() {
5353
}
5454

5555
try {
56-
String path = fileContext.getDecodedFilePath(filePartition, filePathIndex);
56+
String path = fileContext.decodeFilePath(filePartition.getPaths().get(filePathIndex));
5757
nextRowToReturn = this.aggregateXMLSplitter.nextRow(path);
5858
return true;
5959
} catch (RuntimeException ex) {
@@ -81,7 +81,7 @@ private boolean initializeAggregateXMLSplitter() {
8181
return false;
8282
}
8383

84-
final String filePath = fileContext.getDecodedFilePath(filePartition, filePathIndex);
84+
final String filePath = fileContext.decodeFilePath(filePartition.getPaths().get(filePathIndex));
8585
try {
8686
this.inputStream = fileContext.openFile(filePath);
8787
String identifierForError = "file " + filePath;

src/main/java/com/marklogic/spark/reader/file/xml/ZipAggregateXmlFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ public void close() {
8383
}
8484

8585
private void openNextFile() {
86-
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
86+
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
8787
nextFilePathIndex++;
8888
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
8989
}

0 commit comments

Comments
 (0)