Skip to content

Commit 771dbbb

Browse files
authored
Merge pull request containerd#68 from lifupan/fix_select
sync: change from "select" to poll
2 parents a8cb8c3 + 5836232 commit 771dbbb

File tree

2 files changed

+72
-37
lines changed

2 files changed

+72
-37
lines changed

src/sync/client.rs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414

1515
//! Sync client of ttrpc.
1616
17-
use nix::sys::select::*;
1817
use nix::sys::socket::*;
1918
use nix::unistd::close;
2019
use protobuf::{CodedInputStream, CodedOutputStream, Message};
2120
use std::collections::HashMap;
2221
use std::os::unix::io::RawFd;
2322
use std::sync::mpsc;
2423
use std::sync::{Arc, Mutex};
25-
use std::thread;
24+
use std::{io, thread};
2625

2726
use crate::common::{MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE};
2827
use crate::error::{Error, Result};
@@ -92,21 +91,46 @@ impl Client {
9291

9392
//Recver
9493
thread::spawn(move || {
95-
let bigfd = {
96-
if fd > recver_fd {
97-
fd + 1
98-
} else {
99-
recver_fd + 1
100-
}
101-
};
94+
let mut pollers = vec![
95+
libc::pollfd {
96+
fd: recver_fd,
97+
events: libc::POLLIN,
98+
revents: 0,
99+
},
100+
libc::pollfd {
101+
fd,
102+
events: libc::POLLIN,
103+
revents: 0,
104+
},
105+
];
106+
102107
loop {
103-
let mut rs = FdSet::new();
104-
rs.insert(recver_fd);
105-
rs.insert(fd);
106-
select(bigfd, Some(&mut rs), None, None, None).unwrap();
107-
if rs.contains(recver_fd) {
108+
let returned = unsafe {
109+
let pollers: &mut [libc::pollfd] = &mut pollers;
110+
libc::poll(
111+
pollers as *mut _ as *mut libc::pollfd,
112+
pollers.len() as u64,
113+
-1,
114+
)
115+
};
116+
117+
if returned == -1 {
118+
let err = io::Error::last_os_error();
119+
if err.raw_os_error() == Some(libc::EINTR) {
120+
continue;
121+
}
122+
123+
error!("fatal error in process reaper:{}", err);
108124
break;
109-
} else if !rs.contains(fd) {
125+
} else if returned < 1 {
126+
continue;
127+
}
128+
129+
if pollers[0].revents != 0 {
130+
break;
131+
}
132+
133+
if pollers[pollers.len() - 1].revents == 0 {
110134
continue;
111135
}
112136

src/sync/server.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
//! Sync server of ttrpc.
1616
1717
use nix::fcntl::OFlag;
18-
use nix::sys::select::{select, FdSet};
1918
use nix::sys::socket::{self, *};
2019
use nix::unistd::close;
2120
use nix::unistd::pipe2;
@@ -25,8 +24,8 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
2524
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2625
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
2726
use std::sync::{Arc, Mutex};
28-
use std::thread;
2927
use std::thread::JoinHandle;
28+
use std::{io, thread};
3029

3130
use crate::common::{self, MESSAGE_TYPE_REQUEST};
3231
use crate::error::{get_status, Error, Result};
@@ -368,35 +367,47 @@ impl Server {
368367
let handler = thread::Builder::new()
369368
.name("listener_loop".into())
370369
.spawn(move || {
370+
let mut pollers = vec![
371+
libc::pollfd {
372+
fd: monitor_fd,
373+
events: libc::POLLIN,
374+
revents: 0,
375+
},
376+
libc::pollfd {
377+
fd: listener,
378+
events: libc::POLLIN,
379+
revents: 0,
380+
},
381+
];
382+
371383
loop {
372384
if listener_quit_flag.load(Ordering::SeqCst) {
373385
info!("listener shutdown for quit flag");
374386
break;
375387
}
376388

377-
let mut fd_set = FdSet::new();
378-
fd_set.insert(listener);
379-
fd_set.insert(monitor_fd);
380-
381-
match select(
382-
Some(fd_set.highest().unwrap() + 1),
383-
&mut fd_set,
384-
None,
385-
None,
386-
None,
387-
) {
388-
Ok(_) => (),
389-
Err(e) => {
390-
if e == nix::Error::from(nix::errno::Errno::EINTR) {
391-
continue;
392-
} else {
393-
error!("failed to select error {:?}", e);
394-
break;
395-
}
389+
let returned = unsafe {
390+
let pollers: &mut [libc::pollfd] = &mut pollers;
391+
libc::poll(
392+
pollers as *mut _ as *mut libc::pollfd,
393+
pollers.len() as u64,
394+
-1,
395+
)
396+
};
397+
398+
if returned == -1 {
399+
let err = io::Error::last_os_error();
400+
if err.raw_os_error() == Some(libc::EINTR) {
401+
continue;
396402
}
403+
404+
error!("fatal error in listener_loop:{:?}", err);
405+
break;
406+
} else if returned < 1 {
407+
continue;
397408
}
398409

399-
if fd_set.contains(monitor_fd) || !fd_set.contains(listener) {
410+
if pollers[0].revents != 0 || pollers[pollers.len() - 1].revents == 0 {
400411
continue;
401412
}
402413

0 commit comments

Comments
 (0)