Skip to content

Commit 06ebd7e

Browse files
committed
Using config
1 parent a034ccc commit 06ebd7e

File tree

2 files changed

+41
-6
lines changed

2 files changed

+41
-6
lines changed

src/agent/services/oracle.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ where
6767
)));
6868

6969
if config.oracle.subscriber_enabled {
70-
let number_of_workers = 100;
71-
let channel_size = 1000;
72-
let (sender, receiver) = tokio::sync::mpsc::channel(channel_size);
73-
let max_elapsed_time = Duration::from_secs(30);
74-
let sleep_time = Duration::from_secs(1);
70+
let number_of_workers = config.oracle.handle_price_account_update_worker_poll_size;
71+
let (sender, receiver) =
72+
tokio::sync::mpsc::channel(config.oracle.handle_price_account_update_channel_size);
73+
let min_elapsed_time = config.oracle.subscriber_finished_min_time;
74+
let sleep_time = config.oracle.subscriber_finished_sleep_time;
7575

7676
handles.push(tokio::spawn(async move {
7777
loop {
@@ -86,7 +86,7 @@ where
8686
.await
8787
{
8888
tracing::error!(?err, "Subscriber exited unexpectedly");
89-
if current_time.elapsed() < max_elapsed_time {
89+
if current_time.elapsed() < min_elapsed_time {
9090
tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
9191
tokio::time::sleep(sleep_time).await;
9292
}

src/agent/state/oracle.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,22 @@ pub struct Data {
137137
pub publisher_buffer_key: Option<Pubkey>,
138138
}
139139

140+
fn default_handle_price_account_update_channel_size() -> usize {
141+
1000
142+
}
143+
144+
fn default_handle_price_account_update_worker_poll_size() -> usize {
145+
50
146+
}
147+
148+
fn default_subscriber_finished_min_time() -> Duration {
149+
Duration::from_secs(30)
150+
}
151+
152+
fn default_subscriber_finished_sleep_time() -> Duration {
153+
Duration::from_secs(1)
154+
}
155+
140156
#[derive(Clone, Serialize, Deserialize, Debug)]
141157
#[serde(default)]
142158
pub struct Config {
@@ -158,6 +174,19 @@ pub struct Config {
158174
/// socket count at bay, the batches are looked up sequentially,
159175
/// trading off overall time it takes to fetch all symbols.
160176
pub max_lookup_batch_size: usize,
177+
178+
/// Number of workers used to wait for the handle_price_account_update
179+
#[serde(default = "default_handle_price_account_update_worker_poll_size")]
180+
pub handle_price_account_update_worker_poll_size: usize,
181+
/// Channel size used to wait for the handle_price_account_update
182+
#[serde(default = "default_handle_price_account_update_channel_size")]
183+
pub handle_price_account_update_channel_size: usize,
184+
/// Minimum time for a subscriber to run
185+
#[serde(default = "default_subscriber_finished_min_time")]
186+
pub subscriber_finished_min_time: Duration,
187+
/// Time to sleep if the subscriber do not run for more than the minimum time
188+
#[serde(default = "default_subscriber_finished_sleep_time")]
189+
pub subscriber_finished_sleep_time: Duration,
161190
}
162191

163192
impl Default for Config {
@@ -169,6 +198,12 @@ impl Default for Config {
169198
updates_channel_capacity: 10000,
170199
data_channel_capacity: 10000,
171200
max_lookup_batch_size: 100,
201+
handle_price_account_update_worker_poll_size:
202+
default_handle_price_account_update_worker_poll_size(),
203+
handle_price_account_update_channel_size:
204+
default_handle_price_account_update_channel_size(),
205+
subscriber_finished_min_time: default_subscriber_finished_min_time(),
206+
subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
172207
}
173208
}
174209
}

0 commit comments

Comments
 (0)