Skip to content

Commit 0644cc9

Browse files
committed
sqlx-postgres: Cleanup and add more comments
1 parent b349cd7 commit 0644cc9

File tree

3 files changed

+23
-10
lines changed

3 files changed

+23
-10
lines changed

sqlx-core/src/net/socket/buffered.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl<S: Socket> BufferedSocket<S> {
7070
F: FnMut(&mut BytesMut) -> Result<ControlFlow<R, usize>, Error>,
7171
{
7272
loop {
73-
// Read if we want bytes
73+
// Ensure we have enough bytes, only read if we want bytes.
7474
ready!(self.poll_handle_read(cx)?);
7575

7676
match try_read(&mut self.read_buf.read)? {
@@ -149,7 +149,7 @@ impl<S: Socket> BufferedSocket<S> {
149149
pub async fn flush(&mut self) -> io::Result<()> {
150150
while !self.write_buf.is_empty() {
151151
let written = self.socket.write(self.write_buf.get()).await?;
152-
// Consume does the sanity check
152+
// Consume does the sanity check.
153153
self.write_buf.consume(written);
154154
}
155155

@@ -401,7 +401,7 @@ impl<S: Socket> Sink<&[u8]> for BufferedSocket<S> {
401401

402402
while !this.write_buf.is_empty() {
403403
let written = ready!(this.socket.poll_write(cx, this.write_buf.get())?);
404-
// Consume does the sanity check
404+
// Consume does the sanity check.
405405
this.write_buf.consume(written);
406406
}
407407
this.socket.poll_flush(cx).map_err(Into::into)

sqlx-postgres/src/connection/establish.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ impl PgConnection {
4949
params.push(("options", options));
5050
}
5151

52+
// Only after establishing a connection, Postgres sends a [ReadyForQuery] response. While
53+
// establishing a connection this pipe is used to read responses from.
5254
let mut pipe = conn.pipe(|buf| {
5355
buf.write(Startup {
5456
username: Some(&options.username),

sqlx-postgres/src/connection/worker.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl Worker {
9595
Poll::Ready(Some(request)) => Poll::Ready(request),
9696
Poll::Ready(None) => {
9797
// Channel was closed, explicitly or because the sender was dropped. Either way
98-
// we should start a gracefull shutdown.
98+
// we should start a graceful shutdown.
9999
self.socket
100100
.write_buffer_mut()
101101
.put_slice(&[Terminate::FORMAT as u8, 0, 0, 0, 4]);
@@ -151,6 +151,8 @@ impl Worker {
151151
while let Poll::Ready(response) = self.poll_next_message(cx)? {
152152
match response.format {
153153
BackendMessageFormat::ReadyForQuery => {
154+
// Cloning a `ReceivedMessage` here is cheap because it only clones the
155+
// underlying `Bytes`
154156
let rfq: ReadyForQuery = response.clone().decode()?;
155157
self.shared.set_transaction_status(rfq.transaction_status);
156158

@@ -187,6 +189,7 @@ impl Worker {
187189
}
188190

189191
if self.state != WorkerState::Open && self.back_log.is_empty() {
192+
// After the connection is closed and the backlog is empty we can close the socket.
190193
self.state = WorkerState::Closed;
191194
}
192195
Ok(())
@@ -232,18 +235,15 @@ impl Worker {
232235
#[inline(always)]
233236
fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
234237
if self.state == WorkerState::Closed {
235-
// The buffer is closed, a [Terminate] message has been sent, now try and close the socket.
238+
// The channel is closed, all requests are flushed and a [Terminate] message has been
239+
// sent, now try and close the socket
236240
self.socket.poll_close_unpin(cx)
237241
} else {
238242
Poll::Pending
239243
}
240244
}
241-
}
242-
243-
impl Future for Worker {
244-
type Output = Result<()>;
245245

246-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
246+
fn poll_worker(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
247247
// Try to receive responses from the database and handle them.
248248
self.poll_backlog(cx)?;
249249

@@ -260,6 +260,17 @@ impl Future for Worker {
260260
}
261261
}
262262

263+
impl Future for Worker {
264+
type Output = Result<()>;
265+
266+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
267+
self.poll_worker(cx).map_err(|e| {
268+
tracing::error!("Background worker stopped with error: {e:?}");
269+
e
270+
})
271+
}
272+
}
273+
263274
#[derive(Clone)]
264275
pub struct Shared(Arc<Mutex<SharedInner>>);
265276

0 commit comments

Comments
 (0)