17
17
18
18
import com .marklogic .client .DatabaseClient ;
19
19
import com .marklogic .client .DatabaseClientFactory ;
20
+ import com .marklogic .client .extra .okhttpclient .OkHttpClientConfigurator ;
20
21
import org .slf4j .Logger ;
21
22
import org .slf4j .LoggerFactory ;
22
23
25
26
import java .net .URLDecoder ;
26
27
import java .util .HashMap ;
27
28
import java .util .Map ;
29
+ import java .util .concurrent .TimeUnit ;
28
30
29
31
public class ContextSupport implements Serializable {
30
32
31
33
protected static final Logger logger = LoggerFactory .getLogger (ContextSupport .class );
32
34
private final Map <String , String > properties ;
35
+ private final boolean configuratorWasAdded ;
36
+
37
+ // Java Client 6.5.0 has a bug in it (to be fixed in 6.5.1 or 6.6.0) where multiple threads that use a configurator
38
+ // can run into a ConcurrentModificationException. So need to synchronize adding a configurator and creating a
39
+ // client. Those two actions are rarely done, so the cost of synchronization will be negligible.
40
+ private static final Object CLIENT_LOCK = new Object ();
33
41
34
42
protected ContextSupport (Map <String , String > properties ) {
35
43
this .properties = properties ;
44
+ this .configuratorWasAdded = addOkHttpConfiguratorIfNecessary ();
36
45
}
37
46
38
47
public DatabaseClient connectToMarkLogic () {
@@ -50,10 +59,12 @@ public DatabaseClient connectToMarkLogic(String host) {
50
59
connectionProps .put (Options .CLIENT_HOST , host );
51
60
}
52
61
DatabaseClient client ;
53
- try {
54
- client = DatabaseClientFactory .newClient (propertyName -> connectionProps .get ("spark." + propertyName ));
55
- } catch (Exception e ) {
56
- throw new ConnectorException (String .format ("Unable to connect to MarkLogic; cause: %s" , e .getMessage ()), e );
62
+ if (configuratorWasAdded ) {
63
+ synchronized (CLIENT_LOCK ) {
64
+ client = connect (connectionProps );
65
+ }
66
+ } else {
67
+ client = connect (connectionProps );
57
68
}
58
69
DatabaseClient .ConnectionResult result = client .checkConnection ();
59
70
if (!result .isConnected ()) {
@@ -62,6 +73,14 @@ public DatabaseClient connectToMarkLogic(String host) {
62
73
return client ;
63
74
}
64
75
76
+ private DatabaseClient connect (Map <String , String > connectionProps ) {
77
+ try {
78
+ return DatabaseClientFactory .newClient (propertyName -> connectionProps .get ("spark." + propertyName ));
79
+ } catch (Exception e ) {
80
+ throw new ConnectorException (String .format ("Unable to connect to MarkLogic; cause: %s" , e .getMessage ()), e );
81
+ }
82
+ }
83
+
65
84
protected final Map <String , String > buildConnectionProperties () {
66
85
Map <String , String > connectionProps = new HashMap <>();
67
86
connectionProps .put ("spark.marklogic.client.authType" , "digest" );
@@ -154,4 +173,37 @@ public final boolean hasOption(String... options) {
154
173
public Map <String , String > getProperties () {
155
174
return properties ;
156
175
}
176
+
177
+ /**
178
+ * @return true if a configurator was added
179
+ */
180
+ private boolean addOkHttpConfiguratorIfNecessary () {
181
+ final String prefix = "spark.marklogic.client." ;
182
+ final long defaultValue = -1 ;
183
+ final long connectionTimeout = getNumericOption (prefix + "connectionTimeout" , defaultValue , defaultValue );
184
+ final long callTimeout = getNumericOption (prefix + "callTimeout" , defaultValue , defaultValue );
185
+ final long readTimeout = getNumericOption (prefix + "connectionTimeout" , defaultValue , defaultValue );
186
+ final long writeTimeout = getNumericOption (prefix + "writeTimeout" , defaultValue , defaultValue );
187
+
188
+ if (connectionTimeout > -1 || callTimeout > -1 || readTimeout > -1 || writeTimeout > -1 ) {
189
+ synchronized (CLIENT_LOCK ) {
190
+ DatabaseClientFactory .addConfigurator ((OkHttpClientConfigurator ) builder -> {
191
+ if (connectionTimeout > -1 ) {
192
+ builder .connectTimeout (connectionTimeout , TimeUnit .SECONDS );
193
+ }
194
+ if (callTimeout > -1 ) {
195
+ builder .callTimeout (callTimeout , TimeUnit .SECONDS );
196
+ }
197
+ if (readTimeout > -1 ) {
198
+ builder .readTimeout (readTimeout , TimeUnit .SECONDS );
199
+ }
200
+ if (writeTimeout > -1 ) {
201
+ builder .writeTimeout (writeTimeout , TimeUnit .SECONDS );
202
+ }
203
+ });
204
+ }
205
+ return true ;
206
+ }
207
+ return false ;
208
+ }
157
209
}
0 commit comments