Skip to content

Commit 4aa9c3d

Browse files
authored
Cleaner shutdown (#12)
* Cleaner shutdown * mark as bad just in case although im pretty sure we dont need it * server session duration * test clean shutdown * ah
1 parent 20ceb72 commit 4aa9c3d

File tree

4 files changed

+97
-53
lines changed

4 files changed

+97
-53
lines changed

.circleci/config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
key: cargo-lock-2-{{ checksum "Cargo.lock" }}
2626
- run:
2727
name: "Install dependencies"
28-
command: "sudo apt-get update && sudo apt-get install -y postgresql-contrib-12 postgresql-client-12"
28+
command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12"
2929
- run:
3030
name: "Build"
3131
command: "cargo build"
@@ -47,4 +47,4 @@ jobs:
4747
workflows:
4848
build:
4949
jobs:
50-
- build
50+
- build

.circleci/run_tests.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,6 @@ psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /
2626

2727
# Replica/primary selection & more sharding tests
2828
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
29+
30+
# Attempt clean shut down
31+
killall pgcat -s SIGINT

src/main.rs

Lines changed: 63 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ extern crate toml;
2626

2727
use regex::Regex;
2828
use tokio::net::TcpListener;
29+
use tokio::signal;
2930

3031
use std::collections::HashMap;
3132
use std::sync::{Arc, Mutex};
@@ -108,61 +109,72 @@ async fn main() {
108109

109110
println!("> Waiting for clients...");
110111

111-
loop {
112-
let pool = pool.clone();
113-
let client_server_map = client_server_map.clone();
114-
let server_info = server_info.clone();
115-
116-
let (socket, addr) = match listener.accept().await {
117-
Ok((socket, addr)) => (socket, addr),
118-
Err(err) => {
119-
println!("> Listener: {:?}", err);
120-
continue;
121-
}
122-
};
123-
124-
// Client goes to another thread, bye.
125-
tokio::task::spawn(async move {
126-
let start = chrono::offset::Utc::now().naive_utc();
127-
128-
println!(">> Client {:?} connected", addr);
129-
130-
match client::Client::startup(
131-
socket,
132-
client_server_map,
133-
transaction_mode,
134-
default_server_role,
135-
server_info,
136-
)
137-
.await
138-
{
139-
Ok(mut client) => {
140-
println!(">> Client {:?} authenticated successfully!", addr);
141-
142-
match client.handle(pool).await {
143-
Ok(()) => {
144-
let duration = chrono::offset::Utc::now().naive_utc() - start;
145-
146-
println!(
147-
">> Client {:?} disconnected, session duration: {}",
148-
addr,
149-
format_duration(&duration)
150-
);
151-
}
152-
153-
Err(err) => {
154-
println!(">> Client disconnected with error: {:?}", err);
155-
client.release();
156-
}
157-
}
158-
}
112+
// Main app runs here.
113+
tokio::task::spawn(async move {
114+
loop {
115+
let pool = pool.clone();
116+
let client_server_map = client_server_map.clone();
117+
let server_info = server_info.clone();
159118

119+
let (socket, addr) = match listener.accept().await {
120+
Ok((socket, addr)) => (socket, addr),
160121
Err(err) => {
161-
println!(">> Error: {:?}", err);
122+
println!("> Listener: {:?}", err);
123+
continue;
162124
}
163125
};
164-
});
165-
}
126+
127+
// Client goes to another thread, bye.
128+
tokio::task::spawn(async move {
129+
let start = chrono::offset::Utc::now().naive_utc();
130+
131+
println!(">> Client {:?} connected", addr);
132+
133+
match client::Client::startup(
134+
socket,
135+
client_server_map,
136+
transaction_mode,
137+
default_server_role,
138+
server_info,
139+
)
140+
.await
141+
{
142+
Ok(mut client) => {
143+
println!(">> Client {:?} authenticated successfully!", addr);
144+
145+
match client.handle(pool).await {
146+
Ok(()) => {
147+
let duration = chrono::offset::Utc::now().naive_utc() - start;
148+
149+
println!(
150+
">> Client {:?} disconnected, session duration: {}",
151+
addr,
152+
format_duration(&duration)
153+
);
154+
}
155+
156+
Err(err) => {
157+
println!(">> Client disconnected with error: {:?}", err);
158+
client.release();
159+
}
160+
}
161+
}
162+
163+
Err(err) => {
164+
println!(">> Error: {:?}", err);
165+
}
166+
};
167+
});
168+
}
169+
});
170+
171+
// Setup shut down sequence
172+
match signal::ctrl_c().await {
173+
Ok(()) => {}
174+
Err(err) => {
175+
eprintln!("Unable to listen for shutdown signal: {}", err);
176+
}
177+
};
166178
}
167179

168180
/// Format chrono::Duration to be more human-friendly.

src/server.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ pub struct Server {
4949
// Mapping of clients and servers used for query cancellation.
5050
client_server_map: ClientServerMap,
5151

52+
// Server role, e.g. primary or replica.
5253
role: Role,
54+
55+
// Server connected at
56+
connected_at: chrono::naive::NaiveDateTime,
5357
}
5458

5559
impl Server {
@@ -193,6 +197,7 @@ impl Server {
193197
bad: false,
194198
client_server_map: client_server_map,
195199
role: role,
200+
connected_at: chrono::offset::Utc::now().naive_utc(),
196201
});
197202
}
198203

@@ -417,3 +422,27 @@ impl Server {
417422
}
418423
}
419424
}
425+
426+
impl Drop for Server {
427+
// Try to do a clean shut down.
428+
fn drop(&mut self) {
429+
let mut bytes = BytesMut::with_capacity(4);
430+
bytes.put_u8(b'X');
431+
bytes.put_i32(4);
432+
433+
match self.write.try_write(&bytes) {
434+
Ok(n) => (),
435+
Err(_) => (),
436+
};
437+
438+
self.bad = true;
439+
440+
let now = chrono::offset::Utc::now().naive_utc();
441+
let duration = now - self.connected_at;
442+
443+
println!(
444+
">> Server connection closed, session duration: {}",
445+
crate::format_duration(&duration)
446+
);
447+
}
448+
}

0 commit comments

Comments
 (0)