
feat: integrates Aws bedrock into golem #27


Open

iambenkay wants to merge 18 commits into main from aws-bedrock

Conversation

iambenkay
Contributor

@iambenkay iambenkay commented Jun 12, 2025

What's New?

Integrates AWS Bedrock into Golem LLM.

Implementation details

  1. Uses wasi-async-runtime and a custom WASI reqwest client to run AWS SDK operations.
  2. Introduces a nopoll feature flag to disable polling-related features in the llm-bedrock crate (polling is not needed for the AWS Bedrock integration since the SDK does not expose a compatible pollable type).
  3. Adds get_config_key_or_none to golem_llm for optional config variables (see the sketch after this list).
  4. Introduces the infer crate for MIME type inference.
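
For illustration, here is a minimal sketch of what an optional-config helper like get_config_key_or_none can look like. The name comes from this PR; the exact signature and the environment-variable backing are assumptions, not the PR's code.

use std::env;

// Hypothetical sketch: return the value of an optional configuration variable, or None
// when it is unset or empty, instead of failing the way a required key would.
pub fn get_config_key_or_none(key: &str) -> Option<String> {
    env::var(key).ok().filter(|value| !value.is_empty())
}

// Example: AWS_SESSION_TOKEN is optional for Bedrock, so its absence is not an error.
// let session_token = get_config_key_or_none("AWS_SESSION_TOKEN");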

Showcase

bedrock-showcase.mov

Test 6 Fix

test6-fix.mov

/claim #2
closes #2

@algora-pbc algora-pbc bot mentioned this pull request Jun 13, 2025
@iambenkay iambenkay changed the title feat: integrations Aws bedrock into golem feat: integrates Aws bedrock into golem Jun 13, 2025
@iambenkay iambenkay marked this pull request as ready for review June 13, 2025 23:30
@iambenkay
Contributor Author

There was one cargo check failing. I have fixed that now!

@iambenkay iambenkay force-pushed the aws-bedrock branch 3 times, most recently from bd5725e to 023723e on June 15, 2025 at 23:15
@iambenkay
Contributor Author

@jdegoes @vigoo Kindly find the time to review this PR. I included some new readability-related changes over the weekend. I also added more trace logs and cleaned up unnecessary unwraps.

@iambenkay
Contributor Author

@jdegoes and @vigoo This PR is still waiting for a review. Check it out.

@iambenkay
Contributor Author

iambenkay commented Jun 19, 2025

While running test case 6, I noticed that the retry_prompt implementation doesn't work as fluidly as it is meant to. It sometimes repeats parts of the previous message stream and at other times outright returns an incorrect resumption stream.

I changed it to be more accurate. Video attached below.

test6-fix.mov

@iambenkay
Contributor Author

@jdegoes @vigoo can you take a look at this PR for AWS Bedrock?

@iambenkay
Contributor Author

I have migrated the Bedrock implementation to work with the new folder structure, @vigoo and @jdegoes.


@mschuwalow mschuwalow left a comment


@vigoo, this one looks good from my side

}

impl llm::GuestChatStream for BedrockChatStream {
    fn get_next(&self) -> Option<Vec<llm::StreamEvent>> {


This implementation currently blocks until the next event is available, which only blocking_get_next() should do.

Morally, it's probably better to always return None here and only do the blocking in blocking_get_next().
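
To make the suggested shape concrete, here is a minimal sketch. The trait and event types are the PR's generated bindings; the bodies are placeholders, not the PR's code, and it assumes blocking_get_next belongs to the same generated trait.

impl llm::GuestChatStream for BedrockChatStream {
    fn get_next(&self) -> Option<Vec<llm::StreamEvent>> {
        // Never block here: report "nothing ready yet" and let the caller poll again.
        None
    }

    fn blocking_get_next(&self) -> Vec<llm::StreamEvent> {
        // Do the actual waiting here: block on the Bedrock event-stream pollable until
        // the next chunk arrives, then translate it into StreamEvents.
        todo!("wait on the stream and convert the next chunk")
    }
}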

Contributor Author

Yeah I totally get the moral angle. I will move that logic into blocking_get_next.

Contributor Author

@iambenkay iambenkay Jul 4, 2025

So after looking into the code, changing this will require changes to the underlying durability implementation (llm/llm/src/durability.rs#L291), since that depends on calling get_next directly rather than blocking_get_next.

I totally understand the moral aspect, but it would introduce more changes to the core durability implementation than necessary.

@mschuwalow mschuwalow Jul 4, 2025

Good catch. Let's wait for @vigoo then, and either do the necessary changes here or in a follow-up.

@mschuwalow

mschuwalow commented Jul 4, 2025

Also, please look into using https://github.com/yoshuawuyts/wasm-http-tools/tree/main/crates/wasi-async-runtime as the async runtime instead of tokio. In our experience it performs better and is more stable than tokio on WASI.

@iambenkay
Contributor Author

Will check out this async runtime and try it out.

@iambenkay
Contributor Author

iambenkay commented Jul 4, 2025

@mschuwalow I checked out the library and I can't believe I didn't know about that! 🔥

I switched to using this runtime instead; all integration tests are still passing 🔥 and it does feel snappier!
Thanks for the recommendation!

@vigoo
Collaborator

vigoo commented Jul 9, 2025

So my follow-up question is: since we are in a single-threaded env anyway, why is it bad to call Pollable::block?

By using the async runtime's block_on, all the pollables are collected in the reactor and they get polled together. This allows them to advance concurrently and makes it possible to achieve concurrency for the Rust futures. If you block on a single pollable, that blocks the thread and nothing else can advance.
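
A minimal sketch of that difference, assuming wasi_async_runtime's block_on / Reactor::wait_for and the wasi 0.2 monotonic-clock bindings (not code from this PR):

use futures_concurrency::future::Join;
use wasi::clocks::monotonic_clock;

fn main() {
    wasi_async_runtime::block_on(|reactor| async move {
        // Two 100ms timers registered with the same reactor.
        let a = reactor.wait_for(monotonic_clock::subscribe_duration(100_000_000));
        let b = reactor.wait_for(monotonic_clock::subscribe_duration(100_000_000));

        // block_on polls both pollables together, so the pair finishes in roughly 100ms.
        // Calling Pollable::block on each timer in turn would serialize them (~200ms).
        (a, b).join().await;
    });
}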

@iambenkay
Contributor Author

iambenkay commented Jul 9, 2025

I understand that. I will use block_on for the sake of future-proofing (I think I read somewhere that efforts are being made to bring multithreading to Golem later on), but I was more curious about whether this can benefit a single-threaded system like the Golem worker runtime.

Edit:

I got the answer to my question. Even though my scenario does not show it, if I want to run multiple pollables from the same callback concurrently, I need to use this block_on.

@iambenkay
Contributor Author

iambenkay commented Jul 9, 2025

Edit:
I found the root cause and I described it in a later comment.


So @vigoo and @kmatasfp, something is wrong with using wait_for on the pollable. It is not apparent what the issue is, but it panics, and this happens randomly (other times it succeeds, and when it panics Golem automatically recovers and actually runs it to the end without any further panic). It is happening especially for tests that stream responses from Bedrock. The error is strange because I am explicitly passing a pollable.

The one in the attached image is test2, a test case where we are not simulating a panic.

[image: screenshot of the panic from test2]

Below is the implementation of WasiSleep that leads to the erratic panic I described above:

// AsyncSleep / Sleep are the aws_smithy_async sleep API; `reactor` is the
// wasi_async_runtime::Reactor held by WasiSleep (struct definition elided here).
impl AsyncSleep for WasiSleep {
    fn sleep(&self, duration: std::time::Duration) -> Sleep {
        let reactor = self.reactor.clone();

        let fut = async move {
            let nanos = duration.as_nanos() as u64;
            let pollable = monotonic_clock::subscribe_duration(nanos);

            // The transmute reinterprets the generated bindings' Pollable as the
            // Pollable type the reactor's wait_for expects.
            reactor
                .clone()
                .wait_for(unsafe { std::mem::transmute(pollable) })
                .await;
        };
        Sleep::new(Box::pin(UnsafeFuture::new(fut)))
    }
}

// The SDK requires Send + Sync here, which the single-threaded wasi types do not
// provide, hence the unsafe impls and the UnsafeFuture wrapper below.
unsafe impl Send for WasiSleep {}
unsafe impl Sync for WasiSleep {}

// UnsafeFuture wraps a future and asserts Send + Sync for it so it can satisfy the
// SDK's bounds in this single-threaded wasm environment.
#[derive(Clone)]
pub struct UnsafeFuture<Fut> {
    inner: Fut,
}

impl<F> UnsafeFuture<F>
where
    F: Future,
{
    pub fn new(inner: F) -> Self {
        Self { inner }
    }
}

unsafe impl<F> Send for UnsafeFuture<F> where F: Future {}
unsafe impl<F> Sync for UnsafeFuture<F> where F: Future {}

impl<F> Future for UnsafeFuture<F>
where
    F: Future,
{
    type Output = F::Output;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let pinned_future = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) };
        pinned_future.poll(cx)
    }
}

When I switch to calling .block() directly (as in the PR) all tests pass without any panic.
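
For comparison, a sketch of what the .block() variant looks like under the same assumptions as the snippet above (not necessarily the PR's exact code): the timer pollable is waited on directly, with no reactor and no UnsafeFuture wrapper involved.

impl AsyncSleep for WasiSleep {
    fn sleep(&self, duration: std::time::Duration) -> Sleep {
        Sleep::new(Box::pin(async move {
            let pollable = monotonic_clock::subscribe_duration(duration.as_nanos() as u64);
            // Blocks the (single) thread until the timer elapses; nothing else can
            // advance in the meantime, but no reactor hand-off is needed.
            pollable.block();
        }))
    }
}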

@iambenkay
Contributor Author

I have an idea why, and I am going to implement it and update you all on my findings.

Overview

I will implement the HttpClient trait coming from aws-config rather than rely on the implementation in aws_smithy_wasm, since that one does not use a reactor.

I will build on the concrete HTTP client provided here: golemcloud/reqwest/blob/9e0c586a3f2fc2f9fe32ddf46c2a49152777693b/src/wasi/async_client.rs#L65.

I have learnt so much from this. I am actually excited!
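
Roughly, the shape of that change looks like the skeleton below. This is a sketch only: the trait paths are the aws-smithy-runtime-api ones used by recent SDK versions, the struct name is made up, and the body that bridges to the wasi reqwest client is left as a placeholder.

use aws_smithy_runtime_api::client::http::{
    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;

// Hypothetical connector; in the PR it would hold the reactor-backed reqwest client.
#[derive(Debug, Clone)]
struct WasiReqwestConnector;

impl HttpConnector for WasiReqwestConnector {
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
        HttpConnectorFuture::new(async move {
            let _ = request;
            // Translate `request`, execute it with the async wasi reqwest client on the
            // reactor, and map the response back. Elided in this sketch.
            let response: Result<HttpResponse, ConnectorError> =
                Err(ConnectorError::other("sketch only".into(), None));
            response
        })
    }
}

impl HttpClient for WasiReqwestConnector {
    fn http_connector(
        &self,
        _settings: &HttpConnectorSettings,
        _components: &RuntimeComponents,
    ) -> SharedHttpConnector {
        SharedHttpConnector::new(self.clone())
    }
}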

@iambenkay
Contributor Author

iambenkay commented Jul 9, 2025

This was such an interesting process for me 🔥. I have switched from the default aws-smithy-wasm client to the async WASI one linked above.

I had to refactor how the runtime is instantiated by moving it to the top level.

After my migration, I ran all the tests and they are still working beautifully!

Thank you everyone, those were really great improvements! cc @vigoo @kmatasfp @mschuwalow

Let me know if you need me to make a new video showcasing what is going on.

    reactor: wasi_async_runtime::Reactor,
}

impl WasiClient {


I played around some more on my branch with the AWS SDK. This approach works as long as the SDK call does not fail.
In case of failure, for example trying to upload to an S3 bucket that does not exist or that I do not have permission to access, I get a nasty error and my worker crashes:

  2025-07-13T09:45:31.305659Z  WARN golem_worker_executor::worker::invocation_loop: Failed to start the worker: Failed to resume worker: 3f7ca448-9fc7-4b09-90a6-22d5345e12a6/worker-1-5c3995a8-cf36-4ed1-acae-3f51d091641a: Runtime error: error while executing at wasm backtrace:
    0: 0x204f5c - golem_stt_whisper.wasm!core::ptr::drop_in_place<reqwest::wasi::async_client::CustomRequestExecution>::h1d1d57ba2ebabf5b.27849
    1:  0x298a8 - golem_stt_whisper.wasm!<futures_concurrency::future::join::tuple::Join2<A,B> as core::future::future::Future>::poll::hc7c07be640063ff0
    2: 0x1f76fd - golem_stt_whisper.wasm!<golem_stt_whisper::aws_client::UnsafeFuture<Fut> as core::future::future::Future>::poll::h387a1df8e55a85b3
    3: 0x1c2e0f - golem_stt_whisper.wasm!<aws_smithy_runtime_api::client::http::HttpConnectorFuture as core::future::future::Future>::poll::h8cebdd696a953f9b
    4: 0x12cec6 - golem_stt_whisper.wasm!<aws_smithy_runtime::client::http::body::minimum_throughput::MaybeUploadThroughputCheckFuture as core::future::future::Future>::poll::h373dcbad0ff7fadf
    5: 0x14f840 - golem_stt_whisper.wasm!aws_smithy_runtime::client::orchestrator::try_attempt::{{closure}}::h0f6ac7c602162fb4
    6: 0x14cafe - golem_stt_whisper.wasm!<tracing::instrument::Instrumented<T> as core::future::future::Future>::poll::hd2498155e7011856
    7: 0x154422 - golem_stt_whisper.wasm!<aws_smithy_runtime::client::timeout::MaybeTimeoutFuture<InnerFuture> as core::future::future::Future>::poll::hb05289326b80f2f9
    8:  0x72bfb - golem_stt_whisper.wasm!<aws_smithy_runtime::client::timeout::MaybeTimeoutFuture<InnerFuture> as core::future::future::Future>::poll::hf24f9fb20016d1ae
    9:  0x4605e - golem_stt_whisper.wasm!<tracing::instrument::Instrumented<T> as core::future::future::Future>::poll::hf8c60fa9e71d641c
   10:  0x3fed4 - golem_stt_whisper.wasm!golem_stt_whisper::aws::S3Client::put_object::{{closure}}::h22fa7290b4479229
   11: 0x20e0e8 - golem_stt_whisper.wasm!golem:stt-whisper-exports/golem-stt-whisper-test@1.0.0#test-put-object-to-s3: Unexpected oplog entry: expected OplogEntry::EndRemoteWrite |, got ImportedFunctionInvoked { timestamp: Timestamp(Timestamp("2025-07-13T09:44:46.369000000Z")), function_name: "golem io::poll::poll", request: Inline([2]), response: Inline([2, 0, 1, 3]), wrapped_function_type: ReadLocal }

Using the official WasiHttpClientBuilder, things behave as expected, so I recommend you also include some tests for failure cases.
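
For reference, a minimal sketch of wiring the official client in, assuming aws-smithy-wasm's WasiHttpClientBuilder and a recent aws-config (not code from this PR):

use aws_config::BehaviorVersion;
use aws_smithy_wasm::wasi::WasiHttpClientBuilder;

async fn load_sdk_config() -> aws_config::SdkConfig {
    // The official WASI HTTP client shipped with the AWS SDK for wasm32-wasip2 targets.
    let http_client = WasiHttpClientBuilder::new().build();

    aws_config::defaults(BehaviorVersion::latest())
        .http_client(http_client)
        .load()
        .await
}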

Contributor Author

Let me do some fuzz tests for unexpected scenarios


Also got a bunch of these errors:

[runtime error] failed to invoke function golem:stt-whisper-exports/golem-stt-whisper-test@1.0.0.{test-aws-transcribe}: Worker Service - Error: 500 Internal Server Error, Runtime error: error while executing at wasm backtrace:
    0:  0x37cea - golem_stt_whisper.wasm!core::ptr::drop_in_place<reqwest::wasi::async_client::CustomRequestExecution>::h22eaeb82651671cf.28263
    1:  0x3aeca - golem_stt_whisper.wasm!reqwest::wasi::async_client::Client::execute::{{closure}}::h31d8bb41b5f258c4
    2:  0x363ab - golem_stt_whisper.wasm!<golem_stt_whisper::aws_client::ReqwestHttpClient as golem_stt_whisper::aws_client::HttpClient<bytes::bytes::Bytes>>::execute::{{closure}}::h3cbe4efd76e508be
    3:  0x2fe73 - golem_stt_whisper.wasm!golem_stt_whisper::aws2::S3Client<HC>::put_object::{{closure}}::h2bc0f090990ccfe2
    4:  0x4a782 - golem_stt_whisper.wasm!wasi_async_runtime::block_on::block_on::hf4157653bfb99eab
    5:  0x5d6f6 - golem_stt_whisper.wasm!golem:stt-whisper-exports/golem-stt-whisper-test@1.0.0#test-aws-transcribe: Unexpected oplog entry: expected OplogEntry::EndRemoteWrite |, got ImportedFunctionInvoked { timestamp: Timestamp(Timestamp("2025-07-13T15:08:41.232000000Z")), function_name: "golem io::poll::poll", request: Inline([2]), response: Inline([2, 0, 1, 2]), wrapped_function_type: ReadLocal }

@kmatasfp kmatasfp Jul 13, 2025

OK, I figured out where these errors come from: it seems that if you use sleep, for example when doing exponential backoff before retrying, Golem kills the worker.

Development

Successfully merging this pull request may close these issues:

Add AWS Bedrock Provider