Skip to content

Commit 45d30dc

Browse files
authored
Merge pull request #351 from marklogic/feature/17730-plus-sign-filename
MLE-17730 Fixed file path encoding issues
2 parents 228340a + 77355de commit 45d30dc

20 files changed

+56
-54
lines changed

src/main/java/com/marklogic/spark/reader/file/ArchiveFileReader.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,7 @@ private boolean readMetadataFollowedByContent() throws IOException {
189189

190190
private void openNextFile() {
191191
final boolean isStreamingDuringRead = StreamingMode.STREAM_DURING_READER_PHASE.equals(this.streamingMode);
192-
final String nextFilePath = filePartition.getPaths().get(nextFilePathIndex);
193-
194-
this.currentFilePath = isStreamingDuringRead ? nextFilePath : fileContext.decodeFilePath(nextFilePath);
192+
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
195193
nextFilePathIndex++;
196194

197195
if (!isStreamingDuringRead) {

src/main/java/com/marklogic/spark/reader/file/FileBatch.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex;
1414
import org.apache.spark.util.SerializableConfiguration;
1515

16+
import java.util.ArrayList;
17+
import java.util.List;
1618
import java.util.Map;
1719

1820
class FileBatch implements Batch {
@@ -27,17 +29,14 @@ class FileBatch implements Batch {
2729

2830
@Override
2931
public InputPartition[] planInputPartitions() {
30-
String[] inputFiles = fileIndex.inputFiles();
31-
int numPartitions = inputFiles.length;
32-
if (properties.containsKey(Options.READ_NUM_PARTITIONS)) {
33-
String value = properties.get(Options.READ_NUM_PARTITIONS);
34-
try {
35-
numPartitions = Integer.parseInt(value);
36-
} catch (NumberFormatException e) {
37-
throw new ConnectorException(String.format("Invalid value for number of partitions: %s", value));
38-
}
39-
}
40-
return FileUtil.makeFilePartitions(inputFiles, numPartitions);
32+
List<String> filePaths = new ArrayList<>();
33+
// Need to use allFiles and not inputFiles; the latter surprisingly URL-encodes each file path.
34+
// Would likely be better to soon refactor the FilePartition class to hold a FileStatus instead of a String so
35+
// that we don't need to convert it at all.
36+
fileIndex.allFiles().iterator().foreach(fileStatus -> filePaths.add(fileStatus.getPath().toString()));
37+
38+
int numPartitions = getNumberOfPartitions(filePaths);
39+
return FileUtil.makeFilePartitions(filePaths.toArray(new String[0]), numPartitions);
4140
}
4241

4342
@Override
@@ -48,4 +47,16 @@ public PartitionReaderFactory createReaderFactory() {
4847
FileContext fileContext = new FileContext(properties, new SerializableConfiguration(config));
4948
return new FilePartitionReaderFactory(fileContext);
5049
}
50+
51+
private int getNumberOfPartitions(List<String> filePaths) {
52+
if (properties.containsKey(Options.READ_NUM_PARTITIONS)) {
53+
String value = properties.get(Options.READ_NUM_PARTITIONS);
54+
try {
55+
return Integer.parseInt(value);
56+
} catch (NumberFormatException e) {
57+
throw new ConnectorException(String.format("Invalid value for number of partitions: %s", value));
58+
}
59+
}
60+
return filePaths.size();
61+
}
5162
}

src/main/java/com/marklogic/spark/reader/file/FileContext.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@
66
import com.marklogic.spark.ConnectorException;
77
import com.marklogic.spark.ContextSupport;
88
import com.marklogic.spark.Options;
9-
import com.marklogic.spark.Util;
109
import org.apache.hadoop.fs.FSDataInputStream;
10+
import org.apache.hadoop.fs.FileStatus;
1111
import org.apache.hadoop.fs.FileSystem;
1212
import org.apache.hadoop.fs.Path;
1313
import org.apache.spark.util.SerializableConfiguration;
1414

1515
import java.io.*;
16-
import java.net.URLDecoder;
1716
import java.nio.charset.Charset;
1817
import java.nio.charset.UnsupportedCharsetException;
1918
import java.util.Map;
@@ -53,7 +52,9 @@ public InputStream openFile(String filePath, boolean guessIfGzipped) {
5352
try {
5453
Path hadoopPath = new Path(filePath);
5554
FileSystem fileSystem = hadoopPath.getFileSystem(hadoopConfiguration.value());
56-
FSDataInputStream inputStream = fileSystem.open(hadoopPath);
55+
// Per the Spark BinaryFileFormat source code - calling getFileStatus seems to handle encoding in the file path.
56+
FileStatus fileStatus = fileSystem.getFileStatus(hadoopPath);
57+
FSDataInputStream inputStream = fileSystem.open(fileStatus.getPath());
5758
return isFileGzipped(filePath, guessIfGzipped) ? new GZIPInputStream(inputStream) : inputStream;
5859
} catch (Exception e) {
5960
throw new ConnectorException(String.format(
@@ -83,20 +84,6 @@ byte[] readBytes(InputStream inputStream) throws IOException {
8384
return this.encoding != null ? new String(bytes, this.encoding).getBytes() : bytes;
8485
}
8586

86-
public String decodeFilePath(String path) {
87-
try {
88-
if (this.encoding != null) {
89-
return URLDecoder.decode(path, this.encoding);
90-
}
91-
return URLDecoder.decode(path, Charset.defaultCharset());
92-
} catch (UnsupportedEncodingException e) {
93-
if (Util.MAIN_LOGGER.isDebugEnabled()) {
94-
Util.MAIN_LOGGER.debug("Cannot decode path '{}', so will use path as-is. Error: {}", path, e.getMessage());
95-
}
96-
return path;
97-
}
98-
}
99-
10087
private boolean isFileGzipped(String filePath, boolean guessIfGzipped) {
10188
if (this.isGzip()) {
10289
return true;

src/main/java/com/marklogic/spark/reader/file/GenericFileReader.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,9 @@ public boolean next() {
3838
return false;
3939
}
4040

41-
// If streaming, we want to put the unaltered file path in the row. The writer can then decode it and also use
42-
// its original value as the URI, as the PUT v1/documents endpoint does not allow e.g. spaces.
43-
final String originalFilePath = filePartition.getPaths().get(filePathIndex);
44-
final String path = this.isStreaming ? originalFilePath : fileContext.decodeFilePath(originalFilePath);
45-
41+
final String path = filePartition.getPaths().get(filePathIndex);
4642
filePathIndex++;
43+
4744
try {
4845
byte[] content = this.isStreaming ?
4946
FileUtil.serializeFileContext(fileContext, path) :

src/main/java/com/marklogic/spark/reader/file/GzipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public boolean next() {
4848
return false;
4949
}
5050

51-
String currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
51+
String currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
5252
nextFilePathIndex++;
5353
String uri = makeURI(currentFilePath);
5454

src/main/java/com/marklogic/spark/reader/file/JsonLinesFileReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ public void close() {
6161
}
6262

6363
private void openNextFile() {
64-
final String originalFilePath = filePartition.getPaths().get(filePathIndex);
65-
this.currentFilePath = fileContext.decodeFilePath(originalFilePath);
64+
this.currentFilePath = filePartition.getPaths().get(filePathIndex);
6665
this.lineCounter = 1;
6766
this.filePathIndex++;
6867
// To mimic the behavior of the Spark JSON data source, this will guess if the file is gzipped based on its

src/main/java/com/marklogic/spark/reader/file/MlcpArchiveFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void close() {
9999
}
100100

101101
private void openNextFile() {
102-
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
102+
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
103103
nextFilePathIndex++;
104104
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
105105
}

src/main/java/com/marklogic/spark/reader/file/RdfFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void close() throws IOException {
7676
}
7777

7878
private boolean initializeRdfStreamReader() {
79-
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
79+
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
8080
if (logger.isDebugEnabled()) {
8181
logger.debug("Reading file {}", this.currentFilePath);
8282
}

src/main/java/com/marklogic/spark/reader/file/RdfZipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public boolean next() {
6868
}
6969

7070
// Open up the next zip.
71-
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
71+
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
7272
nextFilePathIndex++;
7373
this.currentZipInputStream = new CustomZipInputStream(fileContext.openFile(currentFilePath));
7474
return next();

src/main/java/com/marklogic/spark/reader/file/ZipFileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private boolean nextWhileStreamingDuringReaderPhase() {
122122
}
123123

124124
private void openNextFile() {
125-
this.currentFilePath = fileContext.decodeFilePath(filePartition.getPaths().get(nextFilePathIndex));
125+
this.currentFilePath = filePartition.getPaths().get(nextFilePathIndex);
126126
nextFilePathIndex++;
127127
this.currentZipInputStream = new ZipInputStream(fileContext.openFile(this.currentFilePath));
128128
}

0 commit comments

Comments (0)