-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix(amqp sink): attempt one reconnect when channel has errored #22971
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
fix(amqp sink): attempt one reconnect when channel has errored #22971
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @aramperes
src/sinks/amqp/channel.rs
Outdated
} | ||
|
||
info!( | ||
message = "Recovering broken connection to the AMQP broker.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: We can add a URL tag here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that is a good idea since it could print the password:
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovering broken connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovered connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
Ok sounds good, I will try a proof of concept and see if it provides value. The official |
@pront I've integrated I tested the recovery behavior and it is the same as with my previous implementation. When picking a Channel from the pool, it will check its current state. If it is not Logs:
|
Hi @pront this is ready for review when you have some time, thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @aramperes, this looks great
src/sinks/amqp/channel.rs
Outdated
pub(super) fn new_channel_pool(config: &AmqpSinkConfig) -> crate::Result<AmqpSinkChannels> { | ||
let channel_manager = AmqpSinkChannelManager::new(&config.connection); | ||
let channels = Pool::builder(channel_manager) | ||
.max_size(4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this configurable and default to 4
. Also, let's mention it in the changelog.
e4e33f3
to
d1c0726
Compare
Head branch was pushed to by a user without write access
@pront There was a formatting issue with the docs, could you re-trigger the CI? 🙏 |
Done. But I wonder if this should have been auto-generated with |
db683a1
to
01c5278
Compare
Ahhh. I tried running I updated the |
Summary
Improves the resilience of the
amqp sink
by attempting to re-create a brokenChannel
when sending new events.This is a simple implementation that could be extended later or in this PR, depending on the Vector team's preference & vision. The current implementation:
tower
retry mechanism in some way;amqp source
, which would be more complex to re-create the listeners.Lapin does not have a built-in reconnection mechanism and it is up to the client to implement this. See amqp-rs/lapin#70, amqp-rs/lapin#389
Implementation
A new
AmqpChannel
struct wraps atokio::sync::RwLock<lapin::Channel>
. This read-write lock is used to lock during the re-connection to prevent concurrent attempts.The API could be further improved by making
crate::amqp::AmqpConfig::connect()
return anAmqpChannel
directly, but that would also affect theamqp source
.Change Type
Is this a breaking change?
How did you test this PR?
amqp
sink. Example:Error
state. If the RabbitMQ process hasn't booted up and the re-connection failed, the event is dropped. Otherwise, the channel is re-established and the event is sent to the new channel.Example of the re-connection succeeding:
Example of re-connection failing:
Does this PR include user facing changes?
Notes
@vectordotdev/vector
to reach out to us regarding this PR.pre-push
hook, please see this template.cargo fmt --all
cargo clippy --workspace --all-targets -- -D warnings
cargo nextest run --workspace
(alternatively, you can runcargo test --all
)./scripts/check_changelog_fragments.sh
git merge origin master
andgit push
.Cargo.lock
), pleaserun
cargo vdev build licenses
to regenerate the license inventory and commit the changes (if any). More details here.References
Closes #22313