@@ -1709,8 +1709,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
1709
1709
mod tests {
1710
1710
use assertables:: * ;
1711
1711
use futures:: { future:: try_join_all, Future , FutureExt } ;
1712
- use std:: { sync:: Once , time:: Duration } ;
1712
+ use once_cell:: sync:: Lazy ;
1713
+ use std:: { env, sync:: Once , time:: Duration } ;
1713
1714
use tempdir:: TempDir ;
1715
+ use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
1714
1716
1715
1717
use super :: * ;
1716
1718
@@ -1774,15 +1776,78 @@ mod tests {
1774
1776
}
1775
1777
}
1776
1778
1777
- async fn new_coordinator ( ) -> Coordinator < impl Backend > {
1779
+ const MAX_CONCURRENT_TESTS : Lazy < usize > = Lazy :: new ( || {
1780
+ env:: var ( "TESTS_MAX_CONCURRENCY" )
1781
+ . ok ( )
1782
+ . and_then ( |v| v. parse ( ) . ok ( ) )
1783
+ . unwrap_or ( 5 )
1784
+ } ) ;
1785
+
1786
+ static CONCURRENT_TEST_SEMAPHORE : Lazy < Arc < Semaphore > > =
1787
+ Lazy :: new ( || Arc :: new ( Semaphore :: new ( * MAX_CONCURRENT_TESTS ) ) ) ;
1788
+
1789
+ struct RestrictedCoordinator < T > {
1790
+ _permit : OwnedSemaphorePermit ,
1791
+ coordinator : Coordinator < T > ,
1792
+ }
1793
+
1794
+ impl < T > RestrictedCoordinator < T >
1795
+ where
1796
+ T : Backend ,
1797
+ {
1798
+ async fn with < F , Fut > ( f : F ) -> Self
1799
+ where
1800
+ F : FnOnce ( ) -> Fut ,
1801
+ Fut : Future < Output = Coordinator < T > > ,
1802
+ {
1803
+ let semaphore = CONCURRENT_TEST_SEMAPHORE . clone ( ) ;
1804
+ let permit = semaphore
1805
+ . acquire_owned ( )
1806
+ . await
1807
+ . expect ( "Unable to acquire permit" ) ;
1808
+ let coordinator = f ( ) . await ;
1809
+ Self {
1810
+ _permit : permit,
1811
+ coordinator,
1812
+ }
1813
+ }
1814
+
1815
+ async fn shutdown ( self ) -> super :: Result < T , super :: Error > {
1816
+ self . coordinator . shutdown ( ) . await
1817
+ }
1818
+ }
1819
+
1820
+ impl < T > ops:: Deref for RestrictedCoordinator < T > {
1821
+ type Target = Coordinator < T > ;
1822
+
1823
+ fn deref ( & self ) -> & Self :: Target {
1824
+ & self . coordinator
1825
+ }
1826
+ }
1827
+
1828
+ impl < T > ops:: DerefMut for RestrictedCoordinator < T > {
1829
+ fn deref_mut ( & mut self ) -> & mut Self :: Target {
1830
+ & mut self . coordinator
1831
+ }
1832
+ }
1833
+
1834
+ async fn new_coordinator_test ( ) -> RestrictedCoordinator < impl Backend > {
1835
+ RestrictedCoordinator :: with ( || Coordinator :: new ( TestBackend :: new ( ) ) ) . await
1836
+ }
1837
+
1838
+ async fn new_coordinator_docker ( ) -> RestrictedCoordinator < impl Backend > {
1839
+ RestrictedCoordinator :: with ( || Coordinator :: new_docker ( ) ) . await
1840
+ }
1841
+
1842
+ async fn new_coordinator ( ) -> RestrictedCoordinator < impl Backend > {
1778
1843
#[ cfg( not( force_docker) ) ]
1779
1844
{
1780
- Coordinator :: new ( TestBackend :: new ( ) ) . await
1845
+ new_coordinator_test ( ) . await
1781
1846
}
1782
1847
1783
1848
#[ cfg( force_docker) ]
1784
1849
{
1785
- Coordinator :: new_docker ( ) . await
1850
+ new_coordinator_docker ( ) . await
1786
1851
}
1787
1852
}
1788
1853
@@ -2472,7 +2537,7 @@ mod tests {
2472
2537
#[ snafu:: report]
2473
2538
async fn compile_wasm ( ) -> Result < ( ) > {
2474
2539
// cargo-wasm only exists inside the container
2475
- let coordinator = Coordinator :: new_docker ( ) . await ;
2540
+ let coordinator = new_coordinator_docker ( ) . await ;
2476
2541
2477
2542
let req = CompileRequest {
2478
2543
target : CompileTarget :: Wasm ,
@@ -2703,7 +2768,7 @@ mod tests {
2703
2768
#[ snafu:: report]
2704
2769
async fn network_connections_are_disabled ( ) -> Result < ( ) > {
2705
2770
// The limits are only applied to the container
2706
- let coordinator = Coordinator :: new_docker ( ) . await ;
2771
+ let coordinator = new_coordinator_docker ( ) . await ;
2707
2772
2708
2773
let req = ExecuteRequest {
2709
2774
code : r#"
@@ -2729,7 +2794,7 @@ mod tests {
2729
2794
#[ snafu:: report]
2730
2795
async fn memory_usage_is_limited ( ) -> Result < ( ) > {
2731
2796
// The limits are only applied to the container
2732
- let coordinator = Coordinator :: new_docker ( ) . await ;
2797
+ let coordinator = new_coordinator_docker ( ) . await ;
2733
2798
2734
2799
let req = ExecuteRequest {
2735
2800
code : r#"
@@ -2756,7 +2821,7 @@ mod tests {
2756
2821
#[ snafu:: report]
2757
2822
async fn number_of_pids_is_limited ( ) -> Result < ( ) > {
2758
2823
// The limits are only applied to the container
2759
- let coordinator = Coordinator :: new_docker ( ) . await ;
2824
+ let coordinator = new_coordinator_docker ( ) . await ;
2760
2825
2761
2826
let req = ExecuteRequest {
2762
2827
code : r##"
0 commit comments