Skip to content

Commit ca1c503

Browse files
authored
Merge pull request #294 from marklogic/feature/space-in-filename
MLE-17084 Initial fix for handling spaces in filenames
2 parents c364d42 + 1d40d0e commit ca1c503

15 files changed

+127
-11
lines changed

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void close() {
8787
}
8888

8989
private void openNextFile() {
90-
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
90+
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
9191
nextFilePathIndex++;
9292
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
9393
}

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.marklogic.spark.ConnectorException;
77
import com.marklogic.spark.ContextSupport;
88
import com.marklogic.spark.Options;
9+
import com.marklogic.spark.Util;
910
import org.apache.hadoop.fs.FSDataInputStream;
1011
import org.apache.hadoop.fs.FileSystem;
1112
import org.apache.hadoop.fs.Path;
@@ -14,6 +15,8 @@
1415
import java.io.IOException;
1516
import java.io.InputStream;
1617
import java.io.Serializable;
18+
import java.io.UnsupportedEncodingException;
19+
import java.net.URLDecoder;
1720
import java.nio.charset.Charset;
1821
import java.nio.charset.UnsupportedCharsetException;
1922
import java.util.Map;
@@ -68,4 +71,19 @@ byte[] readBytes(InputStream inputStream) throws IOException {
6871
byte[] bytes = FileUtil.readBytes(inputStream);
6972
return this.encoding != null ? new String(bytes, this.encoding).getBytes() : bytes;
7073
}
74+
75+
public String getDecodedFilePath(FilePartition filePartition, int index) {
76+
String path = filePartition.getPaths().get(index);
77+
try {
78+
if (this.encoding != null) {
79+
return URLDecoder.decode(path, this.encoding);
80+
}
81+
return URLDecoder.decode(path, Charset.defaultCharset());
82+
} catch (UnsupportedEncodingException e) {
83+
if (Util.MAIN_LOGGER.isDebugEnabled()) {
84+
Util.MAIN_LOGGER.debug("Cannot decode path '{}', so will use path as-is. Error: {}", path, e.getMessage());
85+
}
86+
return path;
87+
}
88+
}
7189
}

src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ public boolean next() {
4141
return false;
4242
}
4343

44-
final String path = filePartition.getPaths().get(filePathIndex);
44+
final String path = fileContext.getDecodedFilePath(filePartition, filePathIndex);
4545
filePathIndex++;
4646
try {
4747
byte[] content = this.isStreaming ? serializeFileContext() : readFileIntoByteArray(path);
48-
48+
4949
nextRowToReturn = new GenericInternalRow(new Object[]{
5050
UTF8String.fromString(path),
5151
ByteArray.concat(content),

src/main/java/com/marklogic/spark/reader/file/GzipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public boolean next() {
3838
return false;
3939
}
4040

41-
String currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
41+
String currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
4242
nextFilePathIndex++;
4343
InputStream gzipInputStream = null;
4444
try {

src/main/java/com/marklogic/spark/reader/file/MlcpArchiveFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void close() {
9999
}
100100

101101
private void openNextFile() {
102-
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
102+
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
103103
nextFilePathIndex++;
104104
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
105105
}

src/main/java/com/marklogic/spark/reader/file/RdfFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void close() throws IOException {
7676
}
7777

7878
private boolean initializeRdfStreamReader() {
79-
this.currentFilePath = this.filePartition.getPaths().get(nextFilePathIndex);
79+
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
8080
if (logger.isDebugEnabled()) {
8181
logger.debug("Reading file {}", this.currentFilePath);
8282
}

src/main/java/com/marklogic/spark/reader/file/RdfZipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public boolean next() {
6868
}
6969

7070
// Open up the next zip.
71-
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
71+
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
7272
nextFilePathIndex++;
7373
this.currentZipInputStream = new CustomZipInputStream(fileContext.openFile(currentFilePath));
7474
return next();

src/main/java/com/marklogic/spark/reader/file/ZipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void close() {
7171
}
7272

7373
private void openNextFile() {
74-
this.currentFilePath = this.filePartition.getPaths().get(nextFilePathIndex);
74+
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
7575
nextFilePathIndex++;
7676
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
7777
}

src/main/java/com/marklogic/spark/reader/file/xml/AggregateXmlFileReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ public boolean next() {
5353
}
5454

5555
try {
56-
nextRowToReturn = this.aggregateXMLSplitter.nextRow(filePartition.getPaths().get(filePathIndex));
56+
String path = fileContext.getDecodedFilePath(filePartition, filePathIndex);
57+
nextRowToReturn = this.aggregateXMLSplitter.nextRow(path);
5758
return true;
5859
} catch (RuntimeException ex) {
5960
// Error is expected to be friendly already.
@@ -80,7 +81,7 @@ private boolean initializeAggregateXMLSplitter() {
8081
return false;
8182
}
8283

83-
final String filePath = filePartition.getPaths().get(filePathIndex);
84+
final String filePath = fileContext.getDecodedFilePath(filePartition, filePathIndex);
8485
try {
8586
this.inputStream = fileContext.openFile(filePath);
8687
String identifierForError = "file " + filePath;

src/main/java/com/marklogic/spark/reader/file/xml/ZipAggregateXmlFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void close() {
8383
}
8484

8585
private void openNextFile() {
86-
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
86+
this.currentFilePath = fileContext.getDecodedFilePath(filePartition, nextFilePathIndex);
8787
nextFilePathIndex++;
8888
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
8989
}

0 commit comments

Comments
 (0)