Skip to content

Commit 78f7873

Browse files
authored
Merge pull request #163 from marklogic/feature/12219-forest-hosts
MLE-12219 Can now create direct connections to hosts
2 parents 3d90acd + 32a709d commit 78f7873

File tree

7 files changed

+102
-9
lines changed

7 files changed

+102
-9
lines changed

docs/reading-data/documents.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,9 @@ with more partition readers and a higher batch size.
210210
You can also adjust the level of parallelism by controlling how many threads Spark uses for executing partition reads.
211211
Please see your Spark distribution's documentation for further information.
212212

213+
### Direct connections to hosts
214+
215+
If your Spark program is able to connect to each host in your MarkLogic cluster, you can set the
216+
`spark.marklogic.client.connectionType` option to `direct`. Each partition reader will then connect to the
217+
host on which the reader's assigned forest resides. This will typically improve performance by reducing network
218+
traffic, as the host that receives a request will not need to involve any other host in the processing of that request.

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,19 @@ protected ContextSupport(Map<String, String> properties) {
3636
}
3737

3838
/**
 * Connects to MarkLogic using the host defined in the user-provided options.
 * Delegates to the single-arg overload with a null host, i.e. no host override.
 */
public DatabaseClient connectToMarkLogic() {
    return connectToMarkLogic(null);
}
41+
42+
/**
43+
* @param host if not null, overrides the user-defined host. Used for direct connections when a load balancer is
44+
* not in front of MarkLogic.
45+
* @return a client connected to the given host, or to the user-defined host when the input is null
46+
*/
47+
public DatabaseClient connectToMarkLogic(String host) {
3948
Map<String, String> connectionProps = buildConnectionProperties();
49+
if (host != null) {
50+
connectionProps.put(Options.CLIENT_HOST, host);
51+
}
4052
DatabaseClient client;
4153
try {
4254
client = DatabaseClientFactory.newClient(propertyName -> connectionProps.get("spark." + propertyName));
@@ -130,6 +142,11 @@ public final boolean isAbortOnFailure() {
130142
return !"false".equalsIgnoreCase(getProperties().get(Options.WRITE_ABORT_ON_FAILURE));
131143
}
132144

145+
public final boolean isDirectConnection() {
146+
String value = getProperties().get(Options.CLIENT_CONNECTION_TYPE);
147+
return value != null && value.equalsIgnoreCase(DatabaseClient.ConnectionType.DIRECT.name());
148+
}
149+
133150
public final boolean hasOption(String... options) {
134151
return Util.hasOption(this.properties, options);
135152
}

src/main/java/com/marklogic/spark/reader/document/ForestPartition.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
package com.marklogic.spark.reader.document;
22

3+
import com.marklogic.client.datamovement.Forest;
34
import org.apache.spark.sql.connector.read.InputPartition;
45

56
class ForestPartition implements InputPartition {
67

78
static final long serialVersionUID = 1;
89

910
private final String forestName;
11+
private final String host;
1012
private final long serverTimestamp;
1113
private final Long offsetStart;
1214
private final Long offsetEnd;
1315

14-
ForestPartition(String forestName, long serverTimestamp, Long offsetStart, Long offsetEnd) {
15-
this.forestName = forestName;
16+
ForestPartition(Forest forest, long serverTimestamp, Long offsetStart, Long offsetEnd) {
17+
this.forestName = forest.getForestName();
18+
this.host = forest.getHost();
1619
this.serverTimestamp = serverTimestamp;
1720
this.offsetStart = offsetStart;
1821
this.offsetEnd = offsetEnd;
@@ -22,6 +25,10 @@ String getForestName() {
2225
return forestName;
2326
}
2427

28+
String getHost() {
29+
return host;
30+
}
31+
2532
long getServerTimestamp() {
2633
return serverTimestamp;
2734
}

src/main/java/com/marklogic/spark/reader/document/ForestPartitionPlanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ ForestPartition[] makePartitions(long estimate, long serverTimestamp, Forest...
2828
break;
2929
}
3030
Long offsetEnd = j < (partitionsPerForest - 1) ? (urisPerPartition + offset - 1) : null;
31-
partitions.add(new ForestPartition(forest.getForestName(), serverTimestamp, offset, offsetEnd));
31+
partitions.add(new ForestPartition(forest, serverTimestamp, offset, offsetEnd));
3232
offset += urisPerPartition;
3333
}
3434
}

src/main/java/com/marklogic/spark/reader/document/ForestReader.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,17 @@ class ForestReader implements PartitionReader<InternalRow> {
5555
private int docCount;
5656

5757
ForestReader(ForestPartition forestPartition, DocumentContext context) {
58-
if (logger.isDebugEnabled()) {
59-
logger.debug("Will read from partition: {}", forestPartition);
60-
}
6158
this.forestPartition = forestPartition;
6259
this.limit = context.getLimit();
6360

64-
DatabaseClient client = context.connectToMarkLogic();
61+
DatabaseClient client = context.isDirectConnection() ?
62+
context.connectToMarkLogic(forestPartition.getHost()) :
63+
context.connectToMarkLogic();
64+
65+
if (logger.isDebugEnabled()) {
66+
logger.debug("Will read from host {} for partition: {}", client.getHost(), forestPartition);
67+
}
68+
6569
SearchQueryDefinition query = context.buildSearchQuery(client);
6670
this.uriBatcher = new UriBatcher(client, query, forestPartition, context.getBatchSize(), false);
6771

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
package com.marklogic.spark;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.MarkLogicIOException;
import org.junit.jupiter.api.Test;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

class ContextSupportTest extends AbstractIntegrationTest {

    // Mutable per-test option map; the reference never changes, so the field is final.
    private final Map<String, String> options = new HashMap<>();

    /**
     * For automated tests, we only have the single-node MarkLogic cluster and thus can't verify that direct
     * connections really work. But this test at least verifies that the "host" input overrides what is in the
     * options.
     */
    @Test
    void directConnectionToHost() {
        options.put(Options.CLIENT_URI, makeClientUri());

        ContextSupport support = new ContextSupport(options);
        DatabaseClient client = support.connectToMarkLogic(testConfig.getHost());
        assertTrue(client.checkConnection().isConnected());

        MarkLogicIOException ex = assertThrows(MarkLogicIOException.class,
            () -> support.connectToMarkLogic("invalid-host"));
        // assertInstanceOf gives a clearer failure message than assertTrue(x instanceof Y).
        assertInstanceOf(UnknownHostException.class, ex.getCause());
    }

    @Test
    void isDirectConnection() {
        // No connection type set -> not a direct connection.
        assertFalse(new ContextSupport(options).isDirectConnection());

        options.put(Options.CLIENT_CONNECTION_TYPE, "direct");
        assertTrue(new ContextSupport(options).isDirectConnection());

        // Any other connection type, e.g. "gateway", is not direct.
        options.put(Options.CLIENT_CONNECTION_TYPE, "gateway");
        assertFalse(new ContextSupport(options).isDirectConnection());
    }
}

src/test/java/com/marklogic/spark/reader/document/ReadDocumentRowsTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import com.fasterxml.jackson.databind.JsonNode;
44
import com.fasterxml.jackson.databind.node.ObjectNode;
5+
import com.marklogic.client.MarkLogicIOException;
56
import com.marklogic.spark.AbstractIntegrationTest;
67
import com.marklogic.spark.ConnectorException;
78
import com.marklogic.spark.Options;
9+
import org.apache.spark.SparkException;
810
import org.apache.spark.sql.DataFrameReader;
911
import org.apache.spark.sql.Dataset;
1012
import org.apache.spark.sql.Row;
@@ -14,8 +16,7 @@
1416
import java.util.List;
1517
import java.util.stream.Collectors;
1618

17-
import static org.junit.jupiter.api.Assertions.assertEquals;
18-
import static org.junit.jupiter.api.Assertions.assertTrue;
19+
import static org.junit.jupiter.api.Assertions.*;
1920

2021
class ReadDocumentRowsTest extends AbstractIntegrationTest {
2122

@@ -36,6 +37,19 @@ void readByCollection() {
3637
assertEquals("Vivianne", doc.get("ForeName").asText());
3738
}
3839

40+
@Test
41+
void readViaDirectConnect() {
42+
Dataset<Row> rows = startRead()
43+
.option(Options.READ_DOCUMENTS_COLLECTIONS, "author")
44+
.option(Options.CLIENT_CONNECTION_TYPE, "direct")
45+
.load();
46+
47+
SparkException ex = assertThrows(SparkException.class, () -> rows.count());
48+
assertTrue(ex.getCause() instanceof MarkLogicIOException, "This test is expected to fail when run against " +
49+
"the cluster created by docker-compose.yaml, as the host name of the MarkLogic cluster is not expected " +
50+
"to be accessible. Actual exception: " + ex.getCause());
51+
}
52+
3953
@Test
4054
void invalidBatchSize() {
4155
// Verify batch size doesn't cause an error.

0 commit comments

Comments
 (0)