Skip to content

Commit e3bf823

Browse files
authored
Merge pull request #678 from swimos/deps
Upgrades hyper ecosystem
2 parents 57e843f + b2c9c8f commit e3bf823

File tree

11 files changed

+251
-169
lines changed

11 files changed

+251
-169
lines changed

Cargo.toml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ tempdir = "0.3.7"
115115
slab = "0.4"
116116
smallvec = "1.10"
117117
uuid = "1.2"
118-
http = "0.2"
118+
http = "1.1.0"
119119
nom = "7.1"
120120
nom_locate = "4.0"
121121
tracing = "0.1"
@@ -132,8 +132,8 @@ proc-macro2 = "1.0"
132132
syn = "1.0"
133133
quote = "1.0.3"
134134
num-bigint = "0.4"
135-
ratchet = { package = "ratchet_rs", version = "0.4" }
136-
ratchet_fixture = "0.4"
135+
ratchet = { package = "ratchet_rs", version = "1.0" }
136+
ratchet_fixture = "1.0"
137137
flate2 = "1.0.22"
138138
bitflags = "2.5"
139139
rocksdb = "0.22"
@@ -147,7 +147,7 @@ trust-dns-resolver = "0.23.2"
147147
clap = "4.1"
148148
crossbeam-queue = { version = "0.3" }
149149
crossbeam-channel = { version = "0.5" }
150-
hyper = "0.14"
150+
hyper = "1.3.1"
151151
percent-encoding = "2.1.0"
152152
mime = "0.3"
153153
serde_json = "1.0"
@@ -166,9 +166,11 @@ quick-xml = "0.34.0"
166166
csv = "1.2"
167167
serde-xml-rs = "0.6"
168168
axum = "0.7.5"
169-
hyper-staticfile = "0.9"
169+
hyper-staticfile = "0.10.0"
170170
httparse = "1.8"
171171
sha-1 = "0.10.1"
172172
waker-fn = "1.1.0"
173173
num = "0.4"
174174
smol_str = "0.2.0"
175+
http-body-util = "0.1.2"
176+
hyper-util = "0.1.5"

example_apps/tutorial_app/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
1010
example-util = { path = "../example_util" }
1111
rand = { workspace = true }
1212
tutorial-app-model = { path = "./model" }
13-
hyper = { workspace = true, features = ["server", "tcp", "http1"] }
13+
hyper = { workspace = true, features = ["server", "http1"] }
1414
hyper-staticfile = { workspace = true }
1515
http = { workspace = true }
16-
futures = { workspace = true }
16+
futures = { workspace = true }
17+
hyper-util = { workspace = true, features = ["server-graceful", "http1"] }

example_apps/tutorial_app/src/ui.rs

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515
use std::fmt::Display;
1616
use std::future::Future;
1717
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
18-
use std::pin::pin;
1918
use std::time::Duration;
2019

21-
use futures::future::Either;
22-
use hyper::service::{make_service_fn, service_fn};
23-
use hyper::Server;
24-
use std::convert::Infallible;
25-
use std::path::Path;
20+
use futures::pin_mut;
21+
use hyper::server::conn::http1::Builder;
22+
use hyper::service::service_fn;
23+
use hyper_util::rt::TokioIo;
24+
use hyper_util::server::graceful::GracefulShutdown;
25+
2626
use tokio::net::TcpListener;
27+
use tokio::select;
2728
use tokio::sync::oneshot;
2829

2930
#[derive(Clone, Copy, Debug)]
@@ -47,42 +48,42 @@ where
4748
addr.await?;
4849
let bind_to = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0));
4950
let listener = TcpListener::bind(bind_to).await?;
50-
let root = Path::new("static-files/");
51-
51+
let resolver = hyper_staticfile::Resolver::new("static-files/");
5252
let addr = listener.local_addr()?;
5353
println!("Web server bound to: {}", addr);
5454

55-
let make_svc = make_service_fn(move |_conn| async move {
56-
Ok::<_, Infallible>(service_fn(move |request| async move {
57-
let result = hyper_staticfile::resolve(&root, &request)
55+
let make_svc = service_fn(move |request| {
56+
let resolver = resolver.clone();
57+
async move {
58+
let result = resolver
59+
.resolve_request(&request)
5860
.await
5961
.expect("Failed to access files.");
6062
let response = hyper_staticfile::ResponseBuilder::new()
6163
.request(&request)
6264
.build(result)?;
6365

6466
Ok::<_, http::Error>(response)
65-
}))
67+
}
6668
});
6769

68-
let (stop_tx, stop_rx) = oneshot::channel();
69-
let server = Server::from_tcp(listener.into_std()?)?
70-
.serve(make_svc)
71-
.with_graceful_shutdown(async move {
72-
let _ = stop_rx.await;
73-
});
70+
let shutdown_handle = GracefulShutdown::new();
7471

75-
let shutdown = pin!(shutdown);
76-
let server = pin!(server);
72+
pin_mut!(shutdown, listener);
7773

78-
match futures::future::select(shutdown, server).await {
79-
Either::Left((_, server)) => {
80-
let _ = stop_tx.send(());
81-
tokio::time::timeout(Duration::from_secs(2), server)
82-
.await
83-
.map_err(|_| TimeoutError)??;
84-
}
85-
Either::Right((result, _)) => result?,
74+
loop {
75+
let (stream, _) = select! {
76+
biased;
77+
_ = &mut shutdown => {
78+
tokio::time::timeout(Duration::from_secs(2), shutdown_handle.shutdown())
79+
.await
80+
.map_err(|_| TimeoutError)?;
81+
return Ok(());
82+
}
83+
result = listener.accept() => result?,
84+
};
85+
86+
let conn = Builder::new().serve_connection(TokioIo::new(stream), make_svc.clone());
87+
tokio::spawn(shutdown_handle.watch(conn));
8688
}
87-
Ok(())
8889
}

runtime/swimos_http/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ sha-1 = { workspace = true }
1919
base64 = { workspace = true }
2020
thiserror = { workspace = true }
2121
tokio = { workspace = true }
22+
http-body-util = { workspace = true }
23+
hyper-util = { workspace = true, features = ["server-graceful", "http1", "server-auto"] }
2224

2325
[dev-dependencies]
2426
tokio = { workspace = true, features = ["rt", "macros", "time"] }
25-
hyper = { workspace = true, features = ["server", "tcp", "http1"] }
27+
hyper = { workspace = true, features = ["server", "http1"] }

runtime/swimos_http/src/websocket.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
1+
use std::{
2+
collections::HashSet,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
17
use base64::{engine::general_purpose::STANDARD, Engine};
28
use bytes::{Bytes, BytesMut};
39
use futures::{ready, Future, FutureExt};
410
use http::{header::HeaderName, HeaderMap, HeaderValue, Method};
11+
use http_body_util::Full;
512
use httparse::Header;
13+
use hyper::body::Incoming;
614
use hyper::{
715
upgrade::{OnUpgrade, Upgraded},
8-
Body, Request, Response,
16+
Request, Response,
917
};
18+
use hyper_util::rt::TokioIo;
1019
use ratchet::{
1120
Extension, ExtensionProvider, NegotiatedExtension, Role, WebSocket, WebSocketConfig,
12-
WebSocketStream,
1321
};
1422
use sha1::{Digest, Sha1};
15-
use std::{
16-
collections::HashSet,
17-
pin::Pin,
18-
task::{Context, Poll},
19-
};
2023
use thiserror::Error;
2124

2225
const UPGRADE_STR: &str = "Upgrade";
@@ -91,10 +94,12 @@ where
9194
}
9295

9396
/// Produce a bad request response for a bad websocket upgrade request.
94-
pub fn fail_upgrade<ExtErr: std::error::Error>(error: UpgradeError<ExtErr>) -> Response<Body> {
97+
pub fn fail_upgrade<ExtErr: std::error::Error>(
98+
error: UpgradeError<ExtErr>,
99+
) -> Response<Full<Bytes>> {
95100
Response::builder()
96101
.status(http::StatusCode::BAD_REQUEST)
97-
.body(Body::from(error.to_string()))
102+
.body(Full::from(error.to_string()))
98103
.expect(FAILED_RESPONSE)
99104
}
100105

@@ -107,13 +112,12 @@ pub fn fail_upgrade<ExtErr: std::error::Error>(error: UpgradeError<ExtErr>) -> R
107112
/// * `unwrap_fn` - Used to unwrap the underlying socket type from the opaque [`Upgraded`] socket
108113
/// provided by hyper.
109114
pub fn upgrade<Ext, U>(
110-
request: Request<Body>,
115+
request: Request<Incoming>,
111116
negotiated: Negotiated<'_, Ext>,
112117
config: Option<WebSocketConfig>,
113118
unwrap_fn: U,
114-
) -> (Response<Body>, UpgradeFuture<Ext, U>)
119+
) -> (Response<Full<Bytes>>, UpgradeFuture<Ext, U>)
115120
where
116-
U: SockUnwrap,
117121
Ext: Extension + Send,
118122
{
119123
let Negotiated {
@@ -149,7 +153,7 @@ where
149153
unwrap_fn,
150154
};
151155

152-
let response = builder.body(Body::empty()).expect(FAILED_RESPONSE);
156+
let response = builder.body(Full::default()).expect(FAILED_RESPONSE);
153157
(response, fut)
154158
}
155159

@@ -211,7 +215,7 @@ impl<ExtErr: std::error::Error> From<ExtErr> for UpgradeError<ExtErr> {
211215
/// The caller will generally know the real underlying type and this allows for that type to be
212216
/// restored.
213217
pub trait SockUnwrap {
214-
type Sock: WebSocketStream;
218+
type Sock;
215219

216220
/// Unwrap the socket (returning the underlying socket and a buffer containing any bytes
217221
/// that have already been read).
@@ -222,15 +226,15 @@ pub trait SockUnwrap {
222226
pub struct NoUnwrap;
223227

224228
impl SockUnwrap for NoUnwrap {
225-
type Sock = Upgraded;
229+
type Sock = TokioIo<Upgraded>;
226230

227231
fn unwrap_sock(&self, upgraded: Upgraded) -> (Self::Sock, BytesMut) {
228-
(upgraded, BytesMut::new())
232+
(TokioIo::new(upgraded), BytesMut::new())
229233
}
230234
}
231235

232236
/// A future that performs a websocket upgrade, unwraps the upgraded socket and
233-
/// creates a ratchet websocket from form it.
237+
/// creates a ratchet websocket from it.
234238
#[derive(Debug)]
235239
pub struct UpgradeFuture<Ext, U> {
236240
upgrade: OnUpgrade,

runtime/swimos_http/tests/wsserver.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use bytes::BytesMut;
16-
use hyper::service::{make_service_fn, service_fn};
17-
use hyper::{upgrade::Upgraded, Body, Request, Response, Server};
15+
use bytes::{Bytes, BytesMut};
16+
use hyper::service::service_fn;
17+
use hyper::{upgrade::Upgraded, Request, Response};
1818
use std::{
1919
error::Error,
20-
net::{Ipv4Addr, SocketAddr, TcpListener},
20+
net::{Ipv4Addr, SocketAddr},
2121
pin::pin,
2222
sync::Arc,
2323
time::Duration,
@@ -29,57 +29,63 @@ use futures::{
2929
future::{join, select, Either},
3030
Future,
3131
};
32+
use http_body_util::Full;
33+
use hyper::body::Incoming;
34+
use hyper_util::rt::{TokioExecutor, TokioIo};
35+
use hyper_util::server::conn::auto::Builder;
36+
use hyper_util::server::graceful::GracefulShutdown;
3237
use ratchet::{CloseCode, CloseReason, Message, NoExt, NoExtProvider, PayloadType, WebSocket};
3338
use thiserror::Error;
39+
use tokio::net::TcpListener;
3440
use tokio::{net::TcpSocket, sync::Notify};
3541

3642
async fn run_server(
3743
bound_to: oneshot::Sender<SocketAddr>,
3844
done: Arc<Notify>,
3945
) -> Result<(), Box<dyn Error>> {
40-
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
46+
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?;
4147
let bound = listener.local_addr()?;
4248
let _ = bound_to.send(bound);
4349

44-
let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(upgrade_server)) });
45-
46-
let shutdown = Arc::new(Notify::new());
47-
let shutdown_cpy = shutdown.clone();
48-
49-
let server = pin!(Server::from_tcp(listener)?
50-
.serve(service)
51-
.with_graceful_shutdown(async move {
52-
shutdown_cpy.notified().await;
53-
}));
50+
let (io, _) = listener.accept().await?;
51+
let builder = Builder::new(TokioExecutor::new());
52+
let connection =
53+
builder.serve_connection_with_upgrades(TokioIo::new(io), service_fn(upgrade_server));
54+
let shutdown = GracefulShutdown::new();
5455

56+
let server = pin!(shutdown.watch(connection));
5557
let stop = pin!(done.notified());
58+
5659
match select(server, stop).await {
57-
Either::Left((result, _)) => result?,
58-
Either::Right((_, server)) => {
59-
shutdown.notify_one();
60-
tokio::time::timeout(Duration::from_secs(2), server).await??;
60+
Either::Left((result, _)) => match result {
61+
Ok(()) => Ok(()),
62+
Err(e) => Err(e),
63+
},
64+
Either::Right((_, _server)) => {
65+
tokio::time::timeout(Duration::from_secs(2), shutdown.shutdown()).await?;
66+
Ok(())
6167
}
6268
}
63-
64-
Ok(())
6569
}
6670

67-
async fn upgrade_server(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
71+
async fn upgrade_server(
72+
request: Request<Incoming>,
73+
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
6874
let protocols = ["warp0"].into_iter().collect();
6975
match swimos_http::negotiate_upgrade(&request, &protocols, &NoExtProvider) {
7076
Ok(Some(negotiated)) => {
7177
let (response, upgraded) = swimos_http::upgrade(request, negotiated, None, NoUnwrap);
7278
tokio::spawn(run_websocket(upgraded));
7379
Ok(response)
7480
}
75-
Ok(None) => Response::builder().body(Body::from("Success")),
81+
Ok(None) => Response::builder().body(Full::from("Success")),
7682
Err(err) => Ok(swimos_http::fail_upgrade(err)),
7783
}
7884
}
7985

8086
async fn run_websocket<F>(upgrade_fut: F)
8187
where
82-
F: Future<Output = Result<WebSocket<Upgraded, NoExt>, hyper::Error>> + Send,
88+
F: Future<Output = Result<WebSocket<TokioIo<Upgraded>, NoExt>, hyper::Error>> + Send,
8389
{
8490
match upgrade_fut.await {
8591
Ok(mut websocket) => {

runtime/swimos_remote/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ either = { workspace = true }
3636
smallvec = { workspace = true }
3737
url = { workspace = true }
3838
pin-project = { workspace = true }
39+
hyper = { workspace = true }
3940

4041
rustls = { workspace = true, optional = true }
4142
webpki = { workspace = true, optional = true }

runtime/swimos_remote/src/net/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ where
103103
/// Provides all networking functionality required for a Warp client (DNS resolution and opening sockets).
104104
pub trait ClientConnections: Clone + Send + Sync + 'static {
105105
type ClientSocket: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static;
106+
106107
fn try_open(
107108
&self,
108109
scheme: Scheme,
@@ -111,6 +112,7 @@ pub trait ClientConnections: Clone + Send + Sync + 'static {
111112
) -> BoxFuture<'_, ConnectionResult<Self::ClientSocket>>;
112113

113114
fn dns_resolver(&self) -> BoxDnsResolver;
115+
114116
fn lookup(
115117
&self,
116118
host: String,

server/swimos_server_app/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ rand = { workspace = true }
3939
url = { workspace = true }
4040
swimos_rocks_store = { workspace = true, optional = true }
4141
parking_lot = { workspace = true }
42-
hyper = { workspace = true, features = ["server", "runtime", "tcp", "http1", "backports"] }
42+
hyper = { workspace = true, features = ["server", "http1"] }
4343
pin-project = { workspace = true }
4444
percent-encoding = { workspace = true }
4545
rustls = { workspace = true }
46+
http-body-util = { workspace = true }
47+
hyper-util = { workspace = true }
4648

4749
[dev-dependencies]
4850
swimos_recon = { workspace = true }

0 commit comments

Comments
 (0)