Skip to content

Commit a1463c6

Browse files
Do not abort CI Visibility spans dispatch on interrupt (#6926)
1 parent 652193c commit a1463c6

File tree

8 files changed

+145
-45
lines changed

8 files changed

+145
-45
lines changed

communication/src/main/java/datadog/communication/http/HttpRetryPolicy.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package datadog.communication.http;
22

3+
import java.io.IOException;
4+
import java.io.InterruptedIOException;
5+
import java.net.ConnectException;
36
import java.util.concurrent.ThreadLocalRandom;
47
import java.util.concurrent.TimeUnit;
58
import javax.annotation.Nullable;
@@ -34,7 +37,7 @@
3437
* instance.
3538
*/
3639
@NotThreadSafe
37-
public class HttpRetryPolicy {
40+
public class HttpRetryPolicy implements AutoCloseable {
3841

3942
private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class);
4043

@@ -47,12 +50,35 @@ public class HttpRetryPolicy {
4750

4851
private int retriesLeft;
4952
private long delay;
53+
private boolean interrupted;
5054
private final double delayFactor;
55+
private final boolean suppressInterrupts;
5156

52-
private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) {
57+
private HttpRetryPolicy(
58+
int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) {
5359
this.retriesLeft = retriesLeft;
5460
this.delay = delay;
5561
this.delayFactor = delayFactor;
62+
this.suppressInterrupts = suppressInterrupts;
63+
}
64+
65+
public boolean shouldRetry(Exception e) {
66+
if (e instanceof ConnectException) {
67+
return shouldRetry((okhttp3.Response) null);
68+
}
69+
if (e instanceof InterruptedIOException) {
70+
if (suppressInterrupts) {
71+
return shouldRetry((okhttp3.Response) null);
72+
}
73+
}
74+
if (e instanceof InterruptedException) {
75+
if (suppressInterrupts) {
76+
// remember interrupted status to restore the thread's interrupted flag later
77+
interrupted = true;
78+
return shouldRetry((okhttp3.Response) null);
79+
}
80+
}
81+
return false;
5682
}
5783

5884
public boolean shouldRetry(@Nullable okhttp3.Response response) {
@@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) {
106132
}
107133
}
108134

109-
public long backoff() {
135+
long getBackoffDelay() {
110136
long currentDelay = delay;
111137
delay = (long) (delay * delayFactor);
112138
return currentDelay;
113139
}
114140

141+
public void backoff() throws IOException {
142+
try {
143+
Thread.sleep(getBackoffDelay());
144+
} catch (InterruptedException e) {
145+
if (suppressInterrupts) {
146+
// remember interrupted status to restore the thread's interrupted flag later
147+
interrupted = true;
148+
} else {
149+
Thread.currentThread().interrupt();
150+
throw new InterruptedIOException("thread interrupted");
151+
}
152+
}
153+
}
154+
155+
@Override
156+
public void close() {
157+
if (interrupted) {
158+
Thread.currentThread().interrupt();
159+
}
160+
}
161+
115162
public static class Factory {
116163
private final int maxRetries;
117164
private final long initialDelay;
118165
private final double delayFactor;
166+
private final boolean retryInterrupts;
119167

120168
public Factory(int maxRetries, int initialDelay, double delayFactor) {
169+
this(maxRetries, initialDelay, delayFactor, false);
170+
}
171+
172+
public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) {
121173
this.maxRetries = maxRetries;
122174
this.initialDelay = initialDelay;
123175
this.delayFactor = delayFactor;
176+
this.retryInterrupts = retryInterrupts;
124177
}
125178

126179
public HttpRetryPolicy create() {
127-
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor);
180+
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts);
128181
}
129182
}
130183
}

communication/src/main/java/datadog/communication/http/OkHttpUtils.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import datadog.trace.util.AgentProxySelector;
1212
import java.io.File;
1313
import java.io.IOException;
14-
import java.net.ConnectException;
1514
import java.net.InetSocketAddress;
1615
import java.net.Proxy;
1716
import java.nio.ByteBuffer;
@@ -357,30 +356,27 @@ public void writeTo(BufferedSink sink) throws IOException {
357356
}
358357

359358
public static Response sendWithRetries(
360-
OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException {
361-
while (true) {
362-
try {
363-
okhttp3.Response response = httpClient.newCall(request).execute();
364-
if (response.isSuccessful()) {
365-
return response;
359+
OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request)
360+
throws IOException {
361+
try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
362+
while (true) {
363+
try {
364+
Response response = httpClient.newCall(request).execute();
365+
if (response.isSuccessful()) {
366+
return response;
367+
}
368+
if (!retryPolicy.shouldRetry(response)) {
369+
return response;
370+
} else {
371+
closeQuietly(response);
372+
}
373+
} catch (Exception ex) {
374+
if (!retryPolicy.shouldRetry(ex)) {
375+
throw ex;
376+
}
366377
}
367-
if (!retryPolicy.shouldRetry(response)) {
368-
return response;
369-
} else {
370-
closeQuietly(response);
371-
}
372-
} catch (ConnectException ex) {
373-
if (!retryPolicy.shouldRetry(null)) {
374-
throw ex;
375-
}
376-
}
377-
// If we get here, there has been an error, and we still have retries left
378-
long backoffMs = retryPolicy.backoff();
379-
try {
380-
Thread.sleep(backoffMs);
381-
} catch (InterruptedException e) {
382-
Thread.currentThread().interrupt();
383-
throw new IOException(e);
378+
// If we get here, there has been an error, and we still have retries left
379+
retryPolicy.backoff();
384380
}
385381
}
386382
}

communication/src/test/groovy/datadog/communication/http/HttpRetryPolicyTest.groovy

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification {
1717

1818
when:
1919
while (retry <= maxRetries) {
20-
def shouldRetry = retryPolicy.shouldRetry()
20+
def shouldRetry = retryPolicy.shouldRetry((Response) null)
2121
shouldRetries << shouldRetry
2222
if (shouldRetry) {
23-
backoffs << retryPolicy.backoff()
23+
backoffs << retryPolicy.getBackoffDelay()
2424
}
2525
retry += 1
2626
}
@@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification {
4444
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create()
4545

4646
def responseBuilder = new Response.Builder()
47-
.code(responseCode)
48-
.request(GroovyMock(Request))
49-
.protocol(Protocol.HTTP_1_1)
50-
.message("")
47+
.code(responseCode)
48+
.request(GroovyMock(Request))
49+
.protocol(Protocol.HTTP_1_1)
50+
.message("")
5151
if (rateLimitHeader != null) {
5252
responseBuilder.header("x-ratelimit-reset", rateLimitHeader)
5353
}
@@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification {
7373
500 | null | 5
7474
501 | null | 5
7575
}
76+
77+
def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() {
78+
setup:
79+
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create()
80+
81+
expect:
82+
retryPolicy.shouldRetry(exception) == shouldRetry
83+
84+
where:
85+
exception | suppressInterrupts | shouldRetry
86+
new NullPointerException() | false | false
87+
new IllegalArgumentException() | false | false
88+
new ConnectException() | false | true
89+
new InterruptedIOException() | false | false
90+
new InterruptedIOException() | true | true
91+
new InterruptedException() | false | false
92+
new InterruptedException() | true | true
93+
}
94+
95+
def "test interrupt flag is preserved when suppressing interrupts"() {
96+
setup:
97+
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create()
98+
99+
when:
100+
retryPolicy.shouldRetry(new InterruptedException())
101+
retryPolicy.close()
102+
103+
then:
104+
Thread.interrupted()
105+
}
106+
107+
def "test interrupt flag is preserved if interrupted while backing off"() {
108+
setup:
109+
boolean[] b = new boolean[2]
110+
111+
Runnable r = () -> {
112+
def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create()
113+
retryPolicy.backoff()
114+
115+
b[0] = Thread.currentThread().isInterrupted()
116+
retryPolicy.close()
117+
b[1] = Thread.interrupted()
118+
}
119+
Thread t = new Thread(r, "test-http-retry-policy-interrupts")
120+
121+
when:
122+
t.start()
123+
t.interrupt()
124+
t.join()
125+
126+
then:
127+
!b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed
128+
b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored
129+
}
76130
}

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/BackendApiFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
2323
}
2424

2525
public @Nullable BackendApi createBackendApi() {
26-
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
26+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
2727

2828
if (config.isCiVisibilityAgentlessEnabled()) {
2929
HttpUrl agentlessUrl = getAgentlessUrl();

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/EvpProxyApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,8 @@ public <T> T post(
8181

8282
final Request request = requestBuilder.post(requestBody).build();
8383

84-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
8584
try (okhttp3.Response response =
86-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
85+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
8786
if (response.isSuccessful()) {
8887
log.debug("Request to {} returned successful response: {}", uri, response.code());
8988

dd-java-agent/agent-ci-visibility/src/main/java/datadog/trace/civisibility/communication/IntakeApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,8 @@ public <T> T post(
8585
}
8686

8787
Request request = requestBuilder.build();
88-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
8988
try (okhttp3.Response response =
90-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
89+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
9190
if (response.isSuccessful()) {
9291
log.debug("Request to {} returned successful response: {}", uri, response.code());
9392
InputStream responseBodyStream = response.body().byteStream();

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public DDEvpProxyApi build() {
9292
? httpClient
9393
: OkHttpUtils.buildHttpClient(proxiedApiUrl, timeoutMillis);
9494

95-
final HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
95+
final HttpRetryPolicy.Factory retryPolicyFactory =
96+
new HttpRetryPolicy.Factory(5, 100, 2.0, true);
9697

9798
log.debug("proxiedApiUrl: {}", proxiedApiUrl);
9899
return new DDEvpProxyApi(
@@ -141,9 +142,8 @@ public Response sendSerializedTraces(Payload payload) {
141142
totalTraces += payload.traceCount();
142143
receivedTraces += payload.traceCount();
143144

144-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
145145
try (okhttp3.Response response =
146-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
146+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
147147
if (response.isSuccessful()) {
148148
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
149149
return Response.success(response.code());

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static class DDIntakeApiBuilder {
4343

4444
HttpUrl hostUrl = null;
4545
OkHttpClient httpClient = null;
46-
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
46+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
4747

4848
private String apiKey;
4949

@@ -134,9 +134,8 @@ public Response sendSerializedTraces(Payload payload) {
134134
totalTraces += payload.traceCount();
135135
receivedTraces += payload.traceCount();
136136

137-
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
138137
try (okhttp3.Response response =
139-
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
138+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
140139
if (response.isSuccessful()) {
141140
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
142141
return Response.success(response.code());

0 commit comments

Comments
 (0)