You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Linux f9a6f0aa9b4c 6.10.14-linuxkit #1 SMP Fri Nov 29 17:22:03 UTC 2024 aarch64 GNU/Linux
Crates
Sse
Description
I am using Axum SSE with combination of the async-graphql to subscribe for changes
I am using Redis internally for PubSub and I am passing created stream further on to axum:
let client = redis::Client::open(self.connection_string.clone())
.unwrap()
let mut pubsub = client.get_async_pubsub()
.await
.unwrap()
for channel in &channels {
pubsub.psubscribe(channel)
.await
.unwrap();
}
Ok(Box::pin(pubsub.into_on_message().map(|msg| {
let payload = msg.get_payload::<String>();
match payload {
Ok(value) => value,
Err(e) => serde_json::to_string(&e).unwrap(),
}
})))
Sse stream:
let stream = schema.execute_stream(request).filter_map(|response| {
// Handle errors in the response.
if !response.errors.is_empty() {
tracing::error!(
"Error occurred while executing the request: {:?}",
&response.errors
);
return Some(Ok::<Event, Infallible>(
Event::default()
.json_data(response.errors)
.unwrap_or_default(),
));
}
let data = response.data.into_json();
match data {
Ok(data) => {
return Some(Ok::<Event, Infallible>(
Event::default().json_data(data).unwrap_or(
Event::default().data(
"Error occurred while serializing the data".to_string(),
),
),
));
},
Err(e) => {
return Some(Ok::<Event, Infallible>(
Event::default()
.json_data(e.to_string())
.unwrap_or_default(),
));
},
}
});
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
The problem is that I always get only first event emitted but not the others, as if the stream has finished.
I expect to get each event, not only the first one. Am I doing something wrong ?
I have tried multiple solutions. i have tried to wrap top level stream (that one passed to Sse) wrap in mspc::BroadcastStream, create separated thread .. but nothing really works. Can pin_project, use internally by the Sse, cause this issue ?
This discussion was converted from issue #3309 on April 26, 2025 22:11.
Heading
Bold
Italic
Quote
Code
Link
Numbered list
Unordered list
Task list
Attach files
Mention
Reference
Menu
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Bug Report
Version
Platform
Linux f9a6f0aa9b4c 6.10.14-linuxkit #1 SMP Fri Nov 29 17:22:03 UTC 2024 aarch64 GNU/Linux
Crates
Description
I am using Axum SSE with combination of the async-graphql to subscribe for changes
I am using Redis internally for PubSub and I am passing created stream further on to axum:
Beta Was this translation helpful? Give feedback.
All reactions