Skip to content

Commit d08fbda

Browse files
chore: dedup send_request_handler (#3184)
Signed-off-by: Brian Hardock <brian.hardock@fermyon.com>
1 parent 63ff850 commit d08fbda

File tree

1 file changed

+29
-39
lines changed
  • crates/factor-outbound-http/src

1 file changed

+29
-39
lines changed

crates/factor-outbound-http/src/wasi.rs

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{error::Error, future::Future, pin::Pin, sync::Arc};
1+
use std::{error::Error, future::Future, pin::Pin, sync::Arc, time::Duration};
22

33
use anyhow::Context;
44
use bytes::Bytes;
@@ -294,25 +294,7 @@ async fn send_request_handler(
294294

295295
let stream = TokioIo::new(stream);
296296

297-
let (sender, conn) = if is_http2 {
298-
timeout(
299-
connect_timeout,
300-
hyper::client::conn::http2::handshake(TokioExecutor::default(), stream),
301-
)
302-
.await
303-
.map_err(|_| ErrorCode::ConnectionTimeout)?
304-
.map_err(hyper_request_error)
305-
.map(|(sender, conn)| (HttpSender::Http2(sender), HttpConn::Http2(conn)))?
306-
} else {
307-
timeout(
308-
connect_timeout,
309-
hyper::client::conn::http1::handshake(stream),
310-
)
311-
.await
312-
.map_err(|_| ErrorCode::ConnectionTimeout)?
313-
.map_err(hyper_request_error)
314-
.map(|(sender, conn)| (HttpSender::Http1(sender), HttpConn::Http1(conn)))?
315-
};
297+
let (sender, conn) = new_sender_and_conn(stream, is_http2, connect_timeout).await?;
316298

317299
let worker = wasmtime_wasi::runtime::spawn(async move {
318300
match conn.await {
@@ -335,25 +317,7 @@ async fn send_request_handler(
335317
.is_some_and(|authority| authority.as_str() == v)
336318
});
337319

338-
let (sender, conn) = if is_http2 {
339-
timeout(
340-
connect_timeout,
341-
hyper::client::conn::http2::handshake(TokioExecutor::default(), tcp_stream),
342-
)
343-
.await
344-
.map_err(|_| ErrorCode::ConnectionTimeout)?
345-
.map_err(hyper_request_error)
346-
.map(|(sender, conn)| (HttpSender::Http2(sender), HttpConn::Http2(conn)))?
347-
} else {
348-
timeout(
349-
connect_timeout,
350-
hyper::client::conn::http1::handshake(tcp_stream),
351-
)
352-
.await
353-
.map_err(|_| ErrorCode::ConnectionTimeout)?
354-
.map_err(hyper_request_error)
355-
.map(|(sender, conn)| (HttpSender::Http1(sender), HttpConn::Http1(conn)))?
356-
};
320+
let (sender, conn) = new_sender_and_conn(tcp_stream, is_http2, connect_timeout).await?;
357321

358322
let worker = wasmtime_wasi::runtime::spawn(async move {
359323
match conn.await {
@@ -397,6 +361,32 @@ async fn send_request_handler(
397361
})
398362
}
399363

364+
async fn new_sender_and_conn<T: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static>(
365+
stream: T,
366+
is_http2: bool,
367+
connect_timeout: Duration,
368+
) -> Result<(HttpSender, HttpConn<T>), ErrorCode> {
369+
if is_http2 {
370+
timeout(
371+
connect_timeout,
372+
hyper::client::conn::http2::handshake(TokioExecutor::default(), stream),
373+
)
374+
.await
375+
.map_err(|_| ErrorCode::ConnectionTimeout)?
376+
.map_err(hyper_request_error)
377+
.map(|(sender, conn)| (HttpSender::Http2(sender), HttpConn::Http2(conn)))
378+
} else {
379+
timeout(
380+
connect_timeout,
381+
hyper::client::conn::http1::handshake(stream),
382+
)
383+
.await
384+
.map_err(|_| ErrorCode::ConnectionTimeout)?
385+
.map_err(hyper_request_error)
386+
.map(|(sender, conn)| (HttpSender::Http1(sender), HttpConn::Http1(conn)))
387+
}
388+
}
389+
400390
enum HttpSender {
401391
Http1(hyper::client::conn::http1::SendRequest<BoxBody<Bytes, ErrorCode>>),
402392
Http2(hyper::client::conn::http2::SendRequest<BoxBody<Bytes, ErrorCode>>),

0 commit comments

Comments
 (0)