
Expose streaming API #1013


Open · wants to merge 3 commits into main

Conversation

s0l0ist (Contributor) commented Jul 11, 2025

📬 Issue #, if available:

The SDK supports streaming, but lambda_http::Adapter only handles buffered responses from Axum's Router (i.e., a Service). This change allows direct streaming from Axum handlers/services while still letting you specify a custom runtime (e.g., one with OTel).

For example, lambda_http::run_with_streaming_response(app).await doesn't let you specify a custom runtime. This change abstracts out constructing the streaming response so it can be paired with a custom runtime.

Related (ish):

✍️ Description of changes:

This PR exposes:

  • into_streaming_response: Convert a Service into an AWS Lambda streaming response.

This function was previously internal to run_with_streaming_response. There are no functional changes.

There might be a more ergonomic way to expose an API like this, but I'm not aware of one. I'm happy to make changes as necessary so we're not exposing internals that may change in the future.

Here is how you can use it with a custom runtime that supports OTel:

use crate::{error::BoxError, service::build_service};
use axum::Router;
use lambda_http::into_streaming_response;
use lambda_runtime::{
    Runtime,
    layers::{OpenTelemetryFaasTrigger, OpenTelemetryLayer},
};
use opentelemetry_sdk::trace as sdktrace;

/// The main function to run the AWS Lambda
pub async fn async_main(tracer_provider: sdktrace::SdkTracerProvider) -> Result<(), BoxError> {
    let (app, _): (Router, _) = build_service().await;

    // For a buffered response:
    // let handler = lambda_http::Adapter::from(app);
    // For a streamed response:
    let handler = into_streaming_response(app);

    let runtime = Runtime::new(handler).layer(
        // Create a tracing span for each Lambda invocation
        OpenTelemetryLayer::new(|| {
            if let Err(err) = tracer_provider.force_flush() {
                eprintln!("Error flushing traces: {:#?}", err);
            }
        })
        .with_trigger(OpenTelemetryFaasTrigger::Http),
    );
    runtime.run().await
}

🔏 By submitting this pull request

  • I confirm that I've run cargo +nightly fmt.
  • I confirm that I've run cargo clippy --fix.
  • I confirm that I've made a best effort attempt to update all relevant documentation.
  • I confirm that my contribution is made under the terms of the Apache 2.0 license.

jlizen (Contributor) commented Jul 11, 2025

Hi @s0l0ist, thanks for cutting this PR. This change seems very reasonable to me. I'd love to make the runtime more executor-agnostic, but that's a larger scope of work. This seems like a good middle ground that is useful in its own right.

I do have concerns about leaking internals. This only names return types that are already public, but we might want to tweak their composition or otherwise shift bounds in ways that would break callers. I think it would be best to type-erase the returned service stack.

The easiest way to do that would be via a tower::util::BoxService / BoxLayer (or the sync/clone-bounded variants, though I don't think we need them here; we could certainly add alternate APIs that include the sync/clone bounds if that would be useful for how the returned struct is used by a runtime).

That adds a small amount of performance overhead due to an extra allocation and a layer of dynamic dispatch. But I think the ergonomics would be much better compared to a more complex, composable builder-style API using generics and sealed inner layer types. Note that we would probably want some sort of into_streaming_response_inner() that is what into_streaming_response() is currently, so that our run_with_streaming_response can skip the minor type-erasure overhead.
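
To make that concrete, here's a minimal, generic sketch of the type-erasure step using tower::util::BoxService. The erase function name and the Req type parameter are illustrative only, not the crate's actual API:

use tower::{util::BoxService, Service};

/// Erase the concrete service stack so callers depend only on `BoxService`,
/// letting the internal layer composition change without breaking them.
fn erase<S, Req>(service: S) -> BoxService<Req, S::Response, S::Error>
where
    S: Service<Req> + Send + 'static,
    S::Future: Send + 'static,
{
    // BoxService::new allocates once and adds a layer of dynamic dispatch,
    // which is the small overhead mentioned above.
    BoxService::new(service)
}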

How does that sound to you?

It would also be great to get a small example showing usage of this with a non-tokio runtime, if you'd be open to it! That would both let us validate the API and make it more discoverable for users. I'd probably be OK with this landing without that, though, if you don't have cycles.
