Skip to content

Commit a7de991

Browse files
authored
feat(server): Add serve api to server (#2152)
1 parent ca9dbde commit a7de991

File tree

1 file changed

+94
-23
lines changed
  • tonic/src/transport/server

1 file changed

+94
-23
lines changed

tonic/src/transport/server/mod.rs

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,96 @@ impl<L> Server<L> {
528528
}
529529
}
530530

531-
pub(crate) async fn serve_with_shutdown<S, I, F, IO, IE, ResBody>(
531+
fn bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error> {
532+
Ok(TcpIncoming::bind(addr)
533+
.map_err(super::Error::from_source)?
534+
.with_nodelay(Some(self.tcp_nodelay))
535+
.with_keepalive(self.tcp_keepalive))
536+
}
537+
538+
/// Serve the service.
539+
pub async fn serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
540+
where
541+
L: Layer<S>,
542+
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
543+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
544+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
545+
Into<crate::BoxError> + Send + 'static,
546+
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
547+
ResBody::Error: Into<crate::BoxError>,
548+
{
549+
let incoming = self.bind_incoming(addr)?;
550+
self.serve_with_incoming(svc, incoming).await
551+
}
552+
553+
/// Serve the service with the shutdown signal.
554+
pub async fn serve_with_shutdown<S, F, ResBody>(
555+
self,
556+
addr: SocketAddr,
557+
svc: S,
558+
signal: F,
559+
) -> Result<(), super::Error>
560+
where
561+
L: Layer<S>,
562+
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
563+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
564+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
565+
Into<crate::BoxError> + Send + 'static,
566+
F: Future<Output = ()>,
567+
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
568+
ResBody::Error: Into<crate::BoxError>,
569+
{
570+
let incoming = self.bind_incoming(addr)?;
571+
self.serve_with_incoming_shutdown(svc, incoming, signal)
572+
.await
573+
}
574+
575+
/// Serve the service on the provided incoming stream.
576+
pub async fn serve_with_incoming<S, I, IO, IE, ResBody>(
577+
self,
578+
svc: S,
579+
incoming: I,
580+
) -> Result<(), super::Error>
581+
where
582+
L: Layer<S>,
583+
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
584+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
585+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
586+
Into<crate::BoxError> + Send + 'static,
587+
I: Stream<Item = Result<IO, IE>>,
588+
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
589+
IE: Into<crate::BoxError>,
590+
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
591+
ResBody::Error: Into<crate::BoxError>,
592+
{
593+
self.serve_internal(svc, incoming, Option::<future::Ready<()>>::None)
594+
.await
595+
}
596+
597+
/// Serve the service with the signal on the provided incoming stream.
598+
pub async fn serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>(
599+
self,
600+
svc: S,
601+
incoming: I,
602+
signal: F,
603+
) -> Result<(), super::Error>
604+
where
605+
L: Layer<S>,
606+
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
607+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
608+
<<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
609+
Into<crate::BoxError> + Send + 'static,
610+
I: Stream<Item = Result<IO, IE>>,
611+
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
612+
IE: Into<crate::BoxError>,
613+
F: Future<Output = ()>,
614+
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
615+
ResBody::Error: Into<crate::BoxError>,
616+
{
617+
self.serve_internal(svc, incoming, Some(signal)).await
618+
}
619+
620+
async fn serve_internal<S, I, F, IO, IE, ResBody>(
532621
self,
533622
svc: S,
534623
incoming: I,
@@ -786,17 +875,7 @@ impl<L> Router<L> {
786875
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
787876
ResBody::Error: Into<crate::BoxError>,
788877
{
789-
let incoming = TcpIncoming::bind(addr)
790-
.map_err(super::Error::from_source)?
791-
.with_nodelay(Some(self.server.tcp_nodelay))
792-
.with_keepalive(self.server.tcp_keepalive);
793-
self.server
794-
.serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
795-
self.routes.prepare(),
796-
incoming,
797-
None,
798-
)
799-
.await
878+
self.server.serve(addr, self.routes.prepare()).await
800879
}
801880

802881
/// Consume this [`Server`] creating a future that will execute the server
@@ -819,12 +898,8 @@ impl<L> Router<L> {
819898
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
820899
ResBody::Error: Into<crate::BoxError>,
821900
{
822-
let incoming = TcpIncoming::bind(addr)
823-
.map_err(super::Error::from_source)?
824-
.with_nodelay(Some(self.server.tcp_nodelay))
825-
.with_keepalive(self.server.tcp_keepalive);
826901
self.server
827-
.serve_with_shutdown(self.routes.prepare(), incoming, Some(signal))
902+
.serve_with_shutdown(addr, self.routes.prepare(), signal)
828903
.await
829904
}
830905

@@ -852,11 +927,7 @@ impl<L> Router<L> {
852927
ResBody::Error: Into<crate::BoxError>,
853928
{
854929
self.server
855-
.serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
856-
self.routes.prepare(),
857-
incoming,
858-
None,
859-
)
930+
.serve_with_incoming(self.routes.prepare(), incoming)
860931
.await
861932
}
862933

@@ -887,7 +958,7 @@ impl<L> Router<L> {
887958
ResBody::Error: Into<crate::BoxError>,
888959
{
889960
self.server
890-
.serve_with_shutdown(self.routes.prepare(), incoming, Some(signal))
961+
.serve_with_incoming_shutdown(self.routes.prepare(), incoming, signal)
891962
.await
892963
}
893964
}

0 commit comments

Comments
 (0)