Skip to content

Single active consumer implementation #248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions examples/single_active_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Single active consumer
---

This is an example to enable single active consumer functionality for superstream:
https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams

This folder contains a super-stream consumer configured to enable it.
You can use the example in the super-stream folder to produce messages for a super-stream.

You can then run the consumer in this folder.
Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream.

You can then run another consumer in parallel.
now you'll see that one of the two consumers will consume from 2 streams while the other on one stream.

If you run another you'll see that every Consumer will read from a single stream.

If you then stop one of the Consumer you'll notice that the related stream is now read from on the Consumer which is still running.




Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use futures::StreamExt;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer,
};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let message_count = 1000000;
let super_stream = "hello-rust-super-stream";

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
println!(
"Super stream consumer example, consuming messages from the super stream {}",
super_stream
);

let mut properties = HashMap::new();

properties.insert("single-active-consumer".to_string(), "true".to_string());
properties.insert("name".to_string(), "consumer-group-1".to_string());
properties.insert("super-stream".to_string(), "hello-rust-super-stream".to_string());

let mut super_stream_consumer: SuperStreamConsumer = environment
.super_stream_consumer()
.offset(OffsetSpecification::First)
.client_provided_name("my super stream consumer for hello rust")
/*We can decide a strategy to manage Offset specification in single active consumer based on is_active flag
By default if this clousure is not present the default strategy OffsetSpecification::NEXT will be set.*/
.consumer_update(move |active, message_context| {
println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream());
OffsetSpecification::First
})
.properties(properties)
.build(super_stream)
.await
.unwrap();

for _ in 0..message_count {
let delivery = super_stream_consumer.next().await.unwrap();
{
let delivery = delivery.unwrap();
println!(
"Got message: {:#?} from stream: {} with offset: {}",
delivery
.message()
.data()
.map(|data| String::from_utf8(data.to_vec()).unwrap())
.unwrap(),
delivery.stream(),
delivery.offset()
);
}
}

println!("Stopping super stream consumer...");
let _ = super_stream_consumer.handle().close().await;
println!("Super stream consumer stopped");
Ok(())
}
88 changes: 88 additions & 0 deletions protocol/src/commands/consumer_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_CONSUMER_UPDATE,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct ConsumerUpdateCommand {
pub(crate) correlation_id: u32,
subscription_id: u8,
active: u8,
}

impl ConsumerUpdateCommand {
pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self {
Self {
correlation_id,
subscription_id,
active,
}
}

Check warning on line 29 in protocol/src/commands/consumer_update.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/consumer_update.rs#L23-L29

Added lines #L23 - L29 were not covered by tests

pub fn get_correlation_id(&self) -> u32 {
self.correlation_id
}

pub fn is_active(&self) -> u8 {
self.active
}
}

impl Encoder for ConsumerUpdateCommand {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.subscription_id.encoded_size()
+ self.active.encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.subscription_id.encode(writer)?;
self.active.encode(writer)?;
Ok(())
}
}

impl Decoder for ConsumerUpdateCommand {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, subscription_id) = u8::decode(input)?;
let (input, active) = u8::decode(input)?;

Ok((
input,
ConsumerUpdateCommand {
correlation_id,
subscription_id,
active,
},
))
}
}

impl Command for ConsumerUpdateCommand {
fn key(&self) -> u16 {
COMMAND_CONSUMER_UPDATE
}

Check warning on line 75 in protocol/src/commands/consumer_update.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/consumer_update.rs#L73-L75

Added lines #L73 - L75 were not covered by tests
}

#[cfg(test)]
mod tests {
use crate::commands::tests::command_encode_decode_test;

use super::ConsumerUpdateCommand;

#[test]
fn consumer_update_response_test() {
command_encode_decode_test::<ConsumerUpdateCommand>();
}
}
86 changes: 86 additions & 0 deletions protocol/src/commands/consumer_update_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
};

use crate::commands::subscribe::OffsetSpecification;

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct ConsumerUpdateRequestCommand {
pub(crate) correlation_id: u32,
response_code: u16,
offset_specification: OffsetSpecification,
}

impl ConsumerUpdateRequestCommand {
pub fn new(
correlation_id: u32,
response_code: u16,
offset_specification: OffsetSpecification,
) -> Self {
Self {
correlation_id,
response_code,
offset_specification,
}
}
}

impl Encoder for ConsumerUpdateRequestCommand {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.response_code.encoded_size()
+ self.offset_specification.encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.response_code.encode(writer)?;
self.offset_specification.encode(writer)?;
Ok(())
}
}

impl Decoder for ConsumerUpdateRequestCommand {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, response_code) = u16::decode(input)?;
let (input, offset_specification) = OffsetSpecification::decode(input)?;

Ok((
input,
ConsumerUpdateRequestCommand {
correlation_id,
response_code,
offset_specification,
},
))
}
}

impl Command for ConsumerUpdateRequestCommand {
fn key(&self) -> u16 {
COMMAND_CONSUMER_UPDATE_REQUEST
}
}

#[cfg(test)]
mod tests {
use crate::commands::tests::command_encode_decode_test;

use super::ConsumerUpdateRequestCommand;

#[test]
fn consumer_update_request_test() {
command_encode_decode_test::<ConsumerUpdateRequestCommand>();
}
}
2 changes: 2 additions & 0 deletions protocol/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::protocol::version::PROTOCOL_VERSION;

pub mod close;
pub mod consumer_update;
pub mod consumer_update_request;
pub mod create_stream;
pub mod create_super_stream;
pub mod credit;
Expand Down
1 change: 1 addition & 0 deletions protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod commands {
pub const COMMAND_STREAMS_STATS: u16 = 28;
pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29;
pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30;
pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794;
}

// server responses
Expand Down
22 changes: 16 additions & 6 deletions protocol/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use crate::{
codec::{decoder::read_u32, Decoder, Encoder},
commands::{
close::CloseRequest, create_stream::CreateStreamCommand,
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
declare_publisher::DeclarePublisherCommand, delete::Delete,
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
Expand Down Expand Up @@ -68,6 +68,7 @@
DeleteSuperStream(DeleteSuperStreamCommand),
SuperStreamPartitions(SuperStreamPartitionsRequest),
SuperStreamRoute(SuperStreamRouteRequest),
ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
}

impl Encoder for RequestKind {
Expand Down Expand Up @@ -105,6 +106,9 @@
super_stream_partitions.encoded_size()
}
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
consumer_update_request.encoded_size()
}
}
}

Expand Down Expand Up @@ -142,6 +146,9 @@
super_stream_partition.encode(writer)
}
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
consumer_update_request.encode(writer)
}
}
}
}
Expand Down Expand Up @@ -222,6 +229,9 @@
COMMAND_ROUTE => {
SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
}
COMMAND_CONSUMER_UPDATE_REQUEST => {
ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))?

Check warning on line 233 in protocol/src/request/mod.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/request/mod.rs#L233

Added line #L233 was not covered by tests
}
n => return Err(DecodeError::UnsupportedResponseType(n)),
};
Ok((input, Request { header, kind: cmd }))
Expand All @@ -234,9 +244,9 @@
use crate::{
codec::{Decoder, Encoder},
commands::{
close::CloseRequest, create_stream::CreateStreamCommand,
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
declare_publisher::DeclarePublisherCommand, delete::Delete,
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
delete_publisher::DeletePublisherCommand,
delete_super_stream::DeleteSuperStreamCommand,
exchange_command_versions::ExchangeCommandVersionsRequest,
Expand Down
9 changes: 8 additions & 1 deletion protocol/src/request/shims.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::commands::create_super_stream::CreateSuperStreamCommand;
use crate::commands::delete_super_stream::DeleteSuperStreamCommand;
use crate::{
commands::{
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
create_stream::CreateStreamCommand, credit::CreditCommand,
declare_publisher::DeclarePublisherCommand, delete::Delete,
delete_publisher::DeletePublisherCommand,
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
Expand Down Expand Up @@ -164,3 +165,9 @@ impl From<SuperStreamRouteRequest> for RequestKind {
RequestKind::SuperStreamRoute(cmd)
}
}

impl From<ConsumerUpdateRequestCommand> for RequestKind {
fn from(cmd: ConsumerUpdateRequestCommand) -> Self {
RequestKind::ConsumerUpdateRequest(cmd)
}
}
Loading
Loading