2 files changed: +21 -4 lines changed

main/java/com/marklogic/spark/writer
 import com.marklogic.spark.Util;
 import org.apache.spark.sql.types.StructType;

-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -69,9 +68,9 @@ WriteBatcher newWriteBatcher(DataMovementManager dataMovementManager) {
         // WriteBatcherImpl has its own warn-level logging which is a bit verbose, including more than just the
         // message from the server. This is intended to always show up and be associated with our Spark connector
         // and also to be more brief, just capturing the main message from the server.
-        .onBatchFailure(((batch, failure) -> {
-            Util.MAIN_LOGGER.error("Failed to write documents: {}", failure.getMessage());
-        }));
+        .onBatchFailure((batch, failure) ->
+            Util.MAIN_LOGGER.error("Failed to write documents: {}", failure.getMessage())
+        );

         if (logger.isDebugEnabled()) {
             writeBatcher.onBatchSuccess(this::logBatchOnSuccess);
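The change above replaces a statement-block lambda wrapped in redundant parentheses with a plain expression lambda; behavior is unchanged. For readers unfamiliar with the listener API, here is a minimal standalone sketch of the same failure-listener pattern against the MarkLogic Data Movement SDK. The host, port, and credentials are placeholders, and the System.err call stands in for the connector's logger:

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;

public class BatchFailureLoggingSketch {
    public static void main(String[] args) {
        // Placeholder connection details for illustration only.
        DatabaseClient client = DatabaseClientFactory.newClient(
            "localhost", 8000,
            new DatabaseClientFactory.DigestAuthContext("admin", "admin"));
        DataMovementManager dataMovementManager = client.newDataMovementManager();

        WriteBatcher writeBatcher = dataMovementManager.newWriteBatcher()
            // An expression lambda suffices for a single statement; the extra
            // parentheses and braces removed by this diff added nothing.
            .onBatchFailure((batch, failure) ->
                System.err.println("Failed to write documents: " + failure.getMessage())
            );

        dataMovementManager.startJob(writeBatcher);
        // ... add documents via writeBatcher.add(uri, contentHandle) ...
        writeBatcher.flushAndWait();
        dataMovementManager.stopJob(writeBatcher);
        client.release();
    }
}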
test/java/com/marklogic/spark/reader/document

@@ -50,6 +50,24 @@ void invalidValue() {
         assertEquals("Value of 'spark.marklogic.read.documents.partitionsPerForest' option must be numeric.", ex.getMessage());
     }

+    @ParameterizedTest
+    @ValueSource(strings = {
+        "{\"ctsquery\": {\"collectionQuery\": {\"uris\": [\"author\"]}}}",
+        "{\"query\": {\"collection-query\": {\"uri\": [\"author\"]}}}",
+        "{\"search\": {\"query\": {\"collection-query\": {\"uri\": [\"author\"]}}}}"
+    })
+    void complexQuery(String query) {
+        long count = newSparkSession().read()
+            .format(CONNECTOR_IDENTIFIER)
+            .option(Options.CLIENT_URI, makeClientUri())
+            .option(Options.READ_DOCUMENTS_QUERY, query)
+            .option(Options.READ_DOCUMENTS_PARTITIONS_PER_FOREST, 3)
+            .load()
+            .count();
+
+        assertEquals(15, count, "Unexpected count for query: " + query);
+    }
+
     private Dataset<Row> readAuthors(int partitionsPerForest) {
         return newSparkSession().read()
             .format(CONNECTOR_IDENTIFIER)
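The new test confirms that the documents query option accepts all three JSON query representations: a serialized CTS query, a structured query, and a full search document, each selecting the same "author" collection and therefore the same 15 documents. A minimal application-side sketch of the same read follows; it assumes the connector's short name is "marklogic" and spells out the option keys the test references via Options constants (the partitionsPerForest key is confirmed by the error message in the earlier test; the other keys and the connection URI are assumptions):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ComplexQueryReadSketch {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
            .master("local[*]")
            .getOrCreate();

        // Any of the three representations exercised by the test should work
        // here; this one is a structured query on the "author" collection.
        String query = "{\"query\": {\"collection-query\": {\"uri\": [\"author\"]}}}";

        Dataset<Row> docs = session.read()
            .format("marklogic")
            // Placeholder credentials and host; assumed option key.
            .option("spark.marklogic.client.uri", "spark-user:password@localhost:8000")
            // Assumed key for Options.READ_DOCUMENTS_QUERY.
            .option("spark.marklogic.read.documents.query", query)
            // Key confirmed by the invalidValue test's error message.
            .option("spark.marklogic.read.documents.partitionsPerForest", 3)
            .load();

        System.out.println("Matching documents: " + docs.count());
    }
}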