Skip to content

Commit 48c5d6e

Browse files
authored
Merge pull request #368 from marklogic/feature/error-fix
Throw full exception on write failure
2 parents be7513a + c1e38f1 commit 48c5d6e

File tree

13 files changed

+30
-31
lines changed

13 files changed

+30
-31
lines changed

CONTRIBUTING.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ This will produce a single jar file for the connector in the `./build/libs` dire
9898

9999
You can then launch PySpark with the connector available via:
100100

101-
pyspark --jars build/libs/marklogic-spark-connector-2.4-SNAPSHOT.jar
101+
pyspark --jars marklogic-spark-connector/build/libs/marklogic-spark-connector-2.5-SNAPSHOT.jar
102102

103103
The below command is an example of loading data from the test application deployed via the instructions at the top of
104104
this page.
@@ -142,7 +142,7 @@ For a quick test of writing documents, use the following:
142142

143143
```
144144
145-
spark.read.option("header", True).csv("src/test/resources/data.csv")\
145+
spark.read.option("header", True).csv("marklogic-spark-connector/src/test/resources/data.csv")\
146146
.repartition(2)\
147147
.write.format("marklogic")\
148148
.option("spark.marklogic.client.uri", "spark-test-user:spark@localhost:8000")\
@@ -174,7 +174,7 @@ The Spark master GUI is at <http://localhost:8080>. You can use this to view det
174174

175175
Now that you have a Spark cluster running, you just need to tell PySpark to connect to it:
176176

177-
pyspark --master spark://NYWHYC3G0W:7077 --jars build/libs/marklogic-spark-connector-2.4-SNAPSHOT.jar
177+
pyspark --master spark://NYWHYC3G0W:7077 --jars marklogic-spark-connector/build/libs/marklogic-spark-connector-2.5-SNAPSHOT.jar
178178

179179
You can then run the same commands as shown in the PySpark section above. The Spark master GUI will allow you to
180180
examine details of each of the commands that you run.
@@ -193,12 +193,12 @@ You will need the connector jar available, so run `./gradlew clean shadowJar` if
193193
You can then run a test Python program in this repository via the following (again, change the master address as
194194
needed); note that you run this outside of PySpark, and `spark-submit` is available after having installed PySpark:
195195

196-
spark-submit --master spark://NYWHYC3G0W:7077 --jars build/libs/marklogic-spark-connector-2.4-SNAPSHOT.jar src/test/python/test_program.py
196+
spark-submit --master spark://NYWHYC3G0W:7077 --jars marklogic-spark-connector/build/libs/marklogic-spark-connector-2.5-SNAPSHOT.jar src/test/python/test_program.py
197197

198198
You can also test a Java program. To do so, first move the `com.marklogic.spark.TestProgram` class from `src/test/java`
199199
to `src/main/java`. Then run `./gradlew clean shadowJar` to rebuild the connector jar. Then run the following:
200200

201-
spark-submit --master spark://NYWHYC3G0W:7077 --class com.marklogic.spark.TestProgram build/libs/marklogic-spark-connector-2.4-SNAPSHOT.jar
201+
spark-submit --master spark://NYWHYC3G0W:7077 --class com.marklogic.spark.TestProgram marklogic-spark-connector/build/libs/marklogic-spark-connector-2.5-SNAPSHOT.jar
202202

203203
Be sure to move `TestProgram` back to `src/test/java` when you are done.
204204

marklogic-spark-connector/build.gradle

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,6 @@ dependencies {
100100
testImplementation "org.skyscreamer:jsonassert:1.5.1"
101101
}
102102

103-
//test {
104-
// // Allows mlHost to override the value in gradle.properties, which the test plumbing will default to.
105-
// environment "mlHost", mlHost
106-
//}
107-
108103
if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_17)) {
109104
test {
110105
// See https://stackoverflow.com/questions/72724816/running-unit-tests-with-spark-3-3-0-on-java-17-fails-with-illegalaccesserror-cl

marklogic-spark-connector/src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.Map;
1717
import java.util.Set;
1818
import java.util.concurrent.TimeUnit;
19-
import java.util.stream.Stream;
2019

2120
public class ContextSupport extends Context implements Serializable {
2221

marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/WriteBatcherDataWriter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,12 @@ private synchronized void throwWriteFailureIfExists() {
227227
if (failure instanceof ConnectorException) {
228228
throw (ConnectorException) failure;
229229
}
230-
// Only including the message seems sufficient here, as Spark is logging the stacktrace. And the user
231-
// most likely only needs to know the message.
232-
throw new ConnectorException(failure.getMessage());
230+
// Originally, only the failure message was included under the impression that the Spark environment was
231+
// logging the full stacktrace. That either was not the case or is no longer the case on Spark 3.5.x.
232+
// So the original exception is retained. But oddly, this results in a SparkException with a null cause - ???.
233+
// That doesn't really impact a user - it's a SparkException regardless - but caused some tests to no longer
234+
// be able to catch a ConnectorException.
235+
throw new ConnectorException(failure.getMessage(), failure);
233236
}
234237
}
235238

marklogic-spark-connector/src/main/java/com/marklogic/spark/writer/file/ContentWriter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import com.marklogic.spark.ConnectorException;
1212
import com.marklogic.spark.ContextSupport;
1313
import com.marklogic.spark.Options;
14-
import com.marklogic.spark.Util;
1514
import com.marklogic.spark.reader.document.DocumentRowSchema;
1615
import org.apache.commons.io.IOUtils;
1716
import org.apache.spark.sql.catalyst.InternalRow;

marklogic-spark-connector/src/test/java/com/marklogic/spark/AbstractIntegrationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ protected final ConnectorException assertThrowsConnectorException(Runnable r) {
158158
SparkException ex = assertThrows(SparkException.class, () -> r.run());
159159
assertTrue(ex.getCause() instanceof ConnectorException,
160160
"Expect the Spark-thrown SparkException to wrap our ConnectorException, which is an exception that we " +
161-
"intentionally throw when an error condition is detected.");
161+
"intentionally throw when an error condition is detected. " +
162+
"Actual exception cause type: " + ex.getCause());
162163
return (ConnectorException) ex.getCause();
163164
}
164165

marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/ReadGenericFilesTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.marklogic.spark.AbstractIntegrationTest;
1212
import com.marklogic.spark.ConnectorException;
1313
import com.marklogic.spark.Options;
14+
import org.apache.spark.SparkException;
1415
import org.apache.spark.sql.DataFrameWriter;
1516
import org.apache.spark.sql.Dataset;
1617
import org.apache.spark.sql.Row;
@@ -72,7 +73,7 @@ void wrongEncoding() {
7273
.option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
7374
.mode(SaveMode.Append);
7475

75-
ConnectorException ex = assertThrowsConnectorException(() -> writer.save());
76+
SparkException ex = assertThrows(SparkException.class, () -> writer.save());
7677
assertTrue(ex.getMessage().contains("document is not UTF-8 encoded"), "Actual error: " + ex.getMessage());
7778
}
7879

marklogic-spark-connector/src/test/java/com/marklogic/spark/reader/file/StreamGenericFilesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
package com.marklogic.spark.reader.file;
55

66
import com.marklogic.spark.AbstractIntegrationTest;
7-
import com.marklogic.spark.ConnectorException;
87
import com.marklogic.spark.Options;
8+
import org.apache.spark.SparkException;
99
import org.apache.spark.sql.DataFrameWriter;
1010
import org.apache.spark.sql.Dataset;
1111
import org.apache.spark.sql.Row;
@@ -73,7 +73,7 @@ void handleFailureWhileStreaming() {
7373
.option(Options.WRITE_PERMISSIONS, "not-an-actual-role,read")
7474
.mode(SaveMode.Append);
7575

76-
ConnectorException ex = assertThrowsConnectorException(() -> writer.save());
76+
SparkException ex = assertThrows(SparkException.class, () -> writer.save());
7777
assertTrue(ex.getMessage().contains("SEC-ROLEDNE: xdmp:role(\"not-an-actual-role\")"),
7878
"This verifies that when the connector uses GenericDocumentManager to PUT a single document, any error " +
7979
"is still wrapped in a ConnectorException. Actual error message: " + ex.getMessage());

marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/WritePartialBatchTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
*/
44
package com.marklogic.spark.writer;
55

6-
import com.marklogic.spark.ConnectorException;
76
import com.marklogic.spark.Options;
7+
import org.apache.spark.SparkException;
88
import org.apache.spark.sql.DataFrameWriter;
99
import org.apache.spark.sql.SaveMode;
1010
import org.junit.jupiter.api.Test;
1111

12+
import static org.junit.jupiter.api.Assertions.assertThrows;
1213
import static org.junit.jupiter.api.Assertions.assertTrue;
1314

1415
class WritePartialBatchTest extends AbstractWriteTest {
@@ -39,7 +40,7 @@ void shouldThrowError() {
3940
.option(Options.WRITE_PERMISSIONS, DEFAULT_PERMISSIONS)
4041
.mode(SaveMode.Append);
4142

42-
ConnectorException ex = assertThrowsConnectorException(() -> writer.save());
43+
SparkException ex = assertThrows(SparkException.class, () -> writer.save());
4344
assertTrue(ex.getMessage().contains("Document is not JSON"), "Verifying that trying to write non-JSON " +
4445
"documents with a .json extension should produce an error; unexpected error: " + ex.getMessage());
4546
}

marklogic-spark-connector/src/test/java/com/marklogic/spark/writer/WriteRowsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ void invalidBatchSize() {
151151
*/
152152
@Test
153153
void userNotPermittedToWriteAndFailOnCommit() {
154-
ConnectorException ex = assertThrowsConnectorException(() -> newWriter()
154+
SparkException ex = assertThrows(SparkException.class, () -> newWriter()
155155
.option(Options.CLIENT_USERNAME, "spark-no-write-user")
156156
.option(Options.WRITE_BATCH_SIZE, 500)
157157
.save()
@@ -181,7 +181,7 @@ void invalidPassword() {
181181
*/
182182
@Test
183183
void userNotPermittedToWriteAndFailOnWrite() {
184-
ConnectorException ex = assertThrowsConnectorException(() -> newWriter()
184+
SparkException ex = assertThrows(SparkException.class, () -> newWriter()
185185
.option(Options.CLIENT_USERNAME, "spark-no-write-user")
186186
.option(Options.WRITE_BATCH_SIZE, 1)
187187
.option(Options.WRITE_THREAD_COUNT, 1)
@@ -224,7 +224,7 @@ void dontAbortOnFailure() {
224224
assertEquals(1, successCount.get());
225225
}
226226

227-
private void verifyFailureIsDueToLackOfPermission(ConnectorException ex) {
227+
private void verifyFailureIsDueToLackOfPermission(SparkException ex) {
228228
assertTrue(ex.getMessage().contains("Server Message: You do not have permission to this method and URL"),
229229
"Unexpected cause message: " + ex.getMessage());
230230
verifyNoDocsWereWritten();

0 commit comments

Comments (0)