Skip to content

Commit 75639e3

Browse files
[🍒 7325] Fix tracer freeze when CI Visibility is enabled (#7335)
1 parent 1914f03 commit 75639e3

File tree

4 files changed

+64
-111
lines changed

4 files changed

+64
-111
lines changed

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 18 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,8 @@ public static class DDAgentWriterBuilder {
3939
Monitoring monitoring = Monitoring.DISABLED;
4040
boolean traceAgentV05Enabled = Config.get().isTraceAgentV05Enabled();
4141
boolean metricsReportingEnabled = Config.get().isTracerMetricsEnabled();
42+
private int flushTimeout = 1;
43+
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
4244
boolean alwaysFlush = false;
4345

4446
private DDAgentApi agentApi;
@@ -116,6 +118,12 @@ public DDAgentWriterBuilder featureDiscovery(DDAgentFeaturesDiscovery featureDis
116118
return this;
117119
}
118120

121+
public DDAgentWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
122+
this.flushTimeout = flushTimeout;
123+
this.flushTimeoutUnit = flushTimeoutUnit;
124+
return this;
125+
}
126+
119127
public DDAgentWriterBuilder alwaysFlush(boolean alwaysFlush) {
120128
this.alwaysFlush = alwaysFlush;
121129
return this;
@@ -157,15 +165,23 @@ public DDAgentWriter build() {
157165
singleSpanSampler,
158166
null);
159167

160-
return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush);
168+
return new DDAgentWriter(
169+
traceProcessingWorker,
170+
dispatcher,
171+
healthMetrics,
172+
flushTimeout,
173+
flushTimeoutUnit,
174+
alwaysFlush);
161175
}
162176
}
163177

164178
DDAgentWriter(
165179
TraceProcessingWorker worker,
166180
PayloadDispatcher dispatcher,
167181
HealthMetrics healthMetrics,
182+
int flushTimeout,
183+
TimeUnit flushTimeoutUnit,
168184
boolean alwaysFlush) {
169-
super(worker, dispatcher, healthMetrics, alwaysFlush);
185+
super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
170186
}
171187
}

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 31 additions & 104 deletions
Original file line number | Diff line number | Diff line change
@@ -74,26 +74,16 @@ public TraceProcessingWorker(
7474
spanSamplingWorker.getSpanSamplingQueue(),
7575
droppingPolicy);
7676

77-
boolean runAsDaemon = !Config.get().isCiVisibilityEnabled();
7877
this.serializingHandler =
79-
runAsDaemon
80-
? new DaemonTraceSerializingHandler(
81-
primaryQueue,
82-
secondaryQueue,
83-
healthMetrics,
84-
dispatcher,
85-
flushInterval,
86-
timeUnit,
87-
spanPostProcessor)
88-
: new NonDaemonTraceSerializingHandler(
89-
primaryQueue,
90-
secondaryQueue,
91-
healthMetrics,
92-
dispatcher,
93-
flushInterval,
94-
timeUnit,
95-
spanPostProcessor);
96-
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon);
78+
new TraceSerializingHandler(
79+
primaryQueue,
80+
secondaryQueue,
81+
healthMetrics,
82+
dispatcher,
83+
flushInterval,
84+
timeUnit,
85+
spanPostProcessor);
86+
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler);
9787
}
9888

9989
public void start() {
@@ -144,91 +134,7 @@ private static MpscBlockingConsumerArrayQueue<Object> createQueue(int capacity)
144134
return new MpscBlockingConsumerArrayQueue<>(capacity);
145135
}
146136

147-
private static class DaemonTraceSerializingHandler extends TraceSerializingHandler {
148-
public DaemonTraceSerializingHandler(
149-
MpscBlockingConsumerArrayQueue<Object> primaryQueue,
150-
MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
151-
HealthMetrics healthMetrics,
152-
PayloadDispatcher payloadDispatcher,
153-
long flushInterval,
154-
TimeUnit timeUnit,
155-
SpanPostProcessor spanPostProcessor) {
156-
super(
157-
primaryQueue,
158-
secondaryQueue,
159-
healthMetrics,
160-
payloadDispatcher,
161-
flushInterval,
162-
timeUnit,
163-
spanPostProcessor);
164-
}
165-
166-
@Override
167-
public void run() {
168-
try {
169-
runDutyCycle();
170-
} catch (InterruptedException e) {
171-
Thread.currentThread().interrupt();
172-
}
173-
log.debug("Datadog trace processor exited. Publishing traces stopped");
174-
}
175-
176-
private void runDutyCycle() throws InterruptedException {
177-
Thread thread = Thread.currentThread();
178-
while (!thread.isInterrupted()) {
179-
consumeFromPrimaryQueue();
180-
consumeFromSecondaryQueue();
181-
flushIfNecessary();
182-
}
183-
}
184-
}
185-
186-
private static class NonDaemonTraceSerializingHandler extends TraceSerializingHandler {
187-
private static final double SHUTDOWN_TIMEOUT_MILLIS = 5_000;
188-
private Long shutdownSignalTimestamp;
189-
190-
public NonDaemonTraceSerializingHandler(
191-
MpscBlockingConsumerArrayQueue<Object> primaryQueue,
192-
MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
193-
HealthMetrics healthMetrics,
194-
PayloadDispatcher payloadDispatcher,
195-
long flushInterval,
196-
TimeUnit timeUnit,
197-
SpanPostProcessor spanPostProcessor) {
198-
super(
199-
primaryQueue,
200-
secondaryQueue,
201-
healthMetrics,
202-
payloadDispatcher,
203-
flushInterval,
204-
timeUnit,
205-
spanPostProcessor);
206-
}
207-
208-
@Override
209-
public void run() {
210-
while (!shouldShutdown()) {
211-
try {
212-
consumeFromPrimaryQueue();
213-
consumeFromSecondaryQueue();
214-
flushIfNecessary();
215-
} catch (InterruptedException e) {
216-
if (shutdownSignalTimestamp == null) {
217-
shutdownSignalTimestamp = System.currentTimeMillis();
218-
}
219-
}
220-
}
221-
log.debug("Datadog trace processor exited. Unpublished traces left: " + !queuesAreEmpty());
222-
}
223-
224-
private boolean shouldShutdown() {
225-
return shutdownSignalTimestamp != null
226-
&& (shutdownSignalTimestamp + SHUTDOWN_TIMEOUT_MILLIS <= System.currentTimeMillis()
227-
|| queuesAreEmpty());
228-
}
229-
}
230-
231-
public abstract static class TraceSerializingHandler implements Runnable {
137+
public static class TraceSerializingHandler implements Runnable {
232138

233139
private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
234140
private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
@@ -261,6 +167,27 @@ public TraceSerializingHandler(
261167
this.spanPostProcessor = spanPostProcessor;
262168
}
263169

170+
@Override
171+
public void run() {
172+
try {
173+
runDutyCycle();
174+
} catch (InterruptedException e) {
175+
Thread.currentThread().interrupt();
176+
}
177+
log.debug(
178+
"Datadog trace processor exited. Publishing traces stopped. Unpublished traces left: "
179+
+ !queuesAreEmpty());
180+
}
181+
182+
private void runDutyCycle() throws InterruptedException {
183+
Thread thread = Thread.currentThread();
184+
while (!thread.isInterrupted()) {
185+
consumeFromPrimaryQueue();
186+
consumeFromSecondaryQueue();
187+
flushIfNecessary();
188+
}
189+
}
190+
264191
@SuppressWarnings("unchecked")
265192
public void onEvent(Object event) {
266193
// publish an incomplete batch if

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import datadog.trace.common.writer.ddintake.DDIntakeTrackTypeResolver;
2525
import datadog.trace.core.monitor.HealthMetrics;
2626
import datadog.trace.util.Strings;
27+
import java.util.concurrent.TimeUnit;
2728
import okhttp3.HttpUrl;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -105,6 +106,10 @@ public static Writer createWriter(
105106
.singleSpanSampler(singleSpanSampler)
106107
.flushIntervalMilliseconds(flushIntervalMilliseconds);
107108

109+
if (config.isCiVisibilityEnabled()) {
110+
builder.flushTimeout(5, TimeUnit.SECONDS);
111+
}
112+
108113
if (config.isCiVisibilityCodeCoverageEnabled()) {
109114
final RemoteApi coverageApi =
110115
createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.CITESTCOV);
@@ -140,7 +145,7 @@ public static Writer createWriter(
140145
ddAgentApi.addResponseListener((RemoteResponseListener) sampler);
141146
}
142147

143-
remoteWriter =
148+
DDAgentWriter.DDAgentWriterBuilder builder =
144149
DDAgentWriter.builder()
145150
.agentApi(ddAgentApi)
146151
.featureDiscovery(featuresDiscovery)
@@ -149,8 +154,13 @@ public static Writer createWriter(
149154
.monitoring(commObjects.monitoring)
150155
.alwaysFlush(alwaysFlush)
151156
.spanSamplingRules(singleSpanSampler)
152-
.flushIntervalMilliseconds(flushIntervalMilliseconds)
153-
.build();
157+
.flushIntervalMilliseconds(flushIntervalMilliseconds);
158+
159+
if (config.isCiVisibilityEnabled()) {
160+
builder.flushTimeout(5, TimeUnit.SECONDS);
161+
}
162+
163+
remoteWriter = builder.build();
154164
}
155165

156166
return remoteWriter;

dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ class DDAgentWriterTest extends DDCoreSpecification {
3535
def dispatcher = new PayloadDispatcherImpl(new DDAgentMapperDiscovery(discovery), api, monitor, monitoring)
3636

3737
@Subject
38-
def writer = new DDAgentWriter(worker, dispatcher, monitor, false)
38+
def writer = new DDAgentWriter(worker, dispatcher, monitor, 1, TimeUnit.SECONDS, false)
3939

4040
// Only used to create spans
4141
def dummyTracer = tracerBuilder().writer(new ListWriter()).build()
@@ -176,7 +176,7 @@ class DDAgentWriterTest extends DDCoreSpecification {
176176
def worker = Mock(TraceProcessingWorker)
177177
def monitor = Stub(HealthMetrics)
178178
def dispatcher = Mock(PayloadDispatcherImpl)
179-
def writer = new DDAgentWriter(worker, dispatcher, monitor, false)
179+
def writer = new DDAgentWriter(worker, dispatcher, monitor, 1, TimeUnit.SECONDS, false)
180180
def p0 = newSpan()
181181
p0.setSamplingPriority(PrioritySampling.SAMPLER_DROP)
182182
def trace = [p0, newSpan()]

0 commit comments

Comments
 (0)