Skip to content

Commit 5ed867e

Browse files
committed
Fixes for Polaris race conditions
NodeConverter had a large amount of dead code - including a race condition! - that was deleted.
1 parent 6aeda28 commit 5ed867e

File tree

6 files changed

+67
-224
lines changed

6 files changed

+67
-224
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,8 @@ public void run() {
10041004
}
10051005
};
10061006
threadPool.execute(processBatch);
1007+
// Polaris warns of a race condition here, presumably based on the check on remainingCapacity. But the
1008+
// cost of executing a needless IteratorTask is small enough to accept the possible race condition.
10071009
// If the queue is almost full, stop producing and add a task to continue later
10081010
if (isSingleThreaded && threadPool.getQueue().remainingCapacity() <= 2 && iterator.hasNext()) {
10091011
threadPool.execute(new IteratorTask(batcher));

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/RowBatcherImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -381,9 +381,9 @@ public synchronized void start(JobTicket ticket) {
381381
// not the same value that a user would have provided via withBatchSize. And we don't want to log it when it's
382382
// -1, which will be the case for a single batch.
383383
if (logger.isDebugEnabled() && this.batchSize > 0) {
384-
logger.debug("batch count: {}, calculated batch size: {}", batchCount, batchSize);
384+
logger.debug("batch count: {}, calculated batch size: {}", this.batchCount, this.batchSize);
385385
} else {
386-
logger.info("batch count: {}", batchCount);
386+
logger.info("batch count: {}", this.batchCount);
387387
}
388388

389389
if (this.hostInfos != null && getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {

marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOEndpointImpl.java

-3
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,6 @@ static class CallerThreadPoolExecutor<I,O> extends ThreadPoolExecutor {
300300
this.bulkIOEndpointCaller = bulkIOEndpointCaller;
301301
}
302302

303-
Boolean isAwaitingTermination() {
304-
return this.awaitingTermination;
305-
}
306303
synchronized void awaitTermination() throws InterruptedException {
307304
if (bulkIOEndpointCaller.getCallContextQueue().isEmpty() && getActiveCount()<=1) {
308305
shutdown();

marklogic-client-api/src/main/java/com/marklogic/client/impl/HTTPSamlAuthInterceptor.java

+41-26
Original file line numberDiff line numberDiff line change
@@ -4,62 +4,77 @@
44

55
package com.marklogic.client.impl;
66

7-
import java.io.IOException;
8-
import java.time.Instant;
9-
import java.util.concurrent.Executors;
10-
import java.util.concurrent.atomic.AtomicBoolean;
11-
127
import com.marklogic.client.DatabaseClientFactory.SAMLAuthContext.AuthorizerCallback;
138
import com.marklogic.client.DatabaseClientFactory.SAMLAuthContext.ExpiringSAMLAuth;
149
import com.marklogic.client.DatabaseClientFactory.SAMLAuthContext.RenewerCallback;
15-
1610
import okhttp3.Interceptor;
1711
import okhttp3.Request;
1812
import okhttp3.Response;
1913

14+
import java.io.IOException;
15+
import java.time.Instant;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
2019
public class HTTPSamlAuthInterceptor implements Interceptor {
2120

22-
private String authorizationTokenValue;
23-
private AuthorizerCallback authorizer;
21+
private final AuthorizerCallback authorizer;
22+
private final RenewerCallback renewer;
23+
24+
private String authorizationTokenValue;
2425
private ExpiringSAMLAuth expiringSAMLAuth;
2526
private long threshold;
26-
private RenewerCallback renewer;
2727
private AtomicBoolean isCallbackExecuting;
2828

2929
public HTTPSamlAuthInterceptor(String authToken) {
30-
this.authorizationTokenValue = authToken;
30+
this.authorizationTokenValue = authToken;
31+
this.authorizer = null;
32+
this.renewer = null;
3133
}
3234

3335
public HTTPSamlAuthInterceptor(AuthorizerCallback authorizer) {
3436
this.authorizer = authorizer;
37+
this.renewer = null;
3538
}
3639

3740
public HTTPSamlAuthInterceptor(ExpiringSAMLAuth authorization, RenewerCallback renew) {
3841
expiringSAMLAuth = authorization;
3942
renewer = renew;
4043
isCallbackExecuting = new AtomicBoolean(false);
44+
this.authorizer = null;
4145
}
4246

4347
@Override
4448
public Response intercept(Chain chain) throws IOException {
45-
Request request = chain.request();
46-
if (authorizer != null) {
47-
if(expiringSAMLAuth == null) {
48-
authorizeCallbackWrapper(null);
49-
} else if(threshold<=Instant.now().getEpochSecond()){
50-
authorizeCallbackWrapper(expiringSAMLAuth.getExpiry());
51-
}
52-
} else if (renewer != null && threshold <= Instant.now().getEpochSecond() && isCallbackExecuting.compareAndSet(false, true)) {
53-
RenewCallbackWrapper renewCallbackWrapper = new RenewCallbackWrapper(expiringSAMLAuth);
54-
Executors.defaultThreadFactory().newThread(renewCallbackWrapper).start();
55-
}
56-
String samlHeaderValue = RESTServices.AUTHORIZATION_TYPE_SAML + " " + RESTServices.AUTHORIZATION_PARAM_TOKEN
57-
+ "=" + authorizationTokenValue;
58-
Request authenticatedRequest = request.newBuilder().header(RESTServices.HEADER_AUTHORIZATION, samlHeaderValue)
59-
.build();
60-
return chain.proceed(authenticatedRequest);
49+
if (authorizer != null) {
50+
authorizeRequest();
51+
} else if (renewer != null && threshold <= Instant.now().getEpochSecond() && isCallbackExecuting.compareAndSet(false, true)) {
52+
RenewCallbackWrapper renewCallbackWrapper = new RenewCallbackWrapper(expiringSAMLAuth);
53+
Executors.defaultThreadFactory().newThread(renewCallbackWrapper).start();
54+
}
55+
56+
Request authenticatedRequest = chain.request().newBuilder()
57+
.header(RESTServices.HEADER_AUTHORIZATION, buildSamlHeader())
58+
.build();
59+
60+
return chain.proceed(authenticatedRequest);
6161
}
6262

63+
private synchronized void authorizeRequest() {
64+
if (expiringSAMLAuth == null) {
65+
authorizeCallbackWrapper(null);
66+
} else if (threshold <= Instant.now().getEpochSecond()) {
67+
authorizeCallbackWrapper(expiringSAMLAuth.getExpiry());
68+
}
69+
}
70+
71+
private synchronized String buildSamlHeader() {
72+
return String.format("%s %s=%s",
73+
RESTServices.AUTHORIZATION_TYPE_SAML,
74+
RESTServices.AUTHORIZATION_PARAM_TOKEN,
75+
this.authorizationTokenValue);
76+
}
77+
6378
private synchronized void authorizeCallbackWrapper(Instant expiry) {
6479

6580
if(expiry == null && expiringSAMLAuth != null) {

0 commit comments

Comments
 (0)