-
Notifications
You must be signed in to change notification settings - Fork 61
feat: integrates Aws bedrock into golem #27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was one cargo check failing. I have fixed that now! |
bd5725e
to
023723e
Compare
While running test case 6, I noticed that the retry_prompt implementation doesn't work as fluid as it is meant to. It repeats some parts of the previous message stream and other times outrightly returns an incorrect resumption stream. I changed it to be more accurate. Video attached below test6-fix.mov |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vigoo, this one looks good from my side
} | ||
|
||
impl llm::GuestChatStream for BedrockChatStream { | ||
fn get_next(&self) -> Option<Vec<llm::StreamEvent>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation blocks currently until the next event is available, which only blocking_get_next() should do.
Morally it's probably better to always return none here and only do the blocking in blocking_get_next()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I totally get the moral angle. I will move that logic into blocking_get_next.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So after looking into the code, changing this will require me to make changes to the underlying durability implementation llm/llm/src/durability.rs#L291 since that depends on calling get_next
directly and not blocking_get_next
.
I totally understand the moral aspect but it will introduce more changes to the core durability implementation than is necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. Let's wait for @vigoo then and either do the necessary changes here or in a followup.
Also please look into using https://github.com/yoshuawuyts/wasm-http-tools/tree/main/crates/wasi-async-runtime as the async runtime instead of tokio. It performs better and is more stable than tokio for wasi in our experience |
Will check out this async runtime and try it out. |
@mschuwalow I checked out the library and I can't believe I didn't know about that! 🔥 I switched to using this runtime instead; all integration tests are still passing 🔥 and it does feel snappier! |
By using the async runtime's |
I understand that, I will use the Edit: I got the answer to my question. Even though my scenario does not show it, if I am trying to run multiple pollables from the same callback in a concurrent way I would need to use this block_on. |
Edit: So @vigoo and @kmatasfp something is wrong with using the This one in the attached image is Below is the implementation of impl AsyncSleep for WasiSleep {
fn sleep(&self, duration: std::time::Duration) -> Sleep {
let reactor = self.reactor.clone();
let fut = async move {
let nanos = duration.as_nanos() as u64;
let pollable = monotonic_clock::subscribe_duration(nanos);
reactor
.clone()
.wait_for(unsafe { std::mem::transmute(pollable) })
.await;
};
Sleep::new(Box::pin(UnsafeFuture::new(fut)))
}
}
unsafe impl Send for WasiSleep {}
unsafe impl Sync for WasiSleep {}
#[derive(Clone)]
pub struct UnsafeFuture<Fut> {
inner: Fut,
}
impl<F> UnsafeFuture<F>
where
F: Future,
{
pub fn new(inner: F) -> Self {
Self { inner }
}
}
unsafe impl<F> Send for UnsafeFuture<F> where F: Future {}
unsafe impl<F> Sync for UnsafeFuture<F> where F: Future {}
impl<F> Future for UnsafeFuture<F>
where
F: Future,
{
type Output = F::Output;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let pinned_future = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) };
pinned_future.poll(cx)
}
} When I switch to calling |
I got an idea why, and I am going to implement it and update you guys on my findings. OverviewI will implement I will use the concrete http client provided here: golemcloud/reqwest/blob/9e0c586a3f2fc2f9fe32ddf46c2a49152777693b/src/wasi/async_client.rs#L65 and implement it. I have learnt so much from this. I am actually excited |
This was such an interesting process for me 🔥. I have changed from using the default I had to refactor how the runtime was instantiated by moving them to the top-level. After my migration, I ran all the tests and they are still working beautifully! Thank you everyone, those were really great improvements! cc @vigoo @kmatasfp @mschuwalow Let me know if you need me to make a new video showcasing what is going on. |
reactor: wasi_async_runtime::Reactor, | ||
} | ||
|
||
impl WasiClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I played around some more on my branch with aws sdk, this approach works, as long as aws sdk does not fail:
In case of failure for example trying to upload to a s3 bucket that does not exist or I do not have permissions to I get nasty error and crash my worker:
2025-07-13T09:45:31.305659Z WARN golem_worker_executor::worker::invocation_loop: Failed to start the worker: Failed to resume worker: 3f7ca448-9fc7-4b09-90a6-22d5345e12a6/worker-1-5c3995a8-cf36-4ed1-acae-3f51d091641a: Runtime error: error while executing at wasm backtrace: 0: 0x204f5c - golem_stt_whisper.wasm!core::ptr::drop_in_place<reqwest::wasi::async_client::CustomRequestExecution>::h1d1d57ba2ebabf5b.27849 1: 0x298a8 - golem_stt_whisper.wasm!<futures_concurrency::future::join::tuple::Join2<A,B> as core::future::future::Future>::poll::hc7c07be640063ff0 2: 0x1f76fd - golem_stt_whisper.wasm!<golem_stt_whisper::aws_client::UnsafeFuture<Fut> as core::future::future::Future>::poll::h387a1df8e55a85b3 3: 0x1c2e0f - golem_stt_whisper.wasm!<aws_smithy_runtime_api::client::http::HttpConnectorFuture as core::future::future::Future>::poll::h8cebdd696a953f9b 4: 0x12cec6 - golem_stt_whisper.wasm!<aws_smithy_runtime::client::http::body::minimum_throughput::MaybeUploadThroughputCheckFuture as core::future::future::Future>::poll::h373dcbad0ff7fadf 5: 0x14f840 - golem_stt_whisper.wasm!aws_smithy_runtime::client::orchestrator::try_attempt::{{closure}}::h0f6ac7c602162fb4 6: 0x14cafe - golem_stt_whisper.wasm!<tracing::instrument::Instrumented<T> as core::future::future::Future>::poll::hd2498155e7011856 7: 0x154422 - golem_stt_whisper.wasm!<aws_smithy_runtime::client::timeout::MaybeTimeoutFuture<InnerFuture> as core::future::future::Future>::poll::hb05289326b80f2f9
8: 0x72bfb - golem_stt_whisper.wasm!<aws_smithy_runtime::client::timeout::MaybeTimeoutFuture<InnerFuture> as core::future::future::Future>::poll::hf24f9fb20016d1ae 9: 0x4605e - golem_stt_whisper.wasm!<tracing::instrument::Instrumented<T> as core::future::future::Future>::poll::hf8c60fa9e71d641c 10: 0x3fed4 - golem_stt_whisper.wasm!golem_stt_whisper::aws::S3Client::put_object::{{closure}}::h22fa7290b4479229 11: 0x20e0e8 - golem_stt_whisper.wasm!golem:stt-whisper-exports/golem-stt-whisper-test@1.0.0#test-put-object-to-s3: Unexpected oplog entry: expected OplogEntry::EndRemoteWrite |, got ImportedFunctionInvoked { timestamp: Timestamp(Timestamp("2025-07-13T09:44:46.369000000Z")), function_name: "golem io::poll::poll", request: Inline([2]), response: Inline([2, 0, 1, 3]), wrapped_function_type: ReadLocal }
Using the official WasiHttpClientBuilder things behave as expected. So I recommend you also include some tests for failure cases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me do some fuzz tests for unexpected scenarios
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also got bunch of these errors:
[runtime error] failed to invoke function golem:stt-whisper-exports/golem-stt-whisper-test@1.0.0.{test-aws-transcribe}: Worker Service - Error: 500 Internal Server Error, Runtime error: error while executing at wasm backtrace:
0: 0x37cea - golem_stt_whisper.wasm!core::ptr::drop_in_place<reqwest::wasi::async_client::CustomRequestExecution>::h22eaeb82651671cf.28263
1: 0x3aeca - golem_stt_whisper.wasm!reqwest::wasi::async_client::Client::execute::{{closure}}::h31d8bb41b5f258c4
2: 0x363ab - golem_stt_whisper.wasm!<golem_stt_whisper::aws_client::ReqwestHttpClient as golem_stt_whisper::aws_client::HttpClient<bytes::bytes::Bytes>>::execute::{{closure}}::h3cbe4efd76e508be
3: 0x2fe73 - golem_stt_whisper.wasm!golem_stt_whisper::aws2::S3Client<HC>::put_object::{{closure}}::h2bc0f090990ccfe2
4: 0x4a782 - golem_stt_whisper.wasm!wasi_async_runtime::block_on::block_on::hf4157653bfb99eab
5: 0x5d6f6 - golem_stt_whisper.wasm!golem:stt-whisper-exports/golem-stt-whisper-test@1.0.0#test-aws-transcribe: Unexpected oplog entry: expected OplogEntry::EndRemoteWrite |, got ImportedFunctionInvoked { timestamp: Timestamp(Timestamp("2025-07-13T15:08:41.232000000Z")), function_name: "golem io::poll::poll", request: Inline([2]), response: Inline([2, 0, 1, 2]), wrapped_function_type: ReadLocal }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok figured out where these errors come from, it seems if you use sleep for example when you do exponential backoff before retrying , golem kills the worker
What's New?
Integrates AWS Bedrock into Golem LLM.
Implementation details
nopoll
to disable polling related features for llm-bedrock crate. (polling is not needed for aws bedrock integration since the SDK does not expose a compatible pollable type).get_config_key_or_none
to golem_llm for optional config variables.infer
crate to infer mime type.Showcase
bedrock-showcase.mov
Test 6 Fix
test6-fix.mov
/claim #2
closes #2