Skip to content

Commit 6d806f6

Browse files
committed
graceful shutdown
1 parent e9a85ef commit 6d806f6

File tree

17 files changed

+657
-310
lines changed

17 files changed

+657
-310
lines changed

.github/workflows/coverage-twmq.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ jobs:
5353

5454
# Run coverage with tarpaulin
5555
- name: Run coverage
56-
run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*"
56+
run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"
5757

5858
# Upload coverage to Codecov
5959
# TODO: Uncomment once we have open-sourced the repo

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/configuration/server_base.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ redis:
1515

1616
queue:
1717
webhook_workers: 100
18-
erc4337_send_workers: 100
19-
erc4337_confirm_workers: 100
18+
external_bundler_send_workers: 100
19+
userop_confirm_workers: 100
2020
local_concurrency: 100
2121
polling_interval_ms: 100
2222
lease_duration_seconds: 600

server/src/config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ pub struct EngineConfig {
1414
#[derive(Debug, Clone, Deserialize)]
1515
pub struct QueueConfig {
1616
pub webhook_workers: usize,
17-
pub erc4337_send_workers: usize,
18-
pub erc4337_confirm_workers: usize,
17+
18+
pub external_bundler_send_workers: usize,
19+
pub userop_confirm_workers: usize,
20+
21+
pub execution_namespace: Option<String>,
22+
1923
pub local_concurrency: usize,
2024
pub polling_interval_ms: u64,
2125
pub lease_duration_seconds: u64,

server/src/main.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
4646

4747
// Start queue workers
4848
tracing::info!("Starting queue workers...");
49-
queue_manager.start_workers(&config.queue).await?;
49+
let all_workers = queue_manager.start_workers(&config.queue);
5050

5151
let abi_service = ThirdwebAbiServiceBuilder::new(
5252
&config.thirdweb.urls.abi_service,
@@ -59,8 +59,8 @@ async fn main() -> anyhow::Result<()> {
5959
abi_service: Arc::new(abi_service),
6060
chains,
6161
webhook_queue: queue_manager.webhook_queue.clone(),
62-
erc4337_send_queue: queue_manager.erc4337_send_queue.clone(),
63-
erc4337_confirm_queue: queue_manager.erc4337_confirm_queue.clone(),
62+
erc4337_send_queue: queue_manager.external_bundler_send_queue.clone(),
63+
erc4337_confirm_queue: queue_manager.userop_confirm_queue.clone(),
6464
})
6565
.await;
6666

@@ -84,5 +84,11 @@ async fn main() -> anyhow::Result<()> {
8484
tracing::info!("All servers shut down successfully");
8585
}
8686

87+
if let Err(e) = all_workers.shutdown().await {
88+
tracing::error!("Error during coordinated shutdown: {}", e);
89+
} else {
90+
tracing::info!("All workers shut down successfully");
91+
}
92+
8793
Ok(())
8894
}

server/src/queue/manager.rs

Lines changed: 97 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use engine_executors::{
1111
},
1212
webhook::{WebhookJobHandler, WebhookRetryConfig},
1313
};
14-
use twmq::{Queue, queue::QueueOptions};
14+
use twmq::{Queue, queue::QueueOptions, shutdown::ShutdownHandle};
1515

1616
use crate::{
1717
chains::ThirdwebChainService,
@@ -20,10 +20,21 @@ use crate::{
2020

2121
pub struct QueueManager {
2222
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
23-
pub erc4337_send_queue: Arc<Queue<ExternalBundlerSendHandler<ThirdwebChainService>>>,
24-
pub erc4337_confirm_queue: Arc<Queue<UserOpConfirmationHandler<ThirdwebChainService>>>,
23+
pub external_bundler_send_queue: Arc<Queue<ExternalBundlerSendHandler<ThirdwebChainService>>>,
24+
pub userop_confirm_queue: Arc<Queue<UserOpConfirmationHandler<ThirdwebChainService>>>,
2525
}
2626

27+
fn get_queue_name_for_namespace(namespace: &Option<String>, name: &str) -> String {
28+
match namespace {
29+
Some(namespace) => format!("{}_{}", namespace, name),
30+
None => name.to_owned(),
31+
}
32+
}
33+
34+
const EXTERNAL_BUNDLER_SEND_QUEUE_NAME: &str = "external_bundler_send";
35+
const USEROP_CONFIRM_QUEUE_NAME: &str = "userop_confirm";
36+
const WEBHOOK_QUEUE_NAME: &str = "webhook";
37+
2738
impl QueueManager {
2839
pub async fn new(
2940
redis_config: &RedisConfig,
@@ -49,12 +60,13 @@ impl QueueManager {
4960
};
5061

5162
let mut external_bundler_send_queue_opts = base_queue_opts.clone();
52-
let mut external_bundler_confirm_queue_opts = base_queue_opts.clone();
53-
let mut webhook_queue_opts = base_queue_opts.clone();
63+
external_bundler_send_queue_opts.local_concurrency =
64+
queue_config.external_bundler_send_workers;
65+
66+
let mut userop_confirm_queue_opts = base_queue_opts.clone();
67+
userop_confirm_queue_opts.local_concurrency = queue_config.userop_confirm_workers;
5468

55-
external_bundler_send_queue_opts.local_concurrency = queue_config.erc4337_send_workers;
56-
external_bundler_confirm_queue_opts.local_concurrency =
57-
queue_config.erc4337_confirm_workers;
69+
let mut webhook_queue_opts = base_queue_opts.clone();
5870
webhook_queue_opts.local_concurrency = queue_config.webhook_workers;
5971

6072
// Create webhook queue
@@ -63,32 +75,43 @@ impl QueueManager {
6375
retry_config: Arc::new(WebhookRetryConfig::default()),
6476
};
6577

66-
let webhook_queue = Arc::new(
67-
Queue::new(
68-
&redis_config.url,
69-
"webhook",
70-
Some(webhook_queue_opts),
71-
webhook_handler,
72-
)
73-
.await?,
78+
let webhook_queue_name =
79+
get_queue_name_for_namespace(&queue_config.execution_namespace, WEBHOOK_QUEUE_NAME);
80+
81+
let external_bundler_send_queue_name = get_queue_name_for_namespace(
82+
&queue_config.execution_namespace,
83+
EXTERNAL_BUNDLER_SEND_QUEUE_NAME,
84+
);
85+
86+
let userop_confirm_queue_name = get_queue_name_for_namespace(
87+
&queue_config.execution_namespace,
88+
USEROP_CONFIRM_QUEUE_NAME,
7489
);
7590

91+
let webhook_queue = Queue::builder()
92+
.name(webhook_queue_name)
93+
.options(webhook_queue_opts)
94+
.handler(webhook_handler)
95+
.redis_client(redis_client.clone())
96+
.build()
97+
.await?
98+
.arc();
99+
76100
// Create confirmation queue first (needed by send queue)
77101
let confirm_handler = UserOpConfirmationHandler::new(
78102
chain_service.clone(),
79103
deployment_lock.clone(),
80104
webhook_queue.clone(),
81105
);
82106

83-
let erc4337_confirm_queue = Arc::new(
84-
Queue::new(
85-
&redis_config.url,
86-
"erc4337_confirm",
87-
Some(external_bundler_confirm_queue_opts),
88-
confirm_handler,
89-
)
90-
.await?,
91-
);
107+
let userop_confirm_queue = Queue::builder()
108+
.name(userop_confirm_queue_name)
109+
.options(userop_confirm_queue_opts)
110+
.handler(confirm_handler)
111+
.redis_client(redis_client.clone())
112+
.build()
113+
.await?
114+
.arc();
92115

93116
// Create send queue
94117
let send_handler = ExternalBundlerSendHandler {
@@ -97,56 +120,51 @@ impl QueueManager {
97120
deployment_cache,
98121
deployment_lock,
99122
webhook_queue: webhook_queue.clone(),
100-
confirm_queue: erc4337_confirm_queue.clone(),
123+
confirm_queue: userop_confirm_queue.clone(),
101124
};
102125

103-
let erc4337_send_queue = Arc::new(
104-
Queue::new(
105-
&redis_config.url,
106-
"erc4337_send",
107-
Some(external_bundler_send_queue_opts),
108-
send_handler,
109-
)
110-
.await?,
111-
);
126+
let external_bundler_send_queue = Queue::builder()
127+
.name(external_bundler_send_queue_name)
128+
.options(external_bundler_send_queue_opts)
129+
.handler(send_handler)
130+
.redis_client(redis_client.clone())
131+
.build()
132+
.await?
133+
.arc();
112134

113135
Ok(Self {
114136
webhook_queue,
115-
erc4337_send_queue,
116-
erc4337_confirm_queue,
137+
external_bundler_send_queue,
138+
userop_confirm_queue,
117139
})
118140
}
119141

120142
/// Start all workers
121-
pub async fn start_workers(&self, queue_config: &QueueConfig) -> Result<(), EngineError> {
143+
pub fn start_workers(&self, queue_config: &QueueConfig) -> ShutdownHandle {
122144
tracing::info!("Starting queue workers...");
123145

124146
// Start webhook workers
125147
tracing::info!("Starting webhook worker");
126-
if let Err(e) = self.webhook_queue.clone().work().await {
127-
tracing::error!("Webhook worker failed: {}", e);
128-
}
148+
let webhook_worker = self.webhook_queue.work();
129149

130150
// Start ERC-4337 send workers
131151
tracing::info!("Starting external bundler send worker");
132-
if let Err(e) = self.erc4337_send_queue.clone().work().await {
133-
tracing::error!("Webhook worker failed: {}", e);
134-
}
152+
let external_bundler_send_worker = self.external_bundler_send_queue.work();
135153

136154
// Start ERC-4337 confirmation workers
137155
tracing::info!("Starting external bundler confirmation worker");
138-
if let Err(e) = self.erc4337_confirm_queue.clone().work().await {
139-
tracing::error!("Webhook worker failed: {}", e);
140-
}
156+
let userop_confirm_worker = self.userop_confirm_queue.work();
141157

142158
tracing::info!(
143159
"Started {} webhook workers, {} send workers, {} confirm workers",
144160
queue_config.webhook_workers,
145-
queue_config.erc4337_send_workers,
146-
queue_config.erc4337_confirm_workers
161+
queue_config.external_bundler_send_workers,
162+
queue_config.userop_confirm_workers
147163
);
148164

149-
Ok(())
165+
ShutdownHandle::with_worker(webhook_worker)
166+
.and_worker(external_bundler_send_worker)
167+
.and_worker(userop_confirm_worker)
150168
}
151169

152170
/// Get queue statistics for monitoring
@@ -162,34 +180,49 @@ impl QueueManager {
162180
};
163181

164182
let send_stats = QueueStatistics {
165-
pending: self.erc4337_send_queue.count(JobStatus::Pending).await?,
166-
active: self.erc4337_send_queue.count(JobStatus::Active).await?,
167-
delayed: self.erc4337_send_queue.count(JobStatus::Delayed).await?,
168-
success: self.erc4337_send_queue.count(JobStatus::Success).await?,
169-
failed: self.erc4337_send_queue.count(JobStatus::Failed).await?,
183+
pending: self
184+
.external_bundler_send_queue
185+
.count(JobStatus::Pending)
186+
.await?,
187+
active: self
188+
.external_bundler_send_queue
189+
.count(JobStatus::Active)
190+
.await?,
191+
delayed: self
192+
.external_bundler_send_queue
193+
.count(JobStatus::Delayed)
194+
.await?,
195+
success: self
196+
.external_bundler_send_queue
197+
.count(JobStatus::Success)
198+
.await?,
199+
failed: self
200+
.external_bundler_send_queue
201+
.count(JobStatus::Failed)
202+
.await?,
170203
};
171204

172205
let confirm_stats = QueueStatistics {
173-
pending: self.erc4337_confirm_queue.count(JobStatus::Pending).await?,
174-
active: self.erc4337_confirm_queue.count(JobStatus::Active).await?,
175-
delayed: self.erc4337_confirm_queue.count(JobStatus::Delayed).await?,
176-
success: self.erc4337_confirm_queue.count(JobStatus::Success).await?,
177-
failed: self.erc4337_confirm_queue.count(JobStatus::Failed).await?,
206+
pending: self.userop_confirm_queue.count(JobStatus::Pending).await?,
207+
active: self.userop_confirm_queue.count(JobStatus::Active).await?,
208+
delayed: self.userop_confirm_queue.count(JobStatus::Delayed).await?,
209+
success: self.userop_confirm_queue.count(JobStatus::Success).await?,
210+
failed: self.userop_confirm_queue.count(JobStatus::Failed).await?,
178211
};
179212

180213
Ok(QueueStats {
181214
webhook: webhook_stats,
182-
erc4337_send: send_stats,
183-
erc4337_confirm: confirm_stats,
215+
external_bundler_send: send_stats,
216+
userop_confirm: confirm_stats,
184217
})
185218
}
186219
}
187220

188221
#[derive(Debug, serde::Serialize)]
189222
pub struct QueueStats {
190223
pub webhook: QueueStatistics,
191-
pub erc4337_send: QueueStatistics,
192-
pub erc4337_confirm: QueueStatistics,
224+
pub external_bundler_send: QueueStatistics,
225+
pub userop_confirm: QueueStatistics,
193226
}
194227

195228
#[derive(Debug, serde::Serialize)]

twmq/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ tokio = "1.45.0"
1212
thiserror = "2.0.12"
1313
tracing = "0.1.41"
1414
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt"] }
15+
futures = "0.3.31"
1516

1617
[dev-dependencies]
1718
tokio = { version = "1.45.0", features = ["full"] }

twmq/benches/throughput.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,7 @@ async fn load_test_throughput(
201201
}
202202

203203
// Start workers
204-
let worker_handle = {
205-
let queue = queue.clone();
206-
tokio::spawn(async move {
207-
let _ = queue.work().await;
208-
})
209-
};
204+
let worker_handle = queue.work();
210205

211206
// Job producer task
212207
let producer_handle = {
@@ -293,7 +288,7 @@ async fn load_test_throughput(
293288
println!(" Sustainable: {}", is_sustainable);
294289

295290
// Cleanup
296-
worker_handle.abort();
291+
worker_handle.shutdown().await.unwrap();
297292
let _: () = redis::cmd("DEL")
298293
.arg(format!("{}:*", queue_name))
299294
.query_async(&mut redis_conn)

twmq/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,10 @@ pub enum TwmqError {
55

66
#[error("JSON Serialization error: {0}")]
77
JsonError(#[from] serde_json::Error),
8+
9+
#[error("Runtime error: {0}")]
10+
Runtime(String),
11+
12+
#[error("Worker panic: {0}")]
13+
WorkerPanic(String),
814
}

0 commit comments

Comments
 (0)