Skip to content

Moving producers/consumers client connection creation to Environment #259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 6 additions & 50 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ use crate::error::ConsumerStoreOffsetError;
use crate::{
client::{MessageHandler, MessageResult},
error::{ConsumerCloseError, ConsumerCreateError, ConsumerDeliveryError},
Client, ClientOptions, Environment, MetricsCollector,
Client, Environment, MetricsCollector,
};
use futures::{future::BoxFuture, task::AtomicWaker, Stream};
use rand::rngs::StdRng;
use rand::{seq::SliceRandom, SeedableRng};

type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;

Expand Down Expand Up @@ -132,56 +130,14 @@ impl ConsumerBuilder {
return Err(ConsumerCreateError::SingleActiveConsumerNotSupported);
}

// Connect to the user specified node first, then look for a random replica to connect to instead.
// This is recommended for load balancing purposes
let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
let collector = self.environment.options.client_options.collector.clone();

let mut client = self
let client_uwrapped = self
.environment
.create_client_with_options(opt_with_client_provided_name)
.create_consumer_client(stream, self.client_provided_name.clone())
.await?;
let collector = self.environment.options.client_options.collector.clone();
if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
// If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
// This is desired behavior in case there is only one node in the cluster.
if let Some(replica) = metadata.replicas.choose(&mut StdRng::from_entropy()) {
tracing::debug!(
"Picked replica {:?} out of possible candidates {:?} for stream {}",
replica,
metadata.replicas,
stream
);
let load_balancer_mode = self.environment.options.client_options.load_balancer_mode;
if load_balancer_mode {
let options = self.environment.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == replica.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: replica.host.clone(),
port: replica.port as u16,
..self.environment.options.client_options
})
.await?;
}
}
} else {
return Err(ConsumerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

let client = client_uwrapped.clone();

let subscription_id = 1;
let (tx, rx) = channel(10000);
Expand Down
113 changes: 112 additions & 1 deletion src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use std::time::Duration;

use crate::producer::NoDedup;
use crate::types::OffsetSpecification;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::collections::HashMap;

use crate::{
client::{Client, ClientOptions, MetricsCollector},
consumer::ConsumerBuilder,
error::StreamDeleteError,
error::{ConsumerCreateError, ProducerCreateError, StreamDeleteError},
producer::ProducerBuilder,
stream_creator::StreamCreator,
superstream::RoutingStrategy,
Expand All @@ -36,6 +39,114 @@ impl Environment {
Ok(Environment { options })
}

pub async fn create_producer_client(
self,
stream: &str,
client_provided_name: String,
) -> Result<Client, ProducerCreateError> {
let mut opt_with_client_provided_name = self.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = client_provided_name.clone();

let mut client = self
.create_client_with_options(opt_with_client_provided_name.clone())
.await?;

if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
tracing::debug!(
"Connecting to leader node {:?} of stream {}",
metadata.leader,
stream
);
let load_balancer_mode = self.options.client_options.load_balancer_mode;
if load_balancer_mode {
// Producer must connect to leader node
let options: ClientOptions = self.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == metadata.leader.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
..opt_with_client_provided_name.clone()
})
.await?
};
} else {
return Err(ProducerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

Ok(client)
}

pub async fn create_consumer_client(
self,
stream: &str,
client_provided_name: String,
) -> Result<Client, ConsumerCreateError> {
let mut opt_with_client_provided_name = self.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = client_provided_name.clone();

let mut client = self
.create_client_with_options(opt_with_client_provided_name.clone())
.await?;

if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
// If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
// This is desired behavior in case there is only one node in the cluster.
if let Some(replica) = metadata.replicas.choose(&mut StdRng::from_entropy()) {
tracing::debug!(
"Picked replica {:?} out of possible candidates {:?} for stream {}",
replica,
metadata.replicas,
stream
);
let load_balancer_mode = self.options.client_options.load_balancer_mode;
if load_balancer_mode {
let options = self.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == replica.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: replica.host.clone(),
port: replica.port as u16,
..self.options.client_options
})
.await?;
}
}
} else {
return Err(ConsumerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

Ok(client)
}

/// Returns a builder for creating a stream with a specific configuration
pub fn stream_creator(&self) -> StreamCreator {
StreamCreator::new(self.clone())
Expand Down
50 changes: 6 additions & 44 deletions src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::future::Future;
use std::vec;
use std::{
marker::PhantomData,
sync::{
Expand All @@ -19,7 +18,7 @@ use rabbitmq_stream_protocol::{message::Message, ResponseCode, ResponseKind};

use crate::client::ClientMessage;
use crate::MetricsCollector;
use crate::{client::MessageHandler, ClientOptions, RabbitMQStreamResult};
use crate::{client::MessageHandler, RabbitMQStreamResult};
use crate::{
client::{Client, MessageResult},
environment::Environment,
Expand Down Expand Up @@ -119,14 +118,15 @@ impl<T> ProducerBuilder<T> {
// The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages
// to the leader anyway - it is the only one capable of writing.

let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
let metrics_collector = self.environment.options.client_options.collector.clone();

let mut client = self
let client_unwrapped = self
.environment
.create_client_with_options(opt_with_client_provided_name.clone())
.create_producer_client(stream, self.client_provided_name.clone())
.await?;

let client = client_unwrapped;

let mut publish_version = 1;

if self.filter_value_extractor.is_some() {
Expand All @@ -137,44 +137,6 @@ impl<T> ProducerBuilder<T> {
}
}

let metrics_collector = self.environment.options.client_options.collector.clone();
if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
tracing::debug!(
"Connecting to leader node {:?} of stream {}",
metadata.leader,
stream
);
let load_balancer_mode = self.environment.options.client_options.load_balancer_mode;
if load_balancer_mode {
// Producer must connect to leader node
let options: ClientOptions = self.environment.options.client_options.clone();
loop {
let temp_client = Client::connect(options.clone()).await?;
let mapping = temp_client.connection_properties().await;
if let Some(advertised_host) = mapping.get("advertised_host") {
if *advertised_host == metadata.leader.host.clone() {
client.close().await?;
client = temp_client;
break;
}
}
temp_client.close().await?;
}
} else {
client.close().await?;
client = Client::connect(ClientOptions {
host: metadata.leader.host.clone(),
port: metadata.leader.port as u16,
..opt_with_client_provided_name.clone()
})
.await?
};
} else {
return Err(ProducerCreateError::StreamDoesNotExist {
stream: stream.into(),
});
}

let waiting_confirmations: WaiterMap = Arc::new(DashMap::new());

let confirm_handler = ProducerConfirmHandler {
Expand Down
Loading