Skip to content

Commit 3ee4389

Browse files
authored
Add support for Lambda streaming response (#628)
1 parent cd0a19c commit 3ee4389

File tree

9 files changed

+385
-11
lines changed

9 files changed

+385
-11
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "basic-streaming-response"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
hyper = { version = "0.14", features = [
10+
"http1",
11+
"client",
12+
"stream",
13+
] }
14+
lambda_runtime = { path = "../../lambda-runtime" }
15+
tokio = { version = "1", features = ["macros"] }
16+
tracing = { version = "0.1", features = ["log"] }
17+
tracing-subscriber = { version = "0.3", default-features = false, features = ["ansi", "fmt"] }
18+
serde_json = "1.0"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# AWS Lambda Function example
2+
3+
## Build & Deploy
4+
5+
1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
6+
2. Build the function with `cargo lambda build --release`
7+
3. Deploy the function to AWS Lambda with `cargo lambda deploy --enable-function-url --iam-role YOUR_ROLE`
8+
4. Enable Lambda streaming response on Lambda console: change the function url's invoke mode to `RESPONSE_STREAM`
9+
5. Verify the function works: `curl <function-url>`. The results should be streamed back with 0.5 second pause between each word.
10+
11+
## Build for ARM 64
12+
13+
Build the function with `cargo lambda build --release --arm64`
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use hyper::{body::Body, Response};
2+
use lambda_runtime::{service_fn, Error, LambdaEvent};
3+
use serde_json::Value;
4+
use std::{thread, time::Duration};
5+
6+
async fn func(_event: LambdaEvent<Value>) -> Result<Response<Body>, Error> {
7+
let messages = vec!["Hello", "world", "from", "Lambda!"];
8+
9+
let (mut tx, rx) = Body::channel();
10+
11+
tokio::spawn(async move {
12+
for message in messages.iter() {
13+
tx.send_data((message.to_string() + "\n").into()).await.unwrap();
14+
thread::sleep(Duration::from_millis(500));
15+
}
16+
});
17+
18+
let resp = Response::builder()
19+
.header("content-type", "text/html")
20+
.header("CustomHeader", "outerspace")
21+
.body(rx)?;
22+
23+
Ok(resp)
24+
}
25+
26+
#[tokio::main]
27+
async fn main() -> Result<(), Error> {
28+
// required to enable CloudWatch error logging by the runtime
29+
tracing_subscriber::fmt()
30+
.with_max_level(tracing::Level::INFO)
31+
// disable printing the name of the module in every log line.
32+
.with_target(false)
33+
// this needs to be set to false, otherwise ANSI color codes will
34+
// show up in a confusing manner in CloudWatch logs.
35+
.with_ansi(false)
36+
// disabling time is handy because CloudWatch will add the ingestion time.
37+
.without_time()
38+
.init();
39+
40+
lambda_runtime::run_with_streaming_response(service_fn(func)).await?;
41+
Ok(())
42+
}

lambda-http/Cargo.toml

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,20 @@ apigw_websockets = []
2323
alb = []
2424

2525
[dependencies]
26-
base64 = "0.13.0"
27-
bytes = "1"
26+
base64 = "0.21"
27+
bytes = "1.4"
28+
futures = "0.3"
2829
http = "0.2"
2930
http-body = "0.4"
30-
hyper = "0.14.20"
31+
hyper = "0.14"
3132
lambda_runtime = { path = "../lambda-runtime", version = "0.7" }
32-
serde = { version = "^1", features = ["derive"] }
33-
serde_json = "^1"
34-
serde_urlencoded = "0.7.0"
35-
mime = "0.3.16"
36-
encoding_rs = "0.8.31"
37-
url = "2.2.2"
38-
percent-encoding = "2.2.0"
33+
serde = { version = "1.0", features = ["derive"] }
34+
serde_json = "1.0"
35+
serde_urlencoded = "0.7"
36+
mime = "0.3"
37+
encoding_rs = "0.8"
38+
url = "2.2"
39+
percent-encoding = "2.2"
3940

4041
[dependencies.aws_lambda_events]
4142
version = "^0.7.2"

lambda-http/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ use std::{
9292
task::{Context as TaskContext, Poll},
9393
};
9494

95+
mod streaming;
96+
pub use streaming::run_with_streaming_response;
97+
9598
/// Type alias for `http::Request`s with a fixed [`Body`](enum.Body.html) type
9699
pub type Request = http::Request<Body>;
97100

lambda-http/src/streaming.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use crate::request::LambdaRequest;
2+
use crate::tower::ServiceBuilder;
3+
use crate::{Request, RequestExt};
4+
pub use aws_lambda_events::encodings::Body as LambdaEventBody;
5+
use bytes::Bytes;
6+
pub use http::{self, Response};
7+
use http_body::Body;
8+
use lambda_runtime::LambdaEvent;
9+
pub use lambda_runtime::{self, service_fn, tower, Context, Error, Service};
10+
use std::fmt::{Debug, Display};
11+
12+
/// Starts the Lambda Rust runtime and stream response back [Configure Lambda
13+
/// Streaming Response](https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html).
14+
///
15+
/// This takes care of transforming the LambdaEvent into a [`Request`] and
16+
/// accepts [`http::Response<http_body::Body>`] as response.
17+
pub async fn run_with_streaming_response<'a, S, B, E>(handler: S) -> Result<(), Error>
18+
where
19+
S: Service<Request, Response = Response<B>, Error = E>,
20+
S::Future: Send + 'a,
21+
E: Debug + Display,
22+
B: Body + Unpin + Send + 'static,
23+
B::Data: Into<Bytes> + Send,
24+
B::Error: Into<Error> + Send + Debug,
25+
{
26+
let svc = ServiceBuilder::new()
27+
.map_request(|req: LambdaEvent<LambdaRequest>| {
28+
let event: Request = req.payload.into();
29+
event.with_lambda_context(req.context)
30+
})
31+
.service(handler);
32+
33+
lambda_runtime::run_with_streaming_response(svc).await
34+
}

lambda-runtime-api-client/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ where
5353

5454
/// Create a new client with a given base URI and HTTP connector.
5555
pub fn with(base: Uri, connector: C) -> Self {
56-
let client = hyper::Client::builder().build(connector);
56+
let client = hyper::Client::builder()
57+
.http1_max_buf_size(1024 * 1024)
58+
.build(connector);
5759
Self { base, client }
5860
}
5961

lambda-runtime/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ mod simulated;
3434
/// Types available to a Lambda function.
3535
mod types;
3636

37+
mod streaming;
38+
pub use streaming::run_with_streaming_response;
39+
3740
use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest};
3841
pub use types::{Context, LambdaEvent};
3942

0 commit comments

Comments
 (0)