Skip to content

Commit a034ccc

Browse files
committed
Fix tokio task leak
1 parent c177d5b commit a034ccc

File tree

2 files changed

+46
-15
lines changed

2 files changed

+46
-15
lines changed

src/agent/services/oracle.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use {
1313
},
1414
state::oracle::Oracle,
1515
},
16-
anyhow::Result,
16+
anyhow::{Context, Result},
1717
solana_account_decoder::UiAccountEncoding,
1818
solana_client::{
1919
nonblocking::{
@@ -67,6 +67,12 @@ where
6767
)));
6868

6969
if config.oracle.subscriber_enabled {
70+
let number_of_workers = 100;
71+
let channel_size = 1000;
72+
let (sender, receiver) = tokio::sync::mpsc::channel(channel_size);
73+
let max_elapsed_time = Duration::from_secs(30);
74+
let sleep_time = Duration::from_secs(1);
75+
7076
handles.push(tokio::spawn(async move {
7177
loop {
7278
let current_time = Instant::now();
@@ -75,17 +81,34 @@ where
7581
network,
7682
state.clone(),
7783
key_store.pyth_oracle_program_key,
84+
sender.clone(),
7885
)
7986
.await
8087
{
81-
tracing::error!(err = ?err, "Subscriber exited unexpectedly.");
82-
if current_time.elapsed() < Duration::from_secs(30) {
83-
tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second.");
84-
tokio::time::sleep(Duration::from_secs(1)).await;
88+
tracing::error!(?err, "Subscriber exited unexpectedly");
89+
if current_time.elapsed() < max_elapsed_time {
90+
tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
91+
tokio::time::sleep(sleep_time).await;
8592
}
8693
}
8794
}
8895
}));
96+
97+
let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
98+
for _ in 0..number_of_workers {
99+
let receiver = receiver.clone();
100+
handles.push(tokio::spawn(async move {
101+
loop {
102+
let mut receiver = receiver.lock().await;
103+
if let Some(task) = receiver.recv().await {
104+
drop(receiver);
105+
if let Err(err) = task.await {
106+
tracing::error!(%err, "error running price update");
107+
}
108+
}
109+
}
110+
}));
111+
}
89112
}
90113

91114
handles
@@ -102,6 +125,7 @@ async fn subscriber<S>(
102125
network: Network,
103126
state: Arc<S>,
104127
program_key: Pubkey,
128+
sender: tokio::sync::mpsc::Sender<tokio::task::JoinHandle<()>>,
105129
) -> Result<()>
106130
where
107131
S: Oracle,
@@ -129,14 +153,17 @@ where
129153
Some(account) => {
130154
let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?;
131155
let state = state.clone();
132-
tokio::spawn(async move {
133-
if let Err(err) =
134-
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
135-
.await
136-
{
137-
tracing::error!(err = ?err, "Failed to handle account update.");
138-
}
139-
});
156+
sender
157+
.send(tokio::spawn(async move {
158+
if let Err(err) =
159+
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
160+
.await
161+
{
162+
tracing::error!(?err, "Failed to handle account update");
163+
}
164+
}))
165+
.await
166+
.context("sending handle_price_account_update task to worker")?;
140167
}
141168

142169
None => {

src/agent/state/oracle.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ where
247247
);
248248

249249
data.price_accounts.insert(*account_key, price_entry);
250+
drop(data);
250251

251252
Prices::update_global_price(
252253
self,
@@ -339,13 +340,16 @@ where
339340
let mut data = self.into().data.write().await;
340341
log_data_diff(&data, &new_data);
341342
*data = new_data;
343+
let data_publisher_permissions = data.publisher_permissions.clone();
344+
let data_publisher_buffer_key = data.publisher_buffer_key;
345+
drop(data);
342346

343347
Exporter::update_on_chain_state(
344348
self,
345349
network,
346350
publish_keypair,
347-
data.publisher_permissions.clone(),
348-
data.publisher_buffer_key,
351+
data_publisher_permissions,
352+
data_publisher_buffer_key,
349353
)
350354
.await?;
351355

0 commit comments

Comments
 (0)