@@ -5,12 +5,15 @@ use bytes::{Buf, BufMut, BytesMut};
5
5
use once_cell:: sync:: OnceCell ;
6
6
use regex:: Regex ;
7
7
use tokio:: io:: { AsyncReadExt , BufReader } ;
8
- use tokio:: net:: tcp:: { OwnedReadHalf , OwnedWriteHalf } ;
9
- use tokio:: net:: TcpStream ;
8
+ use tokio:: net:: {
9
+ tcp:: { OwnedReadHalf , OwnedWriteHalf } ,
10
+ TcpStream ,
11
+ } ;
10
12
11
13
use std:: collections:: HashMap ;
12
14
13
15
use crate :: config:: Role ;
16
+ use crate :: constants:: * ;
14
17
use crate :: errors:: Error ;
15
18
use crate :: messages:: * ;
16
19
use crate :: pool:: { ClientServerMap , ConnectionPool } ;
@@ -97,15 +100,15 @@ impl Client {
97
100
98
101
match code {
99
102
// Client wants SSL. We don't support it at the moment.
100
- 80877103 => {
103
+ SSL_REQUEST_CODE => {
101
104
let mut no = BytesMut :: with_capacity ( 1 ) ;
102
105
no. put_u8 ( b'N' ) ;
103
106
104
107
write_all ( & mut stream, no) . await ?;
105
108
}
106
109
107
110
// Regular startup message.
108
- 196608 => {
111
+ PROTOCOL_VERSION_NUMBER => {
109
112
// TODO: perform actual auth.
110
113
let parameters = parse_startup ( bytes. clone ( ) ) ?;
111
114
@@ -138,7 +141,7 @@ impl Client {
138
141
}
139
142
140
143
// Query cancel request.
141
- 80877102 => {
144
+ CANCEL_REQUEST_CODE => {
142
145
let ( read, write) = stream. into_split ( ) ;
143
146
144
147
let process_id = bytes. get_i32 ( ) ;
@@ -168,23 +171,31 @@ impl Client {
168
171
169
172
/// Client loop. We handle all messages between the client and the database here.
170
173
pub async fn handle ( & mut self , mut pool : ConnectionPool ) -> Result < ( ) , Error > {
171
- // Special: cancelling existing running query
174
+ // The client wants to cancel a query it has issued previously.
172
175
if self . cancel_mode {
173
176
let ( process_id, secret_key, address, port) = {
174
177
let guard = self . client_server_map . lock ( ) . unwrap ( ) ;
178
+
175
179
match guard. get ( & ( self . process_id , self . secret_key ) ) {
176
180
// Drop the mutex as soon as possible.
181
+ // We found the server the client is using for its query
182
+ // that it wants to cancel.
177
183
Some ( ( process_id, secret_key, address, port) ) => (
178
184
process_id. clone ( ) ,
179
185
secret_key. clone ( ) ,
180
186
address. clone ( ) ,
181
187
port. clone ( ) ,
182
188
) ,
189
+
190
+ // The client doesn't know / got the wrong server,
191
+ // we're closing the connection for security reasons.
183
192
None => return Ok ( ( ) ) ,
184
193
}
185
194
} ;
186
195
187
- // TODO: pass actual server host and port somewhere.
196
+ // Opens a new separate connection to the server, sends the backend_id
197
+ // and secret_key and then closes it for security reasons. No other interactions
198
+ // take place.
188
199
return Ok ( Server :: cancel ( & address, & port, process_id, secret_key) . await ?) ;
189
200
}
190
201
@@ -217,7 +228,7 @@ impl Client {
217
228
} ;
218
229
219
230
// Parse for special server role selection command.
220
- //
231
+ // SET SERVER ROLE TO '(primary|replica)';
221
232
match self . select_role ( message. clone ( ) ) {
222
233
Some ( r) => {
223
234
custom_protocol_response_ok ( & mut self . write , "SET SERVER ROLE" ) . await ?;
@@ -236,15 +247,17 @@ impl Client {
236
247
}
237
248
} ;
238
249
239
- let mut proxy = connection. 0 ;
250
+ let mut reference = connection. 0 ;
240
251
let _address = connection. 1 ;
241
- let server = & mut * proxy ;
252
+ let server = & mut * reference ;
242
253
243
254
// Claim this server as mine for query cancellation.
244
255
server. claim ( self . process_id , self . secret_key ) ;
245
256
257
+ // Transaction loop. Multiple queries can be issued by the client here.
258
+ // The connection belongs to the client until the transaction is over,
259
+ // or until the client disconnects if we are in session mode.
246
260
loop {
247
- // No messages in the buffer, read one.
248
261
let mut message = if message. len ( ) == 0 {
249
262
match read_message ( & mut self . read ) . await {
250
263
Ok ( message) => message,
@@ -268,19 +281,26 @@ impl Client {
268
281
msg
269
282
} ;
270
283
271
- let original = message. clone ( ) ; // To be forwarded to the server
284
+ // The message will be forwarded to the server intact. We still would like to
285
+ // parse it below to figure out what to do with it.
286
+ let original = message. clone ( ) ;
287
+
272
288
let code = message. get_u8 ( ) as char ;
273
289
let _len = message. get_i32 ( ) as usize ;
274
290
275
291
match code {
292
+ // ReadyForQuery
276
293
'Q' => {
277
294
// TODO: implement retries here for read-only transactions.
278
295
server. send ( original) . await ?;
279
296
297
+ // Read all data the server has to offer, which can be multiple messages
298
+ // buffered in 8196 bytes chunks.
280
299
loop {
281
300
// TODO: implement retries here for read-only transactions.
282
301
let response = server. recv ( ) . await ?;
283
302
303
+ // Send server reply to the client.
284
304
match write_all_half ( & mut self . write , response) . await {
285
305
Ok ( _) => ( ) ,
286
306
Err ( err) => {
@@ -294,15 +314,18 @@ impl Client {
294
314
}
295
315
}
296
316
297
- // Send statistic
317
+ // Report query executed statistics.
298
318
self . stats . query ( ) ;
299
319
300
- // Transaction over
320
+ // The transaction is over, we can release the connection back to the pool.
301
321
if !server. in_transaction ( ) {
322
+ // Report transaction executed statistics.
302
323
self . stats . transaction ( ) ;
303
324
304
- // Release server
325
+ // Release server back to the pool if we are in transaction mode.
326
+ // If we are in session mode, we keep the server until the client disconnects.
305
327
if self . transaction_mode {
328
+ // Report this client as idle.
306
329
self . stats . client_idle ( ) ;
307
330
308
331
shard = None ;
@@ -313,6 +336,7 @@ impl Client {
313
336
}
314
337
}
315
338
339
+ // Terminate
316
340
'X' => {
317
341
// Client closing. Rollback and clean up
318
342
// connection before releasing into the pool.
@@ -326,35 +350,46 @@ impl Client {
326
350
return Ok ( ( ) ) ;
327
351
}
328
352
353
+ // Parse
354
+ // The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`.
329
355
'P' => {
330
- // Extended protocol, let's buffer most of it
331
356
self . buffer . put ( & original[ ..] ) ;
332
357
}
333
358
359
+ // Bind
360
+ // The placeholder's replacements are here, e.g. 'user@email.com' and 'true'
334
361
'B' => {
335
362
self . buffer . put ( & original[ ..] ) ;
336
363
}
337
364
338
365
// Describe
366
+ // Command a client can issue to describe a previously prepared named statement.
339
367
'D' => {
340
368
self . buffer . put ( & original[ ..] ) ;
341
369
}
342
370
371
+ // Execute
372
+ // Execute a prepared statement prepared in `P` and bound in `B`.
343
373
'E' => {
344
374
self . buffer . put ( & original[ ..] ) ;
345
375
}
346
376
377
+ // Sync
378
+ // Frontend (client) is asking for the query result now.
347
379
'S' => {
348
- // Extended protocol, client requests sync
349
380
self . buffer . put ( & original[ ..] ) ;
350
381
351
- // TODO: retries for read-only transactions
382
+ // TODO: retries for read-only transactions.
352
383
server. send ( self . buffer . clone ( ) ) . await ?;
384
+
353
385
self . buffer . clear ( ) ;
354
386
387
+ // Read all data the server has to offer, which can be multiple messages
388
+ // buffered in 8196 bytes chunks.
355
389
loop {
356
390
// TODO: retries for read-only transactions
357
391
let response = server. recv ( ) . await ?;
392
+
358
393
match write_all_half ( & mut self . write , response) . await {
359
394
Ok ( _) => ( ) ,
360
395
Err ( err) => {
@@ -368,9 +403,11 @@ impl Client {
368
403
}
369
404
}
370
405
406
+ // Report query executed statistics.
371
407
self . stats . query ( ) ;
372
408
373
- // Release server
409
+ // Release server back to the pool if we are in transaction mode.
410
+ // If we are in session mode, we keep the server until the client disconnects.
374
411
if !server. in_transaction ( ) {
375
412
self . stats . transaction ( ) ;
376
413
@@ -392,10 +429,13 @@ impl Client {
392
429
server. send ( original) . await ?;
393
430
}
394
431
432
+ // CopyDone or CopyFail
433
+ // Copy is done, successfully or not.
395
434
'c' | 'f' => {
396
- // Copy is done.
397
435
server. send ( original) . await ?;
436
+
398
437
let response = server. recv ( ) . await ?;
438
+
399
439
match write_all_half ( & mut self . write , response) . await {
400
440
Ok ( _) => ( ) ,
401
441
Err ( err) => {
@@ -404,24 +444,29 @@ impl Client {
404
444
}
405
445
} ;
406
446
407
- // Release the server
447
+ // Release server back to the pool if we are in transaction mode.
448
+ // If we are in session mode, we keep the server until the client disconnects.
408
449
if !server. in_transaction ( ) {
409
450
self . stats . transaction ( ) ;
410
451
411
452
if self . transaction_mode {
412
453
shard = None ;
413
454
role = self . default_server_role ;
455
+
414
456
break ;
415
457
}
416
458
}
417
459
}
418
460
461
+ // Some unexpected message. We either did not implement the protocol correctly
462
+ // or this is not a Postgres client we're talking to.
419
463
_ => {
420
464
println ! ( ">>> Unexpected code: {}" , code) ;
421
465
}
422
466
}
423
467
}
424
468
469
+ // The server is no longer bound to us, we can't cancel it's queries anymore.
425
470
self . release ( ) ;
426
471
}
427
472
}
@@ -450,18 +495,21 @@ impl Client {
450
495
451
496
let len = buf. get_i32 ( ) ;
452
497
let query = String :: from_utf8_lossy ( & buf[ ..len as usize - 4 - 1 ] ) . to_ascii_uppercase ( ) ; // Don't read the ternminating null
498
+
453
499
let rgx = match SHARDING_REGEX_RE . get ( ) {
454
500
Some ( r) => r,
455
501
None => return None ,
456
502
} ;
457
503
458
504
if rgx. is_match ( & query) {
459
505
let shard = query. split ( "'" ) . collect :: < Vec < & str > > ( ) [ 1 ] ;
506
+
460
507
match shard. parse :: < i64 > ( ) {
461
508
Ok ( shard) => {
462
509
let sharder = Sharder :: new ( shards) ;
463
510
Some ( sharder. pg_bigint_hash ( shard) )
464
511
}
512
+
465
513
Err ( _) => None ,
466
514
}
467
515
} else {
@@ -481,6 +529,7 @@ impl Client {
481
529
482
530
let len = buf. get_i32 ( ) ;
483
531
let query = String :: from_utf8_lossy ( & buf[ ..len as usize - 4 - 1 ] ) . to_ascii_uppercase ( ) ;
532
+
484
533
let rgx = match ROLE_REGEX_RE . get ( ) {
485
534
Some ( r) => r,
486
535
None => return None ,
@@ -490,6 +539,7 @@ impl Client {
490
539
// it'll be time to abstract :).
491
540
if rgx. is_match ( & query) {
492
541
let role = query. split ( "'" ) . collect :: < Vec < & str > > ( ) [ 1 ] ;
542
+
493
543
match role {
494
544
"PRIMARY" => Some ( Role :: Primary ) ,
495
545
"REPLICA" => Some ( Role :: Replica ) ,
0 commit comments