@@ -249,69 +249,105 @@ where
249
249
}
250
250
}
251
251
252
- /// Type representing an agent listening for incoming connections.
253
- #[ async_trait]
254
- pub trait Agent : ' static + Sync + Send + Sized {
255
- /// Create new session object when a new socket is accepted.
256
- fn new_session < S > ( & mut self , socket : & S :: Stream ) -> impl Session
257
- where
258
- S : ListeningSocket + fmt:: Debug + Send ;
259
-
260
- /// Listen on a socket waiting for client connections.
261
- async fn listen < S > ( mut self , mut socket : S ) -> Result < ( ) , AgentError >
262
- where
263
- S : ListeningSocket + fmt:: Debug + Send ,
264
- {
265
- log:: info!( "Listening; socket = {:?}" , socket) ;
266
- loop {
267
- match socket. accept ( ) . await {
268
- Ok ( socket) => {
269
- let session = self . new_session :: < S > ( & socket) ;
270
- tokio:: spawn ( async move {
271
- let adapter = Framed :: new ( socket, Codec :: < Request , Response > :: default ( ) ) ;
272
- if let Err ( e) = handle_socket :: < S > ( session, adapter) . await {
273
- log:: error!( "Agent protocol error: {:?}" , e) ;
274
- }
275
- } ) ;
276
- }
277
- Err ( e) => {
278
- log:: error!( "Failed to accept socket: {:?}" , e) ;
279
- return Err ( AgentError :: IO ( e) ) ;
280
- }
281
- }
282
- }
283
- }
252
+ /// Factory of sessions for the given type of sockets.
253
+ pub trait Agent < S > : ' static + Send + Sync
254
+ where
255
+ S : ListeningSocket + fmt:: Debug + Send ,
256
+ {
257
+ /// Create a [`Session`] object for a given `socket`.
258
+ fn new_session ( & mut self , socket : & S :: Stream ) -> impl Session ;
259
+ }
284
260
285
- /// Bind to a service binding listener.
286
- async fn bind ( mut self , listener : service_binding:: Listener ) -> Result < ( ) , AgentError > {
287
- match listener {
288
- #[ cfg( unix) ]
289
- service_binding:: Listener :: Unix ( listener) => {
290
- self . listen ( UnixListener :: from_std ( listener) ?) . await
291
- }
292
- service_binding:: Listener :: Tcp ( listener) => {
293
- self . listen ( TcpListener :: from_std ( listener) ?) . await
261
+ /// Listen for connections on a given socket and use session factory
262
+ /// to create new session for each accepted socket.
263
+ pub async fn listen < S > ( mut socket : S , mut sf : impl Agent < S > ) -> Result < ( ) , AgentError >
264
+ where
265
+ S : ListeningSocket + fmt:: Debug + Send ,
266
+ {
267
+ log:: info!( "Listening; socket = {:?}" , socket) ;
268
+ loop {
269
+ match socket. accept ( ) . await {
270
+ Ok ( socket) => {
271
+ let session = sf. new_session ( & socket) ;
272
+ tokio:: spawn ( async move {
273
+ let adapter = Framed :: new ( socket, Codec :: < Request , Response > :: default ( ) ) ;
274
+ if let Err ( e) = handle_socket :: < S > ( session, adapter) . await {
275
+ log:: error!( "Agent protocol error: {:?}" , e) ;
276
+ }
277
+ } ) ;
294
278
}
295
- # [ cfg ( windows ) ]
296
- service_binding :: Listener :: NamedPipe ( pipe ) => {
297
- self . listen ( NamedPipeListener :: bind ( pipe ) ? ) . await
279
+ Err ( e ) => {
280
+ log :: error! ( "Failed to accept socket: {:?}" , e ) ;
281
+ return Err ( AgentError :: IO ( e ) ) ;
298
282
}
299
- #[ cfg( not( windows) ) ]
300
- service_binding:: Listener :: NamedPipe ( _) => Err ( AgentError :: IO ( std:: io:: Error :: other (
301
- "Named pipes supported on Windows only" ,
302
- ) ) ) ,
303
283
}
304
284
}
305
285
}
306
286
307
- impl < T > Agent for T
287
+ #[ cfg( unix) ]
288
+ impl < T > Agent < tokio:: net:: UnixListener > for T
289
+ where
290
+ T : Default + Send + Sync + Session ,
291
+ {
292
+ fn new_session ( & mut self , _socket : & tokio:: net:: UnixStream ) -> impl Session {
293
+ Self :: default ( )
294
+ }
295
+ }
296
+
297
+ impl < T > Agent < tokio:: net:: TcpListener > for T
308
298
where
309
- T : Default + Session ,
299
+ T : Default + Send + Sync + Session ,
310
300
{
311
- fn new_session < S > ( & mut self , _socket : & S :: Stream ) -> impl Session
312
- where
313
- S : ListeningSocket + fmt:: Debug + Send ,
314
- {
301
+ fn new_session ( & mut self , _socket : & tokio:: net:: TcpStream ) -> impl Session {
315
302
Self :: default ( )
316
303
}
317
304
}
305
+
306
+ #[ cfg( windows) ]
307
+ impl < T > Agent < NamedPipeListener > for T
308
+ where
309
+ T : Default + Send + Sync + Session ,
310
+ {
311
+ fn new_session (
312
+ & mut self ,
313
+ _socket : & tokio:: net:: windows:: named_pipe:: NamedPipeServer ,
314
+ ) -> impl Session {
315
+ Self :: default ( )
316
+ }
317
+ }
318
+
319
+ /// Bind to a service binding listener.
320
+ #[ cfg( unix) ]
321
+ pub async fn bind < SF > ( listener : service_binding:: Listener , sf : SF ) -> Result < ( ) , AgentError >
322
+ where
323
+ SF : Agent < tokio:: net:: UnixListener > + Agent < tokio:: net:: TcpListener > ,
324
+ {
325
+ match listener {
326
+ #[ cfg( unix) ]
327
+ service_binding:: Listener :: Unix ( listener) => {
328
+ listen ( UnixListener :: from_std ( listener) ?, sf) . await
329
+ }
330
+ service_binding:: Listener :: Tcp ( listener) => {
331
+ listen ( TcpListener :: from_std ( listener) ?, sf) . await
332
+ }
333
+ _ => Err ( AgentError :: IO ( std:: io:: Error :: other (
334
+ "Unsupported type of a listener." ,
335
+ ) ) ) ,
336
+ }
337
+ }
338
+
339
+ /// Bind to a service binding listener.
340
+ #[ cfg( windows) ]
341
+ pub async fn bind < SF > ( listener : service_binding:: Listener , sf : SF ) -> Result < ( ) , AgentError >
342
+ where
343
+ SF : Agent < NamedPipeListener > + Agent < tokio:: net:: TcpListener > ,
344
+ {
345
+ match listener {
346
+ service_binding:: Listener :: Tcp ( listener) => {
347
+ listen ( TcpListener :: from_std ( listener) ?, sf) . await
348
+ }
349
+ service_binding:: Listener :: NamedPipe ( pipe) => {
350
+ listen ( NamedPipeListener :: bind ( pipe) ?, sf) . await
351
+ }
352
+ }
353
+ }
0 commit comments