Skip to content

Commit 1b5db42

Browse files
committed
sqlx-postgres: Update graceful shutdown
1 parent 0644cc9 commit 1b5db42

File tree

1 file changed

+25
-22
lines changed

1 file changed

+25
-22
lines changed

sqlx-postgres/src/connection/worker.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ use super::{request::IoRequest, tls::MaybeUpgradeTls};
2727

2828
#[derive(PartialEq, Debug)]
2929
enum WorkerState {
30-
// The connection is open and ready for business.
30+
// The connection is open and ready for requests.
3131
Open,
32-
// Sent/sending a [Terminate] message but did not close the socket. Responding to the last
33-
// messages but not receiving new ones.
32+
// Responding to the last messages but not receiving new ones. After handling the last message
33+
// a [Terminate] message is issued.
3434
Closing,
35-
// The connection is terminated, this step closes the socket and stops the background task.
35+
// Last messages are handled, [Terminate] message is sent and the session is closed. Nog try
36+
// and close the socket.
3637
Closed,
3738
}
3839

@@ -86,29 +87,21 @@ impl Worker {
8687
// Tries to receive the next message from the channel. Also handles termination if needed.
8788
#[inline(always)]
8889
fn poll_next_request(&mut self, cx: &mut Context<'_>) -> Poll<IoRequest> {
89-
if self.state != WorkerState::Open {
90-
return Poll::Pending;
91-
}
92-
9390
match self.chan.poll_next_unpin(cx) {
9491
Poll::Pending => Poll::Pending,
9592
Poll::Ready(Some(request)) => Poll::Ready(request),
9693
Poll::Ready(None) => {
9794
// Channel was closed, explicitly or because the sender was dropped. Either way
9895
// we should start a graceful shutdown.
99-
self.socket
100-
.write_buffer_mut()
101-
.put_slice(&[Terminate::FORMAT as u8, 0, 0, 0, 4]);
102-
10396
self.state = WorkerState::Closing;
104-
self.should_flush = true;
10597
Poll::Pending
10698
}
10799
}
108100
}
109101

110102
#[inline(always)]
111103
fn poll_receiver(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
104+
// Only try and receive io requests if we're open.
112105
if self.state != WorkerState::Open {
113106
return Poll::Ready(Ok(()));
114107
}
@@ -187,16 +180,17 @@ impl Worker {
187180
_ => self.send_back(response)?,
188181
}
189182
}
190-
191-
if self.state != WorkerState::Open && self.back_log.is_empty() {
192-
// After the connection is closed and the backlog is empty we can close the socket.
193-
self.state = WorkerState::Closed;
194-
}
195183
Ok(())
196184
}
197185

198186
#[inline(always)]
199187
fn poll_next_message(&mut self, cx: &mut Context<'_>) -> Poll<Result<ReceivedMessage>> {
188+
if self.state == WorkerState::Closed {
189+
// We're still responsing to the last messages, only after clearing the backlog we
190+
// should stop reading.
191+
return Poll::Pending;
192+
}
193+
200194
self.socket.poll_try_read(cx, |buf| {
201195
// all packets in postgres start with a 5-byte header
202196
// this header contains the message type and the total length of the message
@@ -234,12 +228,21 @@ impl Worker {
234228

235229
#[inline(always)]
236230
fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
237-
if self.state == WorkerState::Closed {
231+
match self.state {
232+
// After responding to the last messages we can issue a [Terminate] request and
233+
// close the connection.
234+
WorkerState::Closing if self.back_log.is_empty() => {
235+
let terminate = [Terminate::FORMAT as u8, 0, 0, 0, 4];
236+
self.socket.write_buffer_mut().put_slice(&terminate);
237+
self.state = WorkerState::Closed;
238+
239+
// Closing the socket also flushes the buffer.
240+
self.socket.poll_close_unpin(cx)
241+
}
238242
// The channel is closed, all requests are flushed and a [Terminate] message has been
239243
// sent, now try and close the socket
240-
self.socket.poll_close_unpin(cx)
241-
} else {
242-
Poll::Pending
244+
WorkerState::Closed => self.socket.poll_close_unpin(cx),
245+
WorkerState::Open | WorkerState::Closing => Poll::Pending,
243246
}
244247
}
245248

0 commit comments

Comments
 (0)