3
3
4
4
mod handlers;
5
5
mod subscriptions;
6
- pub ( crate ) mod pending_requests ;
6
+ pub ( crate ) mod req_queue ;
7
7
8
8
use std:: {
9
9
borrow:: Cow ,
@@ -28,7 +28,6 @@ use ra_ide::{Canceled, FileId, LineIndex};
28
28
use ra_prof:: profile;
29
29
use ra_project_model:: { PackageRoot , ProjectWorkspace } ;
30
30
use ra_vfs:: VfsTask ;
31
- use rustc_hash:: FxHashSet ;
32
31
use serde:: { de:: DeserializeOwned , Serialize } ;
33
32
use threadpool:: ThreadPool ;
34
33
@@ -38,12 +37,10 @@ use crate::{
38
37
from_proto,
39
38
global_state:: { file_id_to_url, GlobalState , GlobalStateSnapshot } ,
40
39
lsp_ext,
41
- main_loop:: {
42
- pending_requests:: { PendingRequest , PendingRequests } ,
43
- subscriptions:: Subscriptions ,
44
- } ,
40
+ main_loop:: subscriptions:: Subscriptions ,
45
41
Result ,
46
42
} ;
43
+ use req_queue:: ReqQueue ;
47
44
48
45
#[ derive( Debug ) ]
49
46
pub struct LspError {
@@ -153,10 +150,10 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
153
150
register_options : Some ( serde_json:: to_value ( registration_options) . unwrap ( ) ) ,
154
151
} ;
155
152
let params = lsp_types:: RegistrationParams { registrations : vec ! [ registration] } ;
156
- let request = request_new :: < lsp_types :: request :: RegisterCapability > (
157
- loop_state . next_request_id ( ) ,
158
- params ,
159
- ) ;
153
+ let request = loop_state
154
+ . req_queue
155
+ . outgoing
156
+ . register :: < lsp_types :: request :: RegisterCapability > ( params , |_ , _| ( ) ) ;
160
157
connection. sender . send ( request. into ( ) ) . unwrap ( ) ;
161
158
}
162
159
@@ -199,7 +196,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
199
196
global_state. analysis_host . request_cancellation ( ) ;
200
197
log:: info!( "waiting for tasks to finish..." ) ;
201
198
task_receiver. into_iter ( ) . for_each ( |task| {
202
- on_task ( task, & connection. sender , & mut loop_state. pending_requests , & mut global_state)
199
+ on_task ( task, & connection. sender , & mut loop_state. req_queue . incoming , & mut global_state)
203
200
} ) ;
204
201
log:: info!( "...tasks have finished" ) ;
205
202
log:: info!( "joining threadpool..." ) ;
@@ -264,27 +261,14 @@ impl fmt::Debug for Event {
264
261
}
265
262
}
266
263
267
- #[ derive( Debug , Default ) ]
264
+ #[ derive( Default ) ]
268
265
struct LoopState {
269
- next_request_id : u64 ,
270
- pending_responses : FxHashSet < RequestId > ,
271
- pending_requests : PendingRequests ,
266
+ req_queue : ReqQueue < fn ( & mut GlobalState , lsp_server:: Response ) > ,
272
267
subscriptions : Subscriptions ,
273
268
workspace_loaded : bool ,
274
269
roots_progress_reported : Option < usize > ,
275
270
roots_scanned : usize ,
276
271
roots_total : usize ,
277
- configuration_request_id : Option < RequestId > ,
278
- }
279
-
280
- impl LoopState {
281
- fn next_request_id ( & mut self ) -> RequestId {
282
- self . next_request_id += 1 ;
283
- let res: RequestId = self . next_request_id . into ( ) ;
284
- let inserted = self . pending_responses . insert ( res. clone ( ) ) ;
285
- assert ! ( inserted) ;
286
- res
287
- }
288
272
}
289
273
290
274
fn loop_turn (
@@ -307,7 +291,7 @@ fn loop_turn(
307
291
308
292
match event {
309
293
Event :: Task ( task) => {
310
- on_task ( task, & connection. sender , & mut loop_state. pending_requests , global_state) ;
294
+ on_task ( task, & connection. sender , & mut loop_state. req_queue . incoming , global_state) ;
311
295
global_state. maybe_collect_garbage ( ) ;
312
296
}
313
297
Event :: Vfs ( task) => {
@@ -317,7 +301,7 @@ fn loop_turn(
317
301
Event :: Msg ( msg) => match msg {
318
302
Message :: Request ( req) => on_request (
319
303
global_state,
320
- & mut loop_state. pending_requests ,
304
+ & mut loop_state. req_queue . incoming ,
321
305
pool,
322
306
task_sender,
323
307
& connection. sender ,
@@ -328,32 +312,8 @@ fn loop_turn(
328
312
on_notification ( & connection. sender , global_state, loop_state, not) ?;
329
313
}
330
314
Message :: Response ( resp) => {
331
- let removed = loop_state. pending_responses . remove ( & resp. id ) ;
332
- if !removed {
333
- log:: error!( "unexpected response: {:?}" , resp)
334
- }
335
-
336
- if Some ( & resp. id ) == loop_state. configuration_request_id . as_ref ( ) {
337
- loop_state. configuration_request_id = None ;
338
- log:: debug!( "config update response: '{:?}" , resp) ;
339
- let Response { error, result, .. } = resp;
340
-
341
- match ( error, result) {
342
- ( Some ( err) , _) => {
343
- log:: error!( "failed to fetch the server settings: {:?}" , err)
344
- }
345
- ( None , Some ( configs) ) => {
346
- if let Some ( new_config) = configs. get ( 0 ) {
347
- let mut config = global_state. config . clone ( ) ;
348
- config. update ( & new_config) ;
349
- global_state. update_configuration ( config) ;
350
- }
351
- }
352
- ( None , None ) => {
353
- log:: error!( "received empty server settings response from the client" )
354
- }
355
- }
356
- }
315
+ let handler = loop_state. req_queue . outgoing . complete ( resp. id . clone ( ) ) ;
316
+ handler ( global_state, resp)
357
317
}
358
318
} ,
359
319
} ;
@@ -407,12 +367,12 @@ fn loop_turn(
407
367
fn on_task (
408
368
task : Task ,
409
369
msg_sender : & Sender < Message > ,
410
- pending_requests : & mut PendingRequests ,
370
+ incoming_requests : & mut req_queue :: Incoming ,
411
371
state : & mut GlobalState ,
412
372
) {
413
373
match task {
414
374
Task :: Respond ( response) => {
415
- if let Some ( completed) = pending_requests . finish ( & response. id ) {
375
+ if let Some ( completed) = incoming_requests . complete ( response. id . clone ( ) ) {
416
376
log:: info!( "handled req#{} in {:?}" , completed. id, completed. duration) ;
417
377
state. complete_request ( completed) ;
418
378
msg_sender. send ( response. into ( ) ) . unwrap ( ) ;
@@ -427,7 +387,7 @@ fn on_task(
427
387
428
388
fn on_request (
429
389
global_state : & mut GlobalState ,
430
- pending_requests : & mut PendingRequests ,
390
+ incoming_requests : & mut req_queue :: Incoming ,
431
391
pool : & ThreadPool ,
432
392
task_sender : & Sender < Task > ,
433
393
msg_sender : & Sender < Message > ,
@@ -440,7 +400,7 @@ fn on_request(
440
400
global_state,
441
401
task_sender,
442
402
msg_sender,
443
- pending_requests ,
403
+ incoming_requests ,
444
404
request_received,
445
405
} ;
446
406
pool_dispatcher
@@ -504,12 +464,7 @@ fn on_notification(
504
464
NumberOrString :: Number ( id) => id. into ( ) ,
505
465
NumberOrString :: String ( id) => id. into ( ) ,
506
466
} ;
507
- if loop_state. pending_requests . cancel ( & id) {
508
- let response = Response :: new_err (
509
- id,
510
- ErrorCode :: RequestCanceled as i32 ,
511
- "canceled by client" . to_string ( ) ,
512
- ) ;
467
+ if let Some ( response) = loop_state. req_queue . incoming . cancel ( id) {
513
468
msg_sender. send ( response. into ( ) ) . unwrap ( )
514
469
}
515
470
return Ok ( ( ) ) ;
@@ -572,18 +527,38 @@ fn on_notification(
572
527
Ok ( _) => {
573
528
// As stated in https://github.com/microsoft/language-server-protocol/issues/676,
574
529
// this notification's parameters should be ignored and the actual config queried separately.
575
- let request_id = loop_state. next_request_id ( ) ;
576
- let request = request_new :: < lsp_types:: request:: WorkspaceConfiguration > (
577
- request_id. clone ( ) ,
578
- lsp_types:: ConfigurationParams {
579
- items : vec ! [ lsp_types:: ConfigurationItem {
580
- scope_uri: None ,
581
- section: Some ( "rust-analyzer" . to_string( ) ) ,
582
- } ] ,
583
- } ,
584
- ) ;
530
+ let request = loop_state
531
+ . req_queue
532
+ . outgoing
533
+ . register :: < lsp_types:: request:: WorkspaceConfiguration > (
534
+ lsp_types:: ConfigurationParams {
535
+ items : vec ! [ lsp_types:: ConfigurationItem {
536
+ scope_uri: None ,
537
+ section: Some ( "rust-analyzer" . to_string( ) ) ,
538
+ } ] ,
539
+ } ,
540
+ |global_state, resp| {
541
+ log:: debug!( "config update response: '{:?}" , resp) ;
542
+ let Response { error, result, .. } = resp;
543
+
544
+ match ( error, result) {
545
+ ( Some ( err) , _) => {
546
+ log:: error!( "failed to fetch the server settings: {:?}" , err)
547
+ }
548
+ ( None , Some ( configs) ) => {
549
+ if let Some ( new_config) = configs. get ( 0 ) {
550
+ let mut config = global_state. config . clone ( ) ;
551
+ config. update ( & new_config) ;
552
+ global_state. update_configuration ( config) ;
553
+ }
554
+ }
555
+ ( None , None ) => log:: error!(
556
+ "received empty server settings response from the client"
557
+ ) ,
558
+ }
559
+ } ,
560
+ ) ;
585
561
msg_sender. send ( request. into ( ) ) ?;
586
- loop_state. configuration_request_id = Some ( request_id) ;
587
562
588
563
return Ok ( ( ) ) ;
589
564
}
@@ -752,13 +727,16 @@ fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) {
752
727
753
728
match ( prev, loop_state. workspace_loaded ) {
754
729
( None , false ) => {
755
- let work_done_progress_create = request_new :: < lsp_types:: request:: WorkDoneProgressCreate > (
756
- loop_state. next_request_id ( ) ,
757
- WorkDoneProgressCreateParams {
758
- token : lsp_types:: ProgressToken :: String ( "rustAnalyzer/startup" . into ( ) ) ,
759
- } ,
760
- ) ;
761
- sender. send ( work_done_progress_create. into ( ) ) . unwrap ( ) ;
730
+ let request = loop_state
731
+ . req_queue
732
+ . outgoing
733
+ . register :: < lsp_types:: request:: WorkDoneProgressCreate > (
734
+ WorkDoneProgressCreateParams {
735
+ token : lsp_types:: ProgressToken :: String ( "rustAnalyzer/startup" . into ( ) ) ,
736
+ } ,
737
+ |_, _| ( ) ,
738
+ ) ;
739
+ sender. send ( request. into ( ) ) . unwrap ( ) ;
762
740
send_startup_progress_notif (
763
741
sender,
764
742
WorkDoneProgress :: Begin ( WorkDoneProgressBegin {
@@ -800,7 +778,7 @@ struct PoolDispatcher<'a> {
800
778
req : Option < Request > ,
801
779
pool : & ' a ThreadPool ,
802
780
global_state : & ' a mut GlobalState ,
803
- pending_requests : & ' a mut PendingRequests ,
781
+ incoming_requests : & ' a mut req_queue :: Incoming ,
804
782
msg_sender : & ' a Sender < Message > ,
805
783
task_sender : & ' a Sender < Task > ,
806
784
request_received : Instant ,
@@ -829,7 +807,7 @@ impl<'a> PoolDispatcher<'a> {
829
807
result_to_task :: < R > ( id, result)
830
808
} )
831
809
. map_err ( |_| format ! ( "sync task {:?} panicked" , R :: METHOD ) ) ?;
832
- on_task ( task, self . msg_sender , self . pending_requests , self . global_state ) ;
810
+ on_task ( task, self . msg_sender , self . incoming_requests , self . global_state ) ;
833
811
Ok ( self )
834
812
}
835
813
@@ -876,7 +854,7 @@ impl<'a> PoolDispatcher<'a> {
876
854
return None ;
877
855
}
878
856
} ;
879
- self . pending_requests . start ( PendingRequest {
857
+ self . incoming_requests . register ( req_queue :: PendingInRequest {
880
858
id : id. clone ( ) ,
881
859
method : R :: METHOD . to_string ( ) ,
882
860
received : self . request_received ,
@@ -993,14 +971,6 @@ where
993
971
Notification :: new ( N :: METHOD . to_string ( ) , params)
994
972
}
995
973
996
- fn request_new < R > ( id : RequestId , params : R :: Params ) -> Request
997
- where
998
- R : lsp_types:: request:: Request ,
999
- R :: Params : Serialize ,
1000
- {
1001
- Request :: new ( id, R :: METHOD . to_string ( ) , params)
1002
- }
1003
-
1004
974
#[ cfg( test) ]
1005
975
mod tests {
1006
976
use std:: borrow:: Cow ;
0 commit comments