14
14
15
15
use std:: collections:: HashMap ;
16
16
use std:: sync:: Arc ;
17
- use std:: time:: Duration ;
18
- use std:: time:: SystemTime ;
19
17
20
18
use databend_common_base:: base:: GlobalInstance ;
21
19
use databend_common_base:: headers:: HEADER_DEDUPLICATE_LABEL ;
@@ -53,7 +51,6 @@ use opentelemetry::propagation::TextMapPropagator;
53
51
use opentelemetry_sdk:: propagation:: BaggagePropagator ;
54
52
use poem:: error:: ResponseError ;
55
53
use poem:: error:: Result as PoemResult ;
56
- use poem:: web:: cookie:: Cookie ;
57
54
use poem:: web:: Json ;
58
55
use poem:: Addr ;
59
56
use poem:: Endpoint ;
@@ -70,8 +67,7 @@ use crate::clusters::ClusterDiscovery;
70
67
use crate :: servers:: http:: error:: HttpErrorCode ;
71
68
use crate :: servers:: http:: error:: JsonErrorOnly ;
72
69
use crate :: servers:: http:: error:: QueryError ;
73
- use crate :: servers:: http:: v1:: unix_ts;
74
- use crate :: servers:: http:: v1:: ClientSessionManager ;
70
+ use crate :: servers:: http:: middleware:: session_header:: ClientSession ;
75
71
use crate :: servers:: http:: v1:: HttpQueryContext ;
76
72
use crate :: servers:: http:: v1:: SessionClaim ;
77
73
use crate :: servers:: login_history:: LoginEventType ;
@@ -81,9 +77,7 @@ use crate::servers::HttpHandlerKind;
81
77
use crate :: sessions:: SessionManager ;
82
78
const USER_AGENT : & str = "User-Agent" ;
83
79
const TRACE_PARENT : & str = "traceparent" ;
84
- const COOKIE_LAST_REFRESH_TIME : & str = "last_refresh_time" ;
85
- const COOKIE_SESSION_ID : & str = "session_id" ;
86
- const COOKIE_COOKIE_ENABLED : & str = "cookie_enabled" ;
80
+
87
81
#[ derive( Debug , Copy , Clone ) ]
88
82
pub enum EndpointKind {
89
83
Login ,
@@ -352,12 +346,6 @@ pub struct HTTPSessionEndpoint<E> {
352
346
pub auth_manager : Arc < AuthMgr > ,
353
347
}
354
348
355
- fn make_cookie ( name : impl Into < String > , value : impl Into < String > ) -> Cookie {
356
- let mut cookie = Cookie :: new_with_str ( name, value) ;
357
- cookie. set_path ( "/" ) ;
358
- cookie
359
- }
360
-
361
349
impl < E > HTTPSessionEndpoint < E > {
362
350
#[ async_backtrace:: framed]
363
351
async fn auth (
@@ -371,6 +359,31 @@ impl<E> HTTPSessionEndpoint<E> {
371
359
let node_id = GlobalConfig :: instance ( ) . query . node_id . clone ( ) ;
372
360
login_history. client_ip = client_host. clone ( ) . unwrap_or_default ( ) ;
373
361
login_history. node_id = node_id. clone ( ) ;
362
+ let user_agent = req
363
+ . headers ( )
364
+ . get ( USER_AGENT )
365
+ . map ( |id| id. to_str ( ) . unwrap ( ) . to_string ( ) ) ;
366
+
367
+ let is_worksheet = user_agent
368
+ . as_ref ( )
369
+ . map ( |ua_str| {
370
+ [
371
+ // only worksheet client run in browser.
372
+ // most browser ua contain multi of them
373
+ "Mozilla" ,
374
+ "Chrome" ,
375
+ "Firefox" ,
376
+ "Safari" ,
377
+ "Edge" ,
378
+ // worksheet start query with ua like DatabendCloud/worksheet=4703;
379
+ "worksheet" ,
380
+ ]
381
+ . iter ( )
382
+ . any ( |kw| ua_str. contains ( kw) )
383
+ } )
384
+ . unwrap_or ( false ) ;
385
+
386
+ login_history. user_agent = user_agent. clone ( ) . unwrap_or_default ( ) ;
374
387
375
388
let credential = get_credential ( req, self . kind , self . endpoint_kind ) ?;
376
389
login_history. auth_type = credential. type_name ( ) ;
@@ -383,14 +396,6 @@ impl<E> HTTPSessionEndpoint<E> {
383
396
let tenant = Tenant :: new_or_err ( tenant_id. clone ( ) , func_name ! ( ) ) ?;
384
397
session. set_current_tenant ( tenant) ;
385
398
}
386
-
387
- // cookie_enabled is used to recognize old clients that not support cookie yet.
388
- // for these old clients, there is no session id available, thus can not use temp table.
389
- let cookie_enabled = req. cookie ( ) . get ( COOKIE_COOKIE_ENABLED ) . is_some ( ) ;
390
- let cookie_session_id = req
391
- . cookie ( )
392
- . get ( COOKIE_SESSION_ID )
393
- . map ( |s| s. value_str ( ) . to_string ( ) ) ;
394
399
let ( user_name, authed_client_session_id) = self
395
400
. auth_manager
396
401
. auth (
@@ -401,75 +406,42 @@ impl<E> HTTPSessionEndpoint<E> {
401
406
. await ?;
402
407
login_history. user_name = user_name. clone ( ) ;
403
408
404
- // If cookie_session_id is set, we disable writing to login_history.
405
- // The cookie_session_id is initially issued by the server to the client upon the first successful login.
406
- // For all subsequent requests, the client includes this session_id with each request.
407
- // This indicates the user is already logged in, so we skip recording another login event.
408
- if cookie_session_id. is_some ( ) {
409
- login_history. disable_write = true ;
409
+ let mut client_session = ClientSession :: try_decode ( req) ?;
410
+ if client_session. is_none ( ) && !matches ! ( self . endpoint_kind, EndpointKind :: PollQuery ) {
411
+ info ! (
412
+ "[HTTP-SESSION] got request without session, url={}, headers={:?}" ,
413
+ req. uri( ) ,
414
+ & req. headers( )
415
+ ) ;
410
416
}
411
- let client_session_id = match ( & authed_client_session_id, & cookie_session_id) {
412
- ( Some ( id1) , Some ( id2) ) => {
413
- if id1 != id2 {
414
- return Err ( ErrorCode :: AuthenticateFailure ( format ! (
415
- "[HTTP-SESSION] Session ID mismatch: token session ID '{}' does not match cookie session ID '{}'" ,
416
- id1, id2
417
- ) ) ) ;
418
- }
419
- Some ( id1. clone ( ) )
420
- }
421
- ( Some ( id) , None ) => {
422
- if cookie_enabled {
423
- req. cookie ( ) . add ( make_cookie ( COOKIE_SESSION_ID , id) ) ;
424
- }
425
- Some ( id. clone ( ) )
426
- }
427
- ( None , Some ( id) ) => Some ( id. clone ( ) ) ,
428
- ( None , None ) => {
429
- if cookie_enabled {
430
- let id = Uuid :: new_v4 ( ) . to_string ( ) ;
431
- info ! ( "[HTTP-SESSION] Created new session with ID: {}" , id) ;
432
- req. cookie ( ) . add ( make_cookie ( COOKIE_SESSION_ID , & id) ) ;
433
- Some ( id)
434
- } else {
435
- None
436
- }
437
- }
438
- } ;
439
- login_history. session_id = client_session_id. clone ( ) . unwrap_or_default ( ) ;
440
417
441
- if let Some ( id) = & client_session_id {
442
- session. set_client_session_id ( id. clone ( ) ) ;
418
+ if let ( Some ( id1) , Some ( c) ) = ( & authed_client_session_id, & client_session) {
419
+ if * id1 != c. header . id {
420
+ return Err ( ErrorCode :: AuthenticateFailure ( format ! (
421
+ "[HTTP-SESSION] Session ID mismatch: token session ID '{}' does not match header session ID '{}'" ,
422
+ id1, c. header. id
423
+ ) ) ) ;
424
+ }
443
425
}
444
-
445
- if cookie_enabled {
446
- let last_refresh_time = req
447
- . cookie ( )
448
- . get ( COOKIE_LAST_REFRESH_TIME )
449
- . map ( |s| s. value_str ( ) . to_string ( ) ) ;
450
-
451
- let need_update = if let Some ( ts) = & last_refresh_time {
452
- let ts = ts. parse :: < u64 > ( ) . map_err ( |_| {
453
- ErrorCode :: BadArguments ( format ! (
454
- "[HTTP-SESSION] Invalid last_refresh_time value: {}" ,
455
- ts
456
- ) )
457
- } ) ?;
458
- let ts = SystemTime :: UNIX_EPOCH + Duration :: from_secs ( ts) ;
459
- if let Some ( id) = & client_session_id {
460
- ClientSessionManager :: instance ( )
461
- . refresh_state ( session. get_current_tenant ( ) , id, & user_name, & ts)
462
- . await ?
463
- } else {
464
- true
465
- }
466
- } else {
467
- true
468
- } ;
469
- if need_update {
470
- let ts = unix_ts ( ) . as_secs ( ) . to_string ( ) ;
471
- req. cookie ( ) . add ( make_cookie ( COOKIE_LAST_REFRESH_TIME , ts) ) ;
426
+ login_history. disable_write = false ;
427
+ if let Some ( s) = & mut client_session {
428
+ let sid = s. header . id . clone ( ) ;
429
+ session. set_client_session_id ( sid. clone ( ) ) ;
430
+ login_history. session_id = sid. clone ( ) ;
431
+ if !s. is_new_session {
432
+ // if session enabled by client:
433
+ // log for the first request of the session.
434
+ // else:
435
+ // log every request, which can be distinguished by `session_id = ''`
436
+ login_history. disable_write = true ;
472
437
}
438
+ s. try_refresh_state (
439
+ session. get_current_tenant ( ) ,
440
+ & user_name,
441
+ req. cookie ( ) ,
442
+ is_worksheet,
443
+ )
444
+ . await ?;
473
445
}
474
446
475
447
let session = session_manager. register_session ( session) ?;
@@ -479,12 +451,6 @@ impl<E> HTTPSessionEndpoint<E> {
479
451
. get ( HEADER_DEDUPLICATE_LABEL )
480
452
. map ( |id| id. to_str ( ) . unwrap ( ) . to_string ( ) ) ;
481
453
482
- let user_agent = req
483
- . headers ( )
484
- . get ( USER_AGENT )
485
- . map ( |id| id. to_str ( ) . unwrap ( ) . to_string ( ) ) ;
486
- login_history. user_agent = user_agent. clone ( ) . unwrap_or_default ( ) ;
487
-
488
454
let expected_node_id = req
489
455
. headers ( )
490
456
. get ( HEADER_NODE_ID )
@@ -509,9 +475,11 @@ impl<E> HTTPSessionEndpoint<E> {
509
475
http_method : req. method ( ) . to_string ( ) ,
510
476
uri : req. uri ( ) . to_string ( ) ,
511
477
client_host,
512
- client_session_id,
478
+ client_session_id : client_session . as_ref ( ) . map ( |s| s . header . id . clone ( ) ) ,
513
479
user_name,
514
480
is_sticky_node,
481
+ client_session,
482
+ fixed_coordinator_node : is_worksheet,
515
483
} )
516
484
}
517
485
}
@@ -698,8 +666,15 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
698
666
Ok ( ctx) => {
699
667
login_history. event_type = LoginEventType :: LoginSuccess ;
700
668
login_history. write_to_log ( ) ;
669
+ let client_session = ctx. client_session . clone ( ) ;
701
670
req. extensions_mut ( ) . insert ( ctx) ;
702
- self . ep . call ( req) . await . map ( |v| v. into_response ( ) )
671
+ self . ep . call ( req) . await . map ( |v| {
672
+ let mut r = v. into_response ( ) ;
673
+ if let Some ( s) = client_session {
674
+ s. on_response ( & mut r) ;
675
+ }
676
+ r
677
+ } )
703
678
}
704
679
Err ( err) => {
705
680
login_history. event_type = LoginEventType :: LoginFailed ;
0 commit comments