Skip to content

Commit d765ba2

Browse files
authored
Merge pull request #170 from marklogic/feature/mle-12437
MLE-12437 Allow for OkHttp timeouts to be configured
2 parents 690e2dd + c51c510 commit d765ba2

File tree

4 files changed

+110
-4
lines changed

4 files changed

+110
-4
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ dependencies {
3333
exclude module: 'jackson-dataformat-csv'
3434
}
3535

36+
// Need this so that an OkHttpClientConfigurator can be created.
37+
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
38+
3639
// Makes it possible to use lambdas in Java 8 to implement Spark's Function1 and Function2 interfaces
3740
// See https://github.com/scala/scala-java8-compat for more information
3841
implementation("org.scala-lang.modules:scala-java8-compat_2.12:1.0.2") {

src/main/java/com/marklogic/spark/ContextSupport.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.DatabaseClientFactory;
20+
import com.marklogic.client.extra.okhttpclient.OkHttpClientConfigurator;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

@@ -25,14 +26,22 @@
2526
import java.net.URLDecoder;
2627
import java.util.HashMap;
2728
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
2830

2931
public class ContextSupport implements Serializable {
3032

3133
protected static final Logger logger = LoggerFactory.getLogger(ContextSupport.class);
3234
private final Map<String, String> properties;
35+
private final boolean configuratorWasAdded;
36+
37+
// Java Client 6.5.0 has a bug in it (to be fixed in 6.5.1 or 6.6.0) where multiple threads that use a configurator
38+
// can run into a ConcurrentModificationException. So need to synchronize adding a configurator and creating a
39+
// client. Those two actions are rarely done, so the cost of synchronization will be negligible.
40+
private static final Object CLIENT_LOCK = new Object();
3341

3442
/**
 * Keeps a reference to the given properties and then runs addOkHttpConfiguratorIfNecessary(),
 * recording whether that call added a configurator.
 *
 * @param properties the option map this instance reads from; stored as-is, not copied
 */
protected ContextSupport(Map<String, String> properties) {
3543
this.properties = properties;
44+
// Must run after properties is assigned; NOTE(review): the helper presumably reads this.properties - confirm.
this.configuratorWasAdded = addOkHttpConfiguratorIfNecessary();
3645
}
3746

3847
public DatabaseClient connectToMarkLogic() {
@@ -50,10 +59,12 @@ public DatabaseClient connectToMarkLogic(String host) {
5059
connectionProps.put(Options.CLIENT_HOST, host);
5160
}
5261
DatabaseClient client;
53-
try {
54-
client = DatabaseClientFactory.newClient(propertyName -> connectionProps.get("spark." + propertyName));
55-
} catch (Exception e) {
56-
throw new ConnectorException(String.format("Unable to connect to MarkLogic; cause: %s", e.getMessage()), e);
62+
if (configuratorWasAdded) {
63+
synchronized (CLIENT_LOCK) {
64+
client = connect(connectionProps);
65+
}
66+
} else {
67+
client = connect(connectionProps);
5768
}
5869
DatabaseClient.ConnectionResult result = client.checkConnection();
5970
if (!result.isConnected()) {
@@ -62,6 +73,14 @@ public DatabaseClient connectToMarkLogic(String host) {
6273
return client;
6374
}
6475

76+
private DatabaseClient connect(Map<String, String> connectionProps) {
77+
try {
78+
return DatabaseClientFactory.newClient(propertyName -> connectionProps.get("spark." + propertyName));
79+
} catch (Exception e) {
80+
throw new ConnectorException(String.format("Unable to connect to MarkLogic; cause: %s", e.getMessage()), e);
81+
}
82+
}
83+
6584
protected final Map<String, String> buildConnectionProperties() {
6685
Map<String, String> connectionProps = new HashMap<>();
6786
connectionProps.put("spark.marklogic.client.authType", "digest");
@@ -154,4 +173,37 @@ public final boolean hasOption(String... options) {
154173
/**
 * @return the properties map held by this instance (the same reference, not a copy)
 */
public Map<String, String> getProperties() {
155174
return properties;
156175
}
176+
177+
/**
178+
* @return true if a configurator was added
179+
*/
180+
private boolean addOkHttpConfiguratorIfNecessary() {
181+
final String prefix = "spark.marklogic.client.";
182+
final long defaultValue = -1;
183+
final long connectionTimeout = getNumericOption(prefix + "connectionTimeout", defaultValue, defaultValue);
184+
final long callTimeout = getNumericOption(prefix + "callTimeout", defaultValue, defaultValue);
185+
final long readTimeout = getNumericOption(prefix + "connectionTimeout", defaultValue, defaultValue);
186+
final long writeTimeout = getNumericOption(prefix + "writeTimeout", defaultValue, defaultValue);
187+
188+
if (connectionTimeout > -1 || callTimeout > -1 || readTimeout > -1 || writeTimeout > -1) {
189+
synchronized (CLIENT_LOCK) {
190+
DatabaseClientFactory.addConfigurator((OkHttpClientConfigurator) builder -> {
191+
if (connectionTimeout > -1) {
192+
builder.connectTimeout(connectionTimeout, TimeUnit.SECONDS);
193+
}
194+
if (callTimeout > -1) {
195+
builder.callTimeout(callTimeout, TimeUnit.SECONDS);
196+
}
197+
if (readTimeout > -1) {
198+
builder.readTimeout(readTimeout, TimeUnit.SECONDS);
199+
}
200+
if (writeTimeout > -1) {
201+
builder.writeTimeout(writeTimeout, TimeUnit.SECONDS);
202+
}
203+
});
204+
}
205+
return true;
206+
}
207+
return false;
208+
}
157209
}

src/test/java/com/marklogic/spark/ContextSupportTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,13 @@
55
import org.junit.jupiter.api.Test;
66

77
import java.net.UnknownHostException;
8+
import java.util.ArrayList;
89
import java.util.HashMap;
10+
import java.util.List;
911
import java.util.Map;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.Future;
1015

1116
import static org.junit.jupiter.api.Assertions.*;
1217

@@ -42,4 +47,32 @@ void isDirectConnection() {
4247
options.put(Options.CLIENT_CONNECTION_TYPE, "gateway");
4348
assertFalse(new ContextSupport(options).isDirectConnection());
4449
}
50+
51+
@Test
52+
void createManyConnectionsWithOkHttpConfigurator() throws Exception {
53+
options.put(Options.CLIENT_URI, makeClientUri());
54+
options.put("spark.marklogic.client.callTimeout", "10");
55+
options.put("spark.marklogic.client.connectionTimeout", "10");
56+
options.put("spark.marklogic.client.readTimeout", "10");
57+
options.put("spark.marklogic.client.writeTimeout", "10");
58+
59+
final int clientsToCreate = 100;
60+
61+
ExecutorService service = Executors.newFixedThreadPool(16);
62+
List<Future> futures = new ArrayList<>();
63+
for (int i = 0; i < clientsToCreate; i++) {
64+
futures.add(service.submit(() -> new ContextSupport(options).connectToMarkLogic().release()));
65+
}
66+
67+
int clientsCreated = 0;
68+
for (Future f : futures) {
69+
f.get();
70+
clientsCreated++;
71+
}
72+
service.shutdown();
73+
74+
assertEquals(clientsToCreate, clientsCreated, "The expectation is that many clients can be created by " +
75+
"multiple threads at the same time due to the use of synchronization in ContextSupport. This won't be " +
76+
"necessary once we upgrade to Java Clien 6.5.1 or higher.");
77+
}
4578
}

src/test/java/com/marklogic/spark/reader/customcode/ReadWithCustomCodeTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.marklogic.spark.reader.customcode;
22

33
import com.marklogic.client.FailedRequestException;
4+
import com.marklogic.client.MarkLogicIOException;
45
import com.marklogic.spark.AbstractIntegrationTest;
56
import com.marklogic.spark.Options;
7+
import org.apache.spark.SparkException;
68
import org.apache.spark.sql.DataFrameReader;
79
import org.apache.spark.sql.Dataset;
810
import org.apache.spark.sql.Row;
@@ -131,6 +133,22 @@ void badJavascriptForPartitions() {
131133
assertTrue(ex.getCause() instanceof FailedRequestException, "Unexpected cause: " + ex.getCause());
132134
}
133135

136+
@Test
137+
void verifyTimeoutWorks() {
138+
Dataset<Row> dataset = startRead()
139+
.option(Options.READ_XQUERY, "(xdmp:sleep(1500), 'abc')")
140+
.option("spark.marklogic.client.callTimeout", "1")
141+
.load();
142+
143+
SparkException ex = assertThrows(SparkException.class, () -> dataset.count());
144+
assertTrue(ex.getCause() instanceof MarkLogicIOException, "Unexpected cause: " + ex.getCause());
145+
assertTrue(ex.getCause().getMessage().contains("timeout"),
146+
"Expecting a timeout due to the callTimeout being set to 1sec. Curiously, the Java Client does not " +
147+
"attempt a retry on this sort of exception; it only attempts a retry if an HTTP response is received. " +
148+
"Actual message: " + ex.getCause().getMessage());
149+
}
150+
151+
134152
private List<Row> readRows(String option, String value) {
135153
return startRead()
136154
.option(option, value)

0 commit comments

Comments
 (0)