Skip to content

Commit cac826d

Browse files
committed
add graceful shutdown support
1 parent 8854eb7 commit cac826d

File tree

5 files changed

+79
-10
lines changed

5 files changed

+79
-10
lines changed

Cargo.lock

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ tracing = "0.1.16"
3232
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
3333
atty = "0.2.14"
3434
clap = { version = "4.1.8", features = ["derive"] }
35-
tokio = { version = "1.26.0", features = ["rt-multi-thread", "rt"] }
35+
tokio = { version = "1.26.0", features = ["rt-multi-thread", "rt", "signal", "macros"] }
3636
aws-sdk-s3 = "0.26.0"
3737
aws-config = "0.55.1"

src/bin/rla-server.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ extern crate regex;
1414
extern crate rust_log_analyzer as rla;
1515
extern crate serde_json;
1616

17+
use crate::server::QueueItem;
1718
use clap::Parser;
19+
use crossbeam::channel::Sender;
1820
use rla::index::IndexStorage;
1921
use std::process;
2022
use std::sync::Arc;
@@ -91,7 +93,10 @@ fn main() {
9193

9294
let (queue_send, queue_recv) = crossbeam::channel::unbounded();
9395

94-
let service = Arc::new(server::RlaService::new(args.webhook_verify, queue_send)?);
96+
let service = Arc::new(server::RlaService::new(
97+
args.webhook_verify,
98+
queue_send.clone(),
99+
)?);
95100

96101
let mut worker = server::Worker::new(
97102
args.index_file,
@@ -103,7 +108,7 @@ fn main() {
103108
args.query_builds_from_primary_repo,
104109
)?;
105110

106-
thread::spawn(move || {
111+
let worker_thread = thread::spawn(move || {
107112
if let Err(e) = worker.main() {
108113
error!("Worker failed, exiting: {}", e);
109114
process::exit(1);
@@ -126,9 +131,34 @@ fn main() {
126131
}))
127132
}
128133
}))
134+
.with_graceful_shutdown(graceful_shutdown(queue_send))
129135
.await
130136
})?;
131137

138+
worker_thread.join().expect("worker thread failed");
139+
132140
Ok(())
133141
});
134142
}
143+
144+
async fn graceful_shutdown(sender: Sender<QueueItem>) {
145+
let ctrl_c = tokio::signal::ctrl_c();
146+
147+
// ECS uses SIGTERM to signal graceful shutdown must begin.
148+
#[cfg(unix)]
149+
let mut sigterm =
150+
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
151+
#[cfg(unix)]
152+
let sigterm = sigterm.recv();
153+
154+
#[cfg(not(unix))]
155+
let sigterm = std::future::pending(); // Never resolves
156+
157+
tokio::select! {
158+
_ = ctrl_c => {}
159+
_ = sigterm => {}
160+
};
161+
162+
info!("graceful shutdown signal received");
163+
let _ = sender.send(QueueItem::GracefulShutdown);
164+
}

src/bin/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub enum QueueItem {
1919
payload: rla::github::PullRequestEvent,
2020
delivery_id: String,
2121
},
22+
GracefulShutdown,
2223
}
2324

2425
impl QueueItem {
@@ -27,6 +28,7 @@ impl QueueItem {
2728
QueueItem::GitHubStatus { delivery_id, .. } => Some(&delivery_id),
2829
QueueItem::GitHubCheckRun { delivery_id, .. } => Some(&delivery_id),
2930
QueueItem::GitHubPullRequest { delivery_id, .. } => Some(&delivery_id),
31+
QueueItem::GracefulShutdown => None,
3032
}
3133
}
3234
}

src/bin/server/worker.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ impl Worker {
8484
let _enter = span.enter();
8585

8686
match self.process(item, &span) {
87-
Ok(()) => (),
87+
Ok(ProcessOutcome::Continue) => (),
88+
Ok(ProcessOutcome::Exit) => return Ok(()),
8889
Err(e) => error!("Processing queue item failed: {}", e),
8990
}
9091
}
@@ -97,7 +98,7 @@ impl Worker {
9798
self.secondary_repos.iter().find(|r| *r == repo).is_some()
9899
}
99100

100-
fn process(&mut self, item: QueueItem, span: &tracing::Span) -> rla::Result<()> {
101+
fn process(&mut self, item: QueueItem, span: &tracing::Span) -> rla::Result<ProcessOutcome> {
101102
let (repo, build_id, outcome) = match &item {
102103
QueueItem::GitHubStatus { payload, .. } => {
103104
match self.ci.build_id_from_github_status(&payload) {
@@ -109,7 +110,7 @@ impl Worker {
109110
"Ignoring invalid event (ctx: {:?}, url: {:?}).",
110111
payload.context, payload.target_url
111112
);
112-
return Ok(());
113+
return Ok(ProcessOutcome::Continue);
113114
}
114115
}
115116
}
@@ -125,11 +126,20 @@ impl Worker {
125126
"Ignoring invalid event (app id: {:?}, url: {:?}).",
126127
payload.check_run.app.id, payload.check_run.details_url
127128
);
128-
return Ok(());
129+
return Ok(ProcessOutcome::Continue);
129130
}
130131
}
131132
}
132-
QueueItem::GitHubPullRequest { payload, .. } => return self.process_pr(payload),
133+
QueueItem::GitHubPullRequest { payload, .. } => {
134+
self.process_pr(payload)?;
135+
return Ok(ProcessOutcome::Continue);
136+
}
137+
138+
QueueItem::GracefulShutdown => {
139+
info!("persisting the index to disk before shutting down");
140+
self.index.save(&self.index_file)?;
141+
return Ok(ProcessOutcome::Exit);
142+
}
133143
};
134144

135145
span.record("build_id", &build_id);
@@ -154,7 +164,7 @@ impl Worker {
154164

155165
if !outcome.is_finished() {
156166
info!("ignoring in-progress build");
157-
return Ok(());
167+
return Ok(ProcessOutcome::Continue);
158168
}
159169

160170
// Avoid processing the same build multiple times.
@@ -168,7 +178,7 @@ impl Worker {
168178
info!("did not learn as it's not an auto build");
169179
}
170180

171-
Ok(())
181+
Ok(ProcessOutcome::Continue)
172182
}
173183

174184
fn report_failed(&mut self, build_id: u64, build: &dyn rla::ci::Build) -> rla::Result<()> {
@@ -397,6 +407,11 @@ impl<T: Clone + Eq + Hash> RecentlySeen<T> {
397407
}
398408
}
399409

410+
enum ProcessOutcome {
411+
Continue,
412+
Exit,
413+
}
414+
400415
#[cfg(test)]
401416
mod tests {
402417
use super::*;

0 commit comments

Comments
 (0)