
Commit 94004ec

Merge pull request #384 from marklogic/feature/mkdir-write-files
MLE-19124: Fixed a bug where writing files failed when the export path doesn't exist
2 parents 532ba7d + f5e2a08

File tree: 4 files changed (+82, -2 lines changed)
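For context, the user-facing behavior this commit fixes is exporting documents to a filesystem path that does not yet exist. Below is a minimal sketch of such a write, assuming the connector's "marklogic" format name and spark.marklogic.* option keys (the tests in this commit use the CONNECTOR_IDENTIFIER and Options constants for these); the connection details and paths are hypothetical placeholders.

    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class ExportToNewDirectory {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
            // Read one document from MarkLogic and export it as a file. Before this
            // commit, save() failed when the target directory was missing; the
            // connector now creates the directory first.
            session.read().format("marklogic")
                .option("spark.marklogic.client.uri", "user:password@localhost:8000")
                .option("spark.marklogic.read.documents.uris", "/author/author1.json")
                .load()
                .write().format("marklogic")
                .mode(SaveMode.Append)
                .save("/tmp/export/dir-that-does-not-exist");
        }
    }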

marklogic-spark-connector/src/main/java/com/marklogic/spark/MarkLogicFileTable.java

Lines changed: 38 additions & 0 deletions
@@ -5,7 +5,9 @@
 
 import com.marklogic.spark.reader.file.FileScanBuilder;
 import com.marklogic.spark.writer.file.DocumentFileWriteBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
@@ -17,6 +19,8 @@
 import scala.Option;
 import scala.collection.Seq;
 
+import java.io.IOException;
+
 /**
  * Extends Spark's FileTable class so that it can make use of that class's file index capabilities, which includes
  * support for Spark options like recursiveFileLookup and pathGlobFilter as defined at
@@ -36,6 +40,9 @@ class MarkLogicFileTable extends FileTable {
 
     MarkLogicFileTable(SparkSession sparkSession, CaseInsensitiveStringMap options, Seq<String> paths, StructType schema) {
         super(sparkSession, options, paths, Option.apply(schema));
+        if (isWriteFilesOperation(options, paths)) {
+            makeWritePath(paths.head(), sparkSession);
+        }
         this.options = options;
         this.schema = schema;
     }
@@ -77,4 +84,35 @@ public Class<? extends FileFormat> fallbackFileFormat() {
         // so null is returned.
         return null;
     }
+
+    private boolean isWriteFilesOperation(CaseInsensitiveStringMap options, Seq<String> paths) {
+        // When writing files, a user is limited to a single path. So if the user provides multiple paths when
+        // reading files, we immediately know it's not a write operation.
+        if (paths.size() != 1) {
+            return false;
+        }
+        // Unfortunately, not all "read files" options have a common base. The worst case, though, of
+        // mis-identifying this as a "read" operation and making a directory automatically is that the
+        // user doesn't get an expected error for trying to read a path that doesn't exist.
+        return options.keySet()
+            .stream()
+            .noneMatch(key -> key.startsWith("spark.marklogic.read.files")
+                || key.startsWith("spark.marklogic.read.aggregates.xml")
+            );
+    }
+
+    private void makeWritePath(String path, SparkSession sparkSession) {
+        if (Util.MAIN_LOGGER.isDebugEnabled()) {
+            Util.MAIN_LOGGER.debug("Calling mkdirs on path: {}", path);
+        }
+        Configuration config = sparkSession.sparkContext().hadoopConfiguration();
+        Path hadoopPath = new Path(path);
+        try {
+            hadoopPath.getFileSystem(config).mkdirs(hadoopPath);
+        } catch (Exception ex) {
+            // The user is likely to get an AnalysisException from Spark due to the path not existing, which is the
+            // better error to be propagated.
+            Util.MAIN_LOGGER.error("Unable to call mkdirs on path: {}; cause: {}", path, ex.getMessage());
+        }
+    }
 }
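A note on the approach: makeWritePath goes through Hadoop's filesystem abstraction rather than java.io.File, so the same mkdirs call works for local paths as well as schemes like hdfs:// or s3a://. A standalone sketch of that API, assuming hadoop-common is on the classpath and using a hypothetical /tmp/export/new-dir target:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MkdirsSketch {
        public static void main(String[] args) throws Exception {
            // The path's scheme (none here, so the local filesystem) selects
            // the FileSystem implementation.
            Path path = new Path("/tmp/export/new-dir");
            FileSystem fs = path.getFileSystem(new Configuration());
            // mkdirs creates any missing parent directories and returns true
            // if the directory exists once the call completes.
            boolean ok = fs.mkdirs(path);
            System.out.println("mkdirs result: " + ok);
        }
    }

This is also why makeWritePath swallows exceptions with a broad catch: if mkdirs fails, the write proceeds and Spark's own AnalysisException about the missing path is the error that surfaces.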

marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/ReadAggregateXmlFilesTest.java

Lines changed: 11 additions & 0 deletions
@@ -7,6 +7,7 @@
 import com.marklogic.spark.AbstractIntegrationTest;
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.jdom2.Namespace;
@@ -216,6 +217,16 @@ void invalidEncoding() {
             "Actual error: " + ex.getMessage());
     }
 
+    @Test
+    void pathDoesntExist() {
+        AnalysisException ex = assertThrows(AnalysisException.class, () -> newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.READ_AGGREGATES_XML_ELEMENT, "MedlineCitation")
+            .load("path-doesnt-exist"));
+
+        assertTrue(ex.getMessage().contains("Path does not exist"), "Unexpected error: " + ex.getMessage());
+    }
+
     private void verifyRow(Row row, String expectedUriSuffix, String rootPath, String name, int age) {
         String uri = row.getString(0);
         assertTrue(uri.endsWith(expectedUriSuffix), format("URI %s doesn't end with %s", uri, expectedUriSuffix));

marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/ReadArchiveFileTest.java

Lines changed: 11 additions & 0 deletions
@@ -8,6 +8,7 @@
 import com.marklogic.spark.ConnectorException;
 import com.marklogic.spark.Options;
 import com.marklogic.spark.TestUtil;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
@@ -222,6 +223,16 @@ void customEncoding() {
         doc.assertElementExists("/MedlineCitationSet");
     }
 
+    @Test
+    void pathDoesntExist() {
+        AnalysisException ex = assertThrows(AnalysisException.class, () -> newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.READ_FILES_TYPE, "archive")
+            .load("path-doesnt-exist"));
+
+        assertTrue(ex.getMessage().contains("Path does not exist"), "Unexpected error: " + ex.getMessage());
+    }
+
     private void verifyAllMetadata(Path tempDir, int rowCount) {
         List<Row> rows = sparkSession.read().format(CONNECTOR_IDENTIFIER)
             .option(Options.READ_FILES_TYPE, "archive")

marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/file/WriteDocumentFilesTest.java

Lines changed: 22 additions & 2 deletions
@@ -16,6 +16,8 @@
 import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.springframework.util.FileCopyUtils;
 
 import java.io.File;
@@ -25,8 +27,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 
 class WriteDocumentFilesTest extends AbstractIntegrationTest {
 
@@ -45,6 +46,25 @@ void writeFifteenAuthorFiles(@TempDir Path tempDir) throws Exception {
         verifyAuthorFilesWereCorrectlyWritten(tempDir);
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"doesntexist", "has space", "has+plus"})
+    void pathDoesntExist(String directoryName, @TempDir Path tempDir) {
+        File dir = new File(tempDir.toFile(), directoryName);
+        assertFalse(dir.exists());
+
+        newSparkSession().read().format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.READ_DOCUMENTS_URIS, "/author/author1.json")
+            .load()
+            .write().format(CONNECTOR_IDENTIFIER)
+            .mode(SaveMode.Append)
+            .save(dir.getAbsolutePath());
+
+        assertTrue(dir.exists(), "Directory was not created: " + dir.getAbsolutePath());
+        assertEquals(1, dir.listFiles().length);
+        assertTrue(new File(dir, "author").exists());
+    }
+
     @Test
     void streamAuthorDocuments(@TempDir Path tempDir) throws Exception {
         Dataset<Row> dataset = newSparkSession().read()