Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ impl AgentInfoFetcher {

(fetcher, response_observer)
}

/// Drain message from the trigger channel.
pub fn drain(&mut self) {
// We read only once as the channel has a capacity of 1
if let Some(rx) = &mut self.trigger_rx {
let _ = rx.try_recv();
}
}
}

impl Worker for AgentInfoFetcher {
Expand Down Expand Up @@ -272,6 +280,11 @@ impl ResponseObserver {
}
}
}

/// Manually send a message to the trigger channel.
pub fn manual_trigger(&self) {
let _ = self.trigger_tx.try_send(());
}
}

#[cfg(test)]
Expand Down
11 changes: 9 additions & 2 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,14 @@ impl TraceExporter {
};
});
}
// Drop runtime to shutdown all threads
// When the info fetcher is paused, the trigger channel keeps a reference to the runtime's
// IoStack as a waker. This prevents the IoStack from being dropped when shutting
// down runtime. By manually sending a message to the trigger channel we trigger the
// waker releasing the reference to the IoStack.
if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
self.info_response_observer.manual_trigger();
worker.drain();
}
drop(runtime);
}

Expand Down Expand Up @@ -462,7 +469,7 @@ impl TraceExporter {
/// 2) It's not guaranteed to not block forever, since the /info endpoint might not be
/// available.
///
/// The `send`` function will check agent_info when running, which will only be available if the
/// The `send` function will check agent_info when running, which will only be available if the
/// fetcher had time to reach to the agent.
/// Since agent_info can enable CSS computation, waiting for this during testing can make
/// snapshots non-deterministic.
Expand Down
Loading