17
17
18
18
#[ cfg( unix) ]
19
19
use std:: os:: unix:: io:: { AsRawFd , FromRawFd , RawFd } ;
20
+ use std:: time:: Duration ;
20
21
21
22
use protobuf:: { CodedInputStream , Message } ;
22
23
use std:: collections:: HashMap ;
@@ -40,6 +41,7 @@ use crate::{MethodHandler, TtrpcContext};
40
41
const DEFAULT_WAIT_THREAD_COUNT_DEFAULT : usize = 3 ;
41
42
const DEFAULT_WAIT_THREAD_COUNT_MIN : usize = 1 ;
42
43
const DEFAULT_WAIT_THREAD_COUNT_MAX : usize = 5 ;
44
+ const DEFAULT_ACCEPT_RETRY_INTERVAL : Duration = Duration :: from_secs ( 10 ) ;
43
45
44
46
type MessageSender = Sender < ( MessageHeader , Vec < u8 > ) > ;
45
47
type MessageReceiver = Receiver < ( MessageHeader , Vec < u8 > ) > ;
@@ -57,6 +59,7 @@ pub struct Server {
57
59
thread_count_default : usize ,
58
60
thread_count_min : usize ,
59
61
thread_count_max : usize ,
62
+ accept_retry_interval : Duration ,
60
63
}
61
64
62
65
struct Connection {
@@ -244,6 +247,7 @@ impl Default for Server {
244
247
thread_count_default : DEFAULT_WAIT_THREAD_COUNT_DEFAULT ,
245
248
thread_count_min : DEFAULT_WAIT_THREAD_COUNT_MIN ,
246
249
thread_count_max : DEFAULT_WAIT_THREAD_COUNT_MAX ,
250
+ accept_retry_interval : DEFAULT_ACCEPT_RETRY_INTERVAL ,
247
251
}
248
252
}
249
253
}
@@ -305,6 +309,11 @@ impl Server {
305
309
self
306
310
}
307
311
312
+ pub fn set_accept_retry_interval ( mut self , interval : Duration ) -> Server {
313
+ self . accept_retry_interval = interval;
314
+ self
315
+ }
316
+
308
317
pub fn start_listen ( & mut self ) -> Result < ( ) > {
309
318
let connections = self . connections . clone ( ) ;
310
319
@@ -320,6 +329,7 @@ impl Server {
320
329
let min = self . thread_count_min ;
321
330
let max = self . thread_count_max ;
322
331
let listener_quit_flag = self . listener_quit_flag . clone ( ) ;
332
+ let accept_retry_interval = self . accept_retry_interval ;
323
333
324
334
let reaper_tx = match self . reaper . take ( ) {
325
335
None => {
@@ -373,6 +383,14 @@ impl Server {
373
383
}
374
384
Err ( e) => {
375
385
error ! ( "listener accept got {:?}" , e) ;
386
+
387
+ // Resource limit errors can't be recoverd in short time
388
+ // and the poll(2) is level-triggered, an uncorrected error can lead to an infinite loop,
389
+ // so we sleep for a while and wait for the error to be corrected.
390
+ if is_resource_limit_error ( e) {
391
+ thread:: sleep ( accept_retry_interval) ;
392
+ }
393
+
376
394
continue ;
377
395
}
378
396
} ;
@@ -597,3 +615,11 @@ fn quit_connection(quit: Arc<AtomicBool>, control_tx: SyncSender<()>) {
597
615
. send ( ( ) )
598
616
. unwrap_or_else ( |err| debug ! ( "Failed to send {:?}" , err) ) ;
599
617
}
618
+
619
+ fn is_resource_limit_error ( e : std:: io:: Error ) -> bool {
620
+ if let Some ( err) = e. raw_os_error ( ) {
621
+ return [ libc:: EMFILE , libc:: ENFILE , libc:: ENOBUFS , libc:: ENOMEM ] . contains ( & err) ;
622
+ }
623
+
624
+ false
625
+ }
0 commit comments