import com.marklogic.spark.writer.file.DocumentFileWriteBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.ScanBuilder;
...
import scala.Option;
import scala.collection.Seq;

-import java.io.IOException;
-
/**
 * Extends Spark's FileTable class so that it can make use of that class's file index capabilities, which includes
 * support for Spark options like recursiveFileLookup and pathGlobFilter as defined at
@@ -91,28 +90,37 @@ private boolean isWriteFilesOperation(CaseInsensitiveStringMap options, Seq<String> paths) {
        if (paths.size() != 1) {
            return false;
        }
-        // Unfortunately not all "read files" options have a common base. The worst case though of
-        // mis-identifying this as a "read" operation and making a directory automatically though is that
-        // the user doesn't get an expected error for trying to read a path that doesn't exist.
-        return options.keySet()
-            .stream()
+        return options.keySet().stream()
+            // There are no required options that indicate whether this is a read or a write, so we at least check
+            // for options that indicate the user is reading files, in which case we don't need to call mkdirs.
            .noneMatch(key -> key.startsWith("spark.marklogic.read.files")
                || key.startsWith("spark.marklogic.read.aggregates.xml")
            );
    }

    private void makeWritePath(String path, SparkSession sparkSession) {
-        if (Util.MAIN_LOGGER.isDebugEnabled()) {
-            Util.MAIN_LOGGER.debug("Calling mkdirs on path: {}", path);
-        }
        Configuration config = sparkSession.sparkContext().hadoopConfiguration();
-        Path hadoopPath = new Path(path);
        try {
-            hadoopPath.getFileSystem(config).mkdirs(hadoopPath);
+            Path hadoopPath = new Path(path);
+            FileSystem fileSystem = hadoopPath.getFileSystem(config);
+            if (!fileSystem.exists(hadoopPath)) {
+                if (Util.MAIN_LOGGER.isDebugEnabled()) {
+                    Util.MAIN_LOGGER.debug("Calling mkdirs on path: {}", path);
+                }
+                fileSystem.mkdirs(hadoopPath);
+            }
        } catch (Exception ex) {
-            // The user is likely to get an AnalysisException from Spark due to the path not existing, which is the
-            // better error to be propagated.
-            Util.MAIN_LOGGER.error("Unable to call mkdirs on path: {}; cause: {}", path, ex.getMessage());
+            // An exception here means one of two things. Either this is a write operation and the mkdirs call
+            // failed, in which case the user still gets an AnalysisException from Spark noting that the path does
+            // not exist; the user likely can't create the path themselves - perhaps a permissions issue - and thus
+            // has the information needed to proceed. Or this is a read operation and the mkdirs call didn't need
+            // to happen, in which case the user still gets an AnalysisException because the directory could not be
+            // found. Either way the AnalysisException is sufficient for the user to fix the problem, so this is
+            // only logged at the debug level.
+            if (Util.MAIN_LOGGER.isDebugEnabled()) {
+                Util.MAIN_LOGGER.debug("Unable to call mkdirs on path: {}; cause: {}", path, ex.getMessage());
+            }
        }
    }
}
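For context, here is a minimal standalone sketch of the read-vs-write detection used by isWriteFilesOperation above. It is not part of the connector; the specific option keys in the example are made up for illustration, and only the two prefixes are taken from the diff.

import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.util.Map;

public class WriteDetectionSketch {

    // Mirrors the prefix check above: if no option starts with a known "read files" prefix,
    // the single-path operation is assumed to be a file write, so mkdirs may be called.
    static boolean looksLikeWrite(CaseInsensitiveStringMap options) {
        return options.keySet().stream()
            .noneMatch(key -> key.startsWith("spark.marklogic.read.files")
                || key.startsWith("spark.marklogic.read.aggregates.xml"));
    }

    public static void main(String[] args) {
        // Hypothetical option keys; only the prefixes come from the connector code.
        CaseInsensitiveStringMap readOptions = new CaseInsensitiveStringMap(
            Map.of("spark.marklogic.read.files.someOption", "value"));
        CaseInsensitiveStringMap otherOptions = new CaseInsensitiveStringMap(
            Map.of("spark.marklogic.write.someOption", "value"));

        System.out.println(looksLikeWrite(readOptions));  // false - a read-files option is present
        System.out.println(looksLikeWrite(otherOptions)); // true  - treated as a write-files operation
    }
}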
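And a minimal standalone sketch of the Hadoop FileSystem calls that the new makeWritePath code relies on, assuming a default Hadoop Configuration and a hypothetical local output path; in the connector the path comes from the single path passed to the writer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class MkdirsSketch {

    public static void main(String[] args) throws IOException {
        Configuration config = new Configuration();
        // Hypothetical output path used only for this example.
        Path path = new Path("build/example-output");
        // getFileSystem resolves the filesystem implementation (local, HDFS, etc.) from the
        // path's scheme and the Hadoop configuration.
        FileSystem fileSystem = path.getFileSystem(config);
        if (!fileSystem.exists(path)) {
            // mkdirs returns true if the directory now exists.
            boolean created = fileSystem.mkdirs(path);
            System.out.println("Created " + path + ": " + created);
        }
    }
}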