Skip to content

Commit 33131b1

Browse files
committed
feat: graceful shutdown
1 parent 70f87b6 commit 33131b1

File tree

6 files changed

+74
-8
lines changed

6 files changed

+74
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ features = ["derive"]
4848
version = "0.3.18"
4949
features = ["env-filter"]
5050

51+
[dependencies.tokio]
52+
version = "1.37.0"
53+
5154
[dependencies.tonic]
5255
version = "0.11.0"
5356

src/common/src/kill_signals.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use tokio::signal;
2+
use tracing::info;
3+
4+
pub async fn wait_for_kill_signals() {
5+
let ctrl_c = async {
6+
signal::ctrl_c()
7+
.await
8+
.expect("Failed to install Ctrl+C handler");
9+
};
10+
11+
#[cfg(unix)]
12+
let terminate = async {
13+
signal::unix::signal(signal::unix::SignalKind::terminate())
14+
.expect("Failed to install signal handler")
15+
.recv()
16+
.await;
17+
};
18+
19+
#[cfg(not(unix))]
20+
let terminate = std::future::pending::<()>();
21+
22+
tokio::select! {
23+
_ = ctrl_c => {
24+
info!("Received ctrl_c!");
25+
},
26+
_ = terminate => {
27+
info!("Received terminate!");
28+
},
29+
}
30+
}

src/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod grpc;
2+
pub mod kill_signals;
23
pub mod loggers;
34
pub mod options;

src/gpt_answer_server/src/main.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,29 @@
11
use clap::{Parser, Subcommand};
22
use opentelemetry::global;
3+
use tokio::signal;
4+
use tokio::sync::oneshot;
5+
use tokio::sync::oneshot::Receiver;
36
use tonic::transport::Server;
7+
use tracing::info;
48

59
use common::grpc::gpt_answer::gpt_answer::gpt_answer_service_server::GptAnswerServiceServer;
10+
use common::kill_signals;
611
use common::loggers::telemetry::init_telemetry;
712
use common::options::parse_options;
813
use gpt_answer_server::controllers::gpt_answer::GptAnswerServiceImpl;
914
use gpt_answer_server::options::Options;
1015

11-
pub async fn serve(options: Options) {
16+
pub async fn serve(options: Options, rx: Receiver<()>) {
1217
let address = options.server_endpoint.parse().unwrap();
1318
println!("Starting GPT Answer server at {}", options.server_endpoint);
1419

1520
let gpt_answer_service = GptAnswerServiceImpl::new("dummy_prop".to_string());
1621
Server::builder()
1722
.add_service(GptAnswerServiceServer::new(gpt_answer_service))
18-
.serve(address)
23+
.serve_with_shutdown(address, async {
24+
rx.await.ok();
25+
info!("GRPC server shut down");
26+
})
1927
.await
2028
.unwrap();
2129
}
@@ -47,11 +55,17 @@ async fn main() {
4755
options.log.level.as_str(),
4856
);
4957

50-
let server = tokio::spawn(serve(options));
58+
let (tx, rx) = oneshot::channel();
59+
let server = tokio::spawn(serve(options, rx));
5160

52-
tokio::try_join!(server).expect("Failed to run servers");
61+
kill_signals::wait_for_kill_signals().await;
62+
63+
// Send the shutdown signal
64+
let _ = tx.send(());
65+
tokio::try_join!(server).expect("Failed to run server");
5366

5467
global::shutdown_tracer_provider();
68+
info!("Shutdown successfully!");
5569
}
5670

5771
/// GPT Answer GRPC server.

src/public/src/main.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ use clap::{Parser, Subcommand};
1212
use deadpool_diesel::postgres::Pool;
1313
use deadpool_diesel::{Manager, Runtime};
1414
use opentelemetry::global;
15+
use tokio::signal;
16+
use tokio::sync::oneshot;
17+
use tokio::sync::oneshot::Receiver;
1518
use tracing::info;
1619

1720
use adapter::repositories::grpc::gpt_answer_client::GptAnswerClient;
1821
use adapter::repositories::in_memory::question::QuestionInMemoryRepository;
1922
use adapter::repositories::postgres::question_db::QuestionDBRepository;
2023
use cli::options::Options;
2124
use cli::router::Router;
25+
use common::kill_signals;
2226
use common::loggers::telemetry::init_telemetry;
2327
use common::options::parse_options;
2428
use rust_core::ports::question::QuestionPort;
@@ -50,10 +54,19 @@ async fn main() {
5054
options.log.level.as_str(),
5155
);
5256

53-
let server = tokio::spawn(serve(options));
54-
tokio::try_join!(server).expect("Failed to run servers");
57+
let (tx, rx) = oneshot::channel();
58+
let server = tokio::spawn(serve(options, rx));
59+
60+
kill_signals::wait_for_kill_signals().await;
61+
62+
// Send the shutdown signal
63+
let _ = tx.send(());
64+
65+
// Wait for the server to finish shutting down
66+
tokio::try_join!(server).expect("Failed to run server");
5567

5668
global::shutdown_tracer_provider();
69+
info!("Shutdown successfully!");
5770
}
5871

5972
/// Simple REST server.
@@ -76,7 +89,7 @@ enum Commands {
7689
Config,
7790
}
7891

79-
pub async fn serve(options: Options) {
92+
pub async fn serve(options: Options, rx: Receiver<()>) {
8093
let question_port: Arc<dyn QuestionPort + Send + Sync> = if options.db.in_memory.is_some() {
8194
info!("Using in-memory database");
8295
Arc::new(QuestionInMemoryRepository::new())
@@ -103,6 +116,10 @@ pub async fn serve(options: Options) {
103116
Ipv4Addr::from_str(options.server.url.as_str()).unwrap(),
104117
options.server.port,
105118
);
119+
let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(address, async {
120+
rx.await.ok();
121+
info!("Warp server shut down");
122+
});
106123

107-
warp::serve(routes).run(address).await
124+
server.await;
108125
}

0 commit comments

Comments
 (0)