|
2 | 2 |
|
3 | 3 | use std::collections::HashMap;
|
4 | 4 | use std::io::{Cursor, Read};
|
| 5 | +use std::sync::mpsc::TryRecvError; |
5 | 6 | use std::time::Duration;
|
6 | 7 |
|
7 | 8 | use curl::easy::{Easy, List};
|
@@ -95,8 +96,7 @@ fn upload_lots() {
|
95 | 96 | }
|
96 | 97 |
|
97 | 98 | let mut m = Multi::new();
|
98 |
| - let poll = t!(mio::Poll::new()); |
99 |
| - let (tx, rx) = mio_extras::channel::channel(); |
| 99 | + let (tx, rx) = std::sync::mpsc::channel(); |
100 | 100 | let tx2 = tx.clone();
|
101 | 101 | t!(m.socket_function(move |socket, events, token| {
|
102 | 102 | t!(tx2.send(Message::Wait(socket, events, token)));
|
@@ -136,84 +136,55 @@ fn upload_lots() {
|
136 | 136 | t!(h.upload(true));
|
137 | 137 | t!(h.http_headers(list));
|
138 | 138 |
|
139 |
| - t!(poll.register(&rx, mio::Token(0), mio::Ready::all(), mio::PollOpt::level())); |
140 |
| - |
141 | 139 | let e = t!(m.add(h));
|
142 | 140 |
|
143 | 141 | let mut next_token = 1;
|
144 | 142 | let mut token_map = HashMap::new();
|
145 | 143 | let mut cur_timeout = None;
|
146 |
| - let mut events = mio::Events::with_capacity(128); |
147 |
| - let mut running = true; |
148 |
| - |
149 |
| - while running { |
150 |
| - let n = t!(poll.poll(&mut events, cur_timeout)); |
151 | 144 |
|
152 |
| - if n == 0 && t!(m.timeout()) == 0 { |
153 |
| - running = false; |
| 145 | + loop { |
| 146 | + match cur_timeout { |
| 147 | + Some(cur_timeout) => std::thread::sleep(cur_timeout), |
| 148 | + None => {} |
154 | 149 | }
|
155 | 150 |
|
156 |
| - for event in events.iter() { |
157 |
| - while event.token() == mio::Token(0) { |
158 |
| - match rx.try_recv() { |
159 |
| - Ok(Message::Timeout(dur)) => cur_timeout = dur, |
160 |
| - Ok(Message::Wait(socket, events, token)) => { |
161 |
| - let evented = mio::unix::EventedFd(&socket); |
162 |
| - if events.remove() { |
163 |
| - token_map.remove(&token).unwrap(); |
164 |
| - } else { |
165 |
| - let mut e = mio::Ready::empty(); |
166 |
| - if events.input() { |
167 |
| - e |= mio::Ready::readable(); |
168 |
| - } |
169 |
| - if events.output() { |
170 |
| - e |= mio::Ready::writable(); |
171 |
| - } |
172 |
| - if token == 0 { |
173 |
| - let token = next_token; |
174 |
| - next_token += 1; |
175 |
| - t!(m.assign(socket, token)); |
176 |
| - token_map.insert(token, socket); |
177 |
| - t!(poll.register( |
178 |
| - &evented, |
179 |
| - mio::Token(token), |
180 |
| - e, |
181 |
| - mio::PollOpt::level() |
182 |
| - )); |
183 |
| - } else { |
184 |
| - t!(poll.reregister( |
185 |
| - &evented, |
186 |
| - mio::Token(token), |
187 |
| - e, |
188 |
| - mio::PollOpt::level() |
189 |
| - )); |
190 |
| - } |
191 |
| - } |
| 151 | + t!(m.timeout()); |
| 152 | + |
| 153 | + let message = rx.try_recv(); |
| 154 | + |
| 155 | + match message { |
| 156 | + Ok(Message::Timeout(dur)) => cur_timeout = dur, |
| 157 | + Ok(Message::Wait(socket, events, token)) => { |
| 158 | + if events.remove() { |
| 159 | + token_map.remove(&token); |
| 160 | + } else { |
| 161 | + if token == 0 { |
| 162 | + let token = next_token; |
| 163 | + next_token += 1; |
| 164 | + t!(m.assign(socket, token)); |
| 165 | + token_map.insert(token, socket); |
192 | 166 | }
|
193 |
| - Err(_) => break, |
194 | 167 | }
|
195 |
| - } |
196 | 168 |
|
197 |
| - if event.token() == mio::Token(0) { |
198 |
| - continue; |
199 |
| - } |
| 169 | + let mut e = Events::new(); |
| 170 | + if events.input() { |
| 171 | + e.input(true); |
| 172 | + } |
| 173 | + if events.output() { |
| 174 | + e.output(true); |
| 175 | + } |
| 176 | + let remaining = t!(m.action(socket, &e)); |
200 | 177 |
|
201 |
| - let token = event.token(); |
202 |
| - let socket = token_map[&token.into()]; |
203 |
| - let mut e = Events::new(); |
204 |
| - if event.readiness().is_readable() { |
205 |
| - e.input(true); |
206 |
| - } |
207 |
| - if event.readiness().is_writable() { |
208 |
| - e.output(true); |
209 |
| - } |
210 |
| - if mio::unix::UnixReady::from(event.readiness()).is_error() { |
211 |
| - e.error(true); |
| 178 | + if remaining == 0 { |
| 179 | + break; |
| 180 | + } |
212 | 181 | }
|
213 |
| - let remaining = t!(m.action(socket, &e)); |
214 |
| - if remaining == 0 { |
215 |
| - running = false; |
| 182 | + Err(TryRecvError::Empty) => { |
| 183 | + for socket in token_map.values().copied() { |
| 184 | + t!(m.action(socket, &Events::new())); |
| 185 | + } |
216 | 186 | }
|
| 187 | + Err(TryRecvError::Disconnected) => break, |
217 | 188 | }
|
218 | 189 | }
|
219 | 190 |
|
|
0 commit comments