1
- use futures:: { future:: BoxFuture , Future , FutureExt } ;
1
+ use futures:: {
2
+ future:: { BoxFuture , OptionFuture } ,
3
+ Future , FutureExt ,
4
+ } ;
2
5
use snafu:: prelude:: * ;
3
6
use std:: {
4
7
collections:: HashMap ,
@@ -14,7 +17,7 @@ use tokio::{
14
17
join,
15
18
process:: { Child , ChildStdin , ChildStdout , Command } ,
16
19
select,
17
- sync:: { mpsc, oneshot} ,
20
+ sync:: { mpsc, oneshot, OnceCell } ,
18
21
task:: { JoinHandle , JoinSet } ,
19
22
time:: { self , MissedTickBehavior } ,
20
23
} ;
@@ -67,8 +70,6 @@ pub enum Channel {
67
70
}
68
71
69
72
impl Channel {
70
- pub ( crate ) const ALL : [ Self ; 3 ] = [ Self :: Stable , Self :: Beta , Self :: Nightly ] ;
71
-
72
73
#[ cfg( test) ]
73
74
pub ( crate ) fn to_str ( self ) -> & ' static str {
74
75
match self {
@@ -401,49 +402,50 @@ enum DemultiplexCommand {
401
402
pub struct Coordinator < B > {
402
403
backend : B ,
403
404
// Consider making these lazily-created and/or idly time out
404
- stable : Container ,
405
- beta : Container ,
406
- nightly : Container ,
405
+ stable : OnceCell < Container > ,
406
+ beta : OnceCell < Container > ,
407
+ nightly : OnceCell < Container > ,
407
408
token : CancellationToken ,
408
409
}
409
410
410
411
impl < B > Coordinator < B >
411
412
where
412
413
B : Backend ,
413
414
{
414
- pub async fn new ( backend : B ) -> Result < Self , Error > {
415
+ pub async fn new ( backend : B ) -> Self {
415
416
let token = CancellationToken :: new ( ) ;
416
417
417
- let [ stable, beta, nightly] =
418
- Channel :: ALL . map ( |channel| Container :: new ( channel, token. clone ( ) , & backend) ) ;
419
-
420
- let ( stable, beta, nightly) = join ! ( stable, beta, nightly) ;
421
-
422
- let stable = stable?;
423
- let beta = beta?;
424
- let nightly = nightly?;
425
-
426
- Ok ( Self {
418
+ Self {
427
419
backend,
428
- stable,
429
- beta,
430
- nightly,
420
+ stable : OnceCell :: new ( ) ,
421
+ beta : OnceCell :: new ( ) ,
422
+ nightly : OnceCell :: new ( ) ,
431
423
token,
432
- } )
424
+ }
433
425
}
434
426
435
427
pub async fn execute (
436
428
& self ,
437
429
request : ExecuteRequest ,
438
430
) -> Result < WithOutput < ExecuteResponse > , ExecuteError > {
439
- self . select_channel ( request. channel ) . execute ( request) . await
431
+ use execute_error:: * ;
432
+
433
+ self . select_channel ( request. channel )
434
+ . await
435
+ . context ( CouldNotStartContainerSnafu ) ?
436
+ . execute ( request)
437
+ . await
440
438
}
441
439
442
440
pub async fn begin_execute (
443
441
& self ,
444
442
request : ExecuteRequest ,
445
443
) -> Result < ActiveExecution , ExecuteError > {
444
+ use execute_error:: * ;
445
+
446
446
self . select_channel ( request. channel )
447
+ . await
448
+ . context ( CouldNotStartContainerSnafu ) ?
447
449
. begin_execute ( request)
448
450
. await
449
451
}
@@ -452,48 +454,72 @@ where
452
454
& self ,
453
455
request : CompileRequest ,
454
456
) -> Result < WithOutput < CompileResponse > , CompileError > {
455
- self . select_channel ( request. channel ) . compile ( request) . await
457
+ use compile_error:: * ;
458
+
459
+ self . select_channel ( request. channel )
460
+ . await
461
+ . context ( CouldNotStartContainerSnafu ) ?
462
+ . compile ( request)
463
+ . await
456
464
}
457
465
458
466
pub async fn begin_compile (
459
467
& self ,
460
468
request : CompileRequest ,
461
469
) -> Result < ActiveCompilation , CompileError > {
470
+ use compile_error:: * ;
471
+
462
472
self . select_channel ( request. channel )
473
+ . await
474
+ . context ( CouldNotStartContainerSnafu ) ?
463
475
. begin_compile ( request)
464
476
. await
465
477
}
466
478
467
- pub async fn shutdown ( self ) -> Result < B > {
479
+ pub async fn idle ( & mut self ) -> Result < ( ) > {
468
480
let Self {
469
- backend,
470
481
stable,
471
482
beta,
472
483
nightly,
473
484
token,
485
+ ..
474
486
} = self ;
475
487
token. cancel ( ) ;
476
488
477
- let ( stable, beta, nightly) = join ! ( stable. shutdown( ) , beta. shutdown( ) , nightly. shutdown( ) ) ;
489
+ let channels =
490
+ [ stable, beta, nightly] . map ( |c| OptionFuture :: from ( c. take ( ) . map ( |c| c. shutdown ( ) ) ) ) ;
491
+
492
+ let [ stable, beta, nightly] = channels;
493
+
494
+ let ( stable, beta, nightly) = join ! ( stable, beta, nightly) ;
478
495
479
- stable?;
480
- beta?;
481
- nightly?;
496
+ stable. transpose ( ) ?;
497
+ beta. transpose ( ) ?;
498
+ nightly. transpose ( ) ?;
482
499
483
- Ok ( backend )
500
+ Ok ( ( ) )
484
501
}
485
502
486
- fn select_channel ( & self , channel : Channel ) -> & Container {
487
- match channel {
503
+ pub async fn shutdown ( mut self ) -> Result < B > {
504
+ self . idle ( ) . await ?;
505
+ Ok ( self . backend )
506
+ }
507
+
508
+ async fn select_channel ( & self , channel : Channel ) -> Result < & Container , Error > {
509
+ let container = match channel {
488
510
Channel :: Stable => & self . stable ,
489
511
Channel :: Beta => & self . beta ,
490
512
Channel :: Nightly => & self . nightly ,
491
- }
513
+ } ;
514
+
515
+ container
516
+ . get_or_try_init ( || Container :: new ( channel, self . token . clone ( ) , & self . backend ) )
517
+ . await
492
518
}
493
519
}
494
520
495
521
impl Coordinator < DockerBackend > {
496
- pub async fn new_docker ( ) -> Result < Self , Error > {
522
+ pub async fn new_docker ( ) -> Self {
497
523
Self :: new ( DockerBackend ( ( ) ) ) . await
498
524
}
499
525
}
@@ -771,6 +797,9 @@ impl fmt::Debug for ActiveExecution {
771
797
#[ derive( Debug , Snafu ) ]
772
798
#[ snafu( module) ]
773
799
pub enum ExecuteError {
800
+ #[ snafu( display( "Could not start the container" ) ) ]
801
+ CouldNotStartContainer { source : Error } ,
802
+
774
803
#[ snafu( display( "Could not modify Cargo.toml" ) ) ]
775
804
CouldNotModifyCargoToml { source : ModifyCargoTomlError } ,
776
805
@@ -809,6 +838,9 @@ impl fmt::Debug for ActiveCompilation {
809
838
#[ derive( Debug , Snafu ) ]
810
839
#[ snafu( module) ]
811
840
pub enum CompileError {
841
+ #[ snafu( display( "Could not start the container" ) ) ]
842
+ CouldNotStartContainer { source : Error } ,
843
+
812
844
#[ snafu( display( "Could not modify Cargo.toml" ) ) ]
813
845
CouldNotModifyCargoToml { source : ModifyCargoTomlError } ,
814
846
@@ -1376,7 +1408,7 @@ mod tests {
1376
1408
}
1377
1409
}
1378
1410
1379
- async fn new_coordinator ( ) -> Result < Coordinator < impl Backend > > {
1411
+ async fn new_coordinator ( ) -> Coordinator < impl Backend > {
1380
1412
Coordinator :: new ( TestBackend :: new ( ) ) . await
1381
1413
//Coordinator::new_docker().await
1382
1414
}
@@ -1396,7 +1428,7 @@ mod tests {
1396
1428
#[ tokio:: test]
1397
1429
#[ snafu:: report]
1398
1430
async fn test_execute_response ( ) -> Result < ( ) > {
1399
- let coordinator = new_coordinator ( ) . await ? ;
1431
+ let coordinator = new_coordinator ( ) . await ;
1400
1432
1401
1433
let response = coordinator
1402
1434
. execute ( new_execute_request ( ) )
@@ -1424,7 +1456,7 @@ mod tests {
1424
1456
] ;
1425
1457
1426
1458
let tests = params. into_iter ( ) . map ( |( mode, expected) | async move {
1427
- let coordinator = new_coordinator ( ) . await ? ;
1459
+ let coordinator = new_coordinator ( ) . await ;
1428
1460
1429
1461
let request = ExecuteRequest {
1430
1462
mode,
@@ -1456,7 +1488,7 @@ mod tests {
1456
1488
let tests = params. into_iter ( ) . flat_map ( |( code, works_in) | {
1457
1489
Edition :: ALL . into_iter ( ) . zip ( works_in) . map (
1458
1490
move |( edition, expected_to_work) | async move {
1459
- let coordinator = new_coordinator ( ) . await ? ;
1491
+ let coordinator = new_coordinator ( ) . await ;
1460
1492
1461
1493
let request = ExecuteRequest {
1462
1494
code : code. into ( ) ,
@@ -1496,7 +1528,7 @@ mod tests {
1496
1528
] ;
1497
1529
1498
1530
let tests = params. into_iter ( ) . map ( |( crate_type, expected) | async move {
1499
- let coordinator = new_coordinator ( ) . await ? ;
1531
+ let coordinator = new_coordinator ( ) . await ;
1500
1532
1501
1533
let request = ExecuteRequest {
1502
1534
crate_type,
@@ -1529,7 +1561,7 @@ mod tests {
1529
1561
let params = [ ( false , "Running `" ) , ( true , "Running unittests" ) ] ;
1530
1562
1531
1563
let tests = params. into_iter ( ) . map ( |( tests, expected) | async move {
1532
- let coordinator = new_coordinator ( ) . await ? ;
1564
+ let coordinator = new_coordinator ( ) . await ;
1533
1565
1534
1566
let request = ExecuteRequest {
1535
1567
code : code. into ( ) ,
@@ -1562,7 +1594,7 @@ mod tests {
1562
1594
] ;
1563
1595
1564
1596
let tests = params. into_iter ( ) . map ( |( backtrace, expected) | async move {
1565
- let coordinator = new_coordinator ( ) . await ? ;
1597
+ let coordinator = new_coordinator ( ) . await ;
1566
1598
1567
1599
let request = ExecuteRequest {
1568
1600
code : code. into ( ) ,
@@ -1668,7 +1700,7 @@ mod tests {
1668
1700
#[ tokio:: test]
1669
1701
#[ snafu:: report]
1670
1702
async fn test_compile_response ( ) -> Result < ( ) > {
1671
- let coordinator = new_coordinator ( ) . await ? ;
1703
+ let coordinator = new_coordinator ( ) . await ;
1672
1704
1673
1705
let response = coordinator
1674
1706
. compile ( new_compile_request ( ) )
@@ -1688,7 +1720,7 @@ mod tests {
1688
1720
#[ tokio:: test]
1689
1721
#[ snafu:: report]
1690
1722
async fn test_compile_streaming ( ) -> Result < ( ) > {
1691
- let coordinator = new_coordinator ( ) . await ? ;
1723
+ let coordinator = new_coordinator ( ) . await ;
1692
1724
1693
1725
let ActiveCompilation {
1694
1726
task,
@@ -1723,7 +1755,7 @@ mod tests {
1723
1755
#[ snafu:: report]
1724
1756
async fn test_compile_edition ( ) -> Result < ( ) > {
1725
1757
for edition in Edition :: ALL {
1726
- let coordinator = new_coordinator ( ) . await ? ;
1758
+ let coordinator = new_coordinator ( ) . await ;
1727
1759
1728
1760
let response = coordinator
1729
1761
. compile ( new_compile_hir_request_for ( edition) )
@@ -1745,7 +1777,7 @@ mod tests {
1745
1777
#[ tokio:: test]
1746
1778
#[ snafu:: report]
1747
1779
async fn test_compile_assembly ( ) -> Result < ( ) > {
1748
- let coordinator = new_coordinator ( ) . await ? ;
1780
+ let coordinator = new_coordinator ( ) . await ;
1749
1781
1750
1782
let response = coordinator
1751
1783
. compile ( new_compile_assembly_request ( ) )
@@ -1770,7 +1802,7 @@ mod tests {
1770
1802
#[ tokio:: test]
1771
1803
#[ snafu:: report]
1772
1804
async fn test_compile_hir ( ) -> Result < ( ) > {
1773
- let coordinator = new_coordinator ( ) . await ? ;
1805
+ let coordinator = new_coordinator ( ) . await ;
1774
1806
1775
1807
let response = coordinator
1776
1808
. compile ( new_compile_hir_request ( ) )
@@ -1789,7 +1821,7 @@ mod tests {
1789
1821
#[ tokio:: test]
1790
1822
#[ snafu:: report]
1791
1823
async fn test_compile_llvm_ir ( ) -> Result < ( ) > {
1792
- let coordinator = new_coordinator ( ) . await ? ;
1824
+ let coordinator = new_coordinator ( ) . await ;
1793
1825
1794
1826
let response = coordinator
1795
1827
. compile ( new_compile_llvm_ir_request ( ) )
@@ -1809,7 +1841,7 @@ mod tests {
1809
1841
#[ snafu:: report]
1810
1842
async fn test_compile_wasm ( ) -> Result < ( ) > {
1811
1843
// cargo-wasm only exists inside the container
1812
- let coordinator = Coordinator :: new_docker ( ) . await ? ;
1844
+ let coordinator = Coordinator :: new_docker ( ) . await ;
1813
1845
1814
1846
let response = coordinator
1815
1847
. compile ( new_compile_wasm_request ( ) )
@@ -1831,7 +1863,7 @@ mod tests {
1831
1863
#[ tokio:: test]
1832
1864
#[ snafu:: report]
1833
1865
async fn test_compile_clears_old_main_rs ( ) -> Result < ( ) > {
1834
- let coordinator = new_coordinator ( ) . await ? ;
1866
+ let coordinator = new_coordinator ( ) . await ;
1835
1867
1836
1868
// Create a main.rs file
1837
1869
let req = ExecuteRequest {
0 commit comments