Skip to content

Commit fa0ed65

Browse files
authored
fix(offchain): Avoid starvation in the offchain monitor (#4649)
This addresses both sides of the issue, by making sure the task holding the `CallAll` doesn't hang, and by removing the concurrency control done by `Buffer`, which may be the reason why PR #4570 didn't fully work.
1 parent 15941da commit fa0ed65

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

core/src/polling_monitor/ipfs_service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ pub fn ipfs_service(
3030
.service_fn(move |req| ipfs.cheap_clone().call_inner(req))
3131
.boxed();
3232

33-
// The `Buffer` makes it so the rate and concurrency limit are shared among clones.
34-
Buffer::new(svc, 1)
33+
// The `Buffer` makes it so the rate limit is shared among clones.
34+
// Make it unbounded to avoid any risk of starvation.
35+
Buffer::new(svc, u32::MAX as usize)
3536
}
3637

3738
#[derive(Clone)]

core/src/polling_monitor/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl<T> Queue<T> {
9898
/// `Option`, to represent the object not being found.
9999
pub fn spawn_monitor<ID, S, E, Res: Send + 'static>(
100100
service: S,
101-
response_sender: mpsc::Sender<(ID, Res)>,
101+
response_sender: mpsc::UnboundedSender<(ID, Res)>,
102102
logger: Logger,
103103
metrics: PollingMonitorMetrics,
104104
) -> PollingMonitor<ID>
@@ -149,10 +149,13 @@ where
149149
let mut backoffs = Backoffs::new();
150150
let mut responses = service.call_all(queue_to_stream).unordered().boxed();
151151
while let Some(response) = responses.next().await {
152+
// Note: Be careful not to `await` within this loop, as that could block requests in
153+
// the `CallAll` from being polled. This can cause starvation as those requests may
154+
// be holding on to resources such as slots for concurrent calls.
152155
match response {
153156
Ok((id, Some(response))) => {
154157
backoffs.remove(&id);
155-
let send_result = response_sender.send((id, response)).await;
158+
let send_result = response_sender.send((id, response));
156159
if send_result.is_err() {
157160
// The receiver has been dropped, cancel this task.
158161
break;
@@ -250,10 +253,10 @@ mod tests {
250253
fn setup() -> (
251254
mock::Handle<&'static str, Option<&'static str>>,
252255
PollingMonitor<&'static str>,
253-
mpsc::Receiver<(&'static str, &'static str)>,
256+
mpsc::UnboundedReceiver<(&'static str, &'static str)>,
254257
) {
255258
let (svc, handle) = mock::pair();
256-
let (tx, rx) = mpsc::channel(10);
259+
let (tx, rx) = mpsc::unbounded_channel();
257260
let monitor = spawn_monitor(svc, tx, log::discard(), PollingMonitorMetrics::mock());
258261
(handle, monitor, rx)
259262
}
@@ -263,7 +266,7 @@ mod tests {
263266
let (svc, mut handle) = mock::pair();
264267
let shared_svc = tower::buffer::Buffer::new(tower::limit::ConcurrencyLimit::new(svc, 1), 1);
265268
let make_monitor = |svc| {
266-
let (tx, rx) = mpsc::channel(10);
269+
let (tx, rx) = mpsc::unbounded_channel();
267270
let metrics = PollingMonitorMetrics::mock();
268271
let monitor = spawn_monitor(svc, tx, log::discard(), metrics);
269272
(monitor, rx)

core/src/subgraph/context.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
185185

186186
pub struct OffchainMonitor {
187187
ipfs_monitor: PollingMonitor<CidFile>,
188-
ipfs_monitor_rx: mpsc::Receiver<(CidFile, Bytes)>,
188+
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
189189
}
190190

191191
impl OffchainMonitor {
@@ -195,7 +195,9 @@ impl OffchainMonitor {
195195
subgraph_hash: &DeploymentHash,
196196
ipfs_service: IpfsService,
197197
) -> Self {
198-
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::channel(10);
198+
// The channel is unbounded, as it is expected that `fn ready_offchain_events` is called
199+
// frequently, or at least with the same frequency that requests are sent.
200+
let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::unbounded_channel();
199201
let ipfs_monitor = spawn_monitor(
200202
ipfs_service,
201203
ipfs_monitor_tx,

0 commit comments

Comments
 (0)