1
1
#![ allow( dead_code) ]
2
2
3
- mod sync_transport;
4
- use sync_transport:: SyncTransport ;
5
-
6
3
use bytes:: { Bytes , BytesMut } ;
7
4
use log:: error;
8
5
use retty:: channel:: { InboundPipeline , Pipeline } ;
@@ -14,7 +11,7 @@ use sfu::{
14
11
SrtpHandler , StunHandler ,
15
12
} ;
16
13
use std:: cell:: RefCell ;
17
- use std:: collections:: { HashMap , VecDeque } ;
14
+ use std:: collections:: HashMap ;
18
15
use std:: io:: { Error , ErrorKind , Read } ;
19
16
use std:: net:: { SocketAddr , UdpSocket } ;
20
17
use std:: rc:: Rc ;
@@ -106,13 +103,7 @@ pub fn sync_run(
106
103
107
104
println ! ( "listening {}..." , socket. local_addr( ) ?) ;
108
105
109
- let outgoing_queue = Rc :: new ( RefCell :: new ( VecDeque :: new ( ) ) ) ;
110
-
111
- let pipeline = build_pipeline (
112
- socket. local_addr ( ) ?,
113
- outgoing_queue. clone ( ) ,
114
- server_states. clone ( ) ,
115
- ) ;
106
+ let pipeline = build_pipeline ( socket. local_addr ( ) ?, server_states. clone ( ) ) ;
116
107
117
108
let mut buf = vec ! [ 0 ; 2000 ] ;
118
109
@@ -126,7 +117,7 @@ pub fn sync_run(
126
117
}
127
118
} ;
128
119
129
- write_socket_output ( & socket, & outgoing_queue ) ;
120
+ write_socket_output ( & socket, & pipeline ) ;
130
121
131
122
// Spawn new incoming signal message from the signaling server thread.
132
123
if let Ok ( signal_message) = rx. try_recv ( ) {
@@ -167,9 +158,11 @@ pub fn sync_run(
167
158
Ok ( ( ) )
168
159
}
169
160
170
- fn write_socket_output ( socket : & UdpSocket , outgoing_queue : & Rc < RefCell < VecDeque < TaggedBytesMut > > > ) {
171
- let mut queue = outgoing_queue. borrow_mut ( ) ;
172
- while let Some ( transmit) = queue. pop_front ( ) {
161
+ fn write_socket_output (
162
+ socket : & UdpSocket ,
163
+ pipeline : & Rc < Pipeline < TaggedBytesMut , TaggedBytesMut > > ,
164
+ ) {
165
+ while let Some ( transmit) = pipeline. poll_transmit ( ) {
173
166
socket
174
167
. send_to ( & transmit. message , transmit. transport . peer_addr )
175
168
. expect ( "sending UDP data" ) ;
@@ -200,14 +193,11 @@ fn read_socket_input(socket: &UdpSocket, buf: &mut [u8]) -> Option<TaggedBytesMu
200
193
201
194
fn build_pipeline (
202
195
local_addr : SocketAddr ,
203
- writer : Rc < RefCell < VecDeque < TaggedBytesMut > > > ,
204
196
server_states : Rc < RefCell < ServerStates > > ,
205
197
) -> Rc < Pipeline < TaggedBytesMut , TaggedBytesMut > > {
206
198
let pipeline: Pipeline < TaggedBytesMut , TaggedBytesMut > = Pipeline :: new ( ) ;
207
199
208
- let sync_transport_handler = SyncTransport :: new ( writer) ;
209
200
let demuxer_handler = DemuxerHandler :: new ( ) ;
210
- let write_exception_handler = ExceptionHandler :: new ( ) ;
211
201
let stun_handler = StunHandler :: new ( ) ;
212
202
// DTLS
213
203
let dtls_handler = DtlsHandler :: new ( local_addr, Rc :: clone ( & server_states) ) ;
@@ -218,11 +208,9 @@ fn build_pipeline(
218
208
let interceptor_handler = InterceptorHandler :: new ( Rc :: clone ( & server_states) ) ;
219
209
// Gateway
220
210
let gateway_handler = GatewayHandler :: new ( Rc :: clone ( & server_states) ) ;
221
- let read_exception_handler = ExceptionHandler :: new ( ) ;
211
+ let exception_handler = ExceptionHandler :: new ( ) ;
222
212
223
- pipeline. add_back ( sync_transport_handler) ;
224
213
pipeline. add_back ( demuxer_handler) ;
225
- pipeline. add_back ( write_exception_handler) ;
226
214
pipeline. add_back ( stun_handler) ;
227
215
// DTLS
228
216
pipeline. add_back ( dtls_handler) ;
@@ -233,7 +221,7 @@ fn build_pipeline(
233
221
pipeline. add_back ( interceptor_handler) ;
234
222
// Gateway
235
223
pipeline. add_back ( gateway_handler) ;
236
- pipeline. add_back ( read_exception_handler ) ;
224
+ pipeline. add_back ( exception_handler ) ;
237
225
238
226
pipeline. finalize ( )
239
227
}
0 commit comments