Skip to content

fix(nats sink): fix vector exiting if nats sink url fails dns resolution or is unavailable without --require-healthy #23167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions changelog.d/22914_fix_nats_sink_ignored_healthcheck.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed crash if nats sink url is unresolvable or unavailable even when --require-healthy is not set.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Fixed crash if nats sink url is unresolvable or unavailable even when --require-healthy is not set.
The `nats` sink now does not return an error when an unresolvable or unavailable URL is provided. Note that if `--require-healthy` is set then Vector will stop on startup.


authors: rdwr-tomers
16 changes: 12 additions & 4 deletions src/sinks/nats/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions {
}

impl NatsSinkConfig {
pub(super) async fn connect(&self) -> Result<async_nats::Client, NatsError> {
let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;
pub(super) async fn connect(&self, with_retry: bool) -> Result<async_nats::Client, NatsError> {
let mut options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;
options = match with_retry {
false => options,
true => options.retry_on_initial_connect(),
};

let urls = self.parse_server_addresses()?;
options.connect(urls).await.context(ConnectSnafu)
Expand All @@ -158,7 +162,7 @@ impl NatsSinkConfig {
}

pub(super) async fn publisher(&self) -> Result<NatsPublisher, NatsError> {
let connection = self.connect().await?;
let connection = self.connect(true).await?;

if self.jetstream {
Ok(NatsPublisher::JetStream(async_nats::jetstream::new(
Expand All @@ -171,7 +175,11 @@ impl NatsSinkConfig {
}

async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> {
config.connect().map_ok(|_| ()).map_err(|e| e.into()).await
config
.connect(false)
.map_ok(|_| ())
.map_err(|e| e.into())
.await
}

pub enum NatsPublisher {
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/nats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), NatsError> {
let subject = conf.subject.clone();
let consumer = conf
.clone()
.connect()
.connect(false)
.await
.expect("failed to connect with test consumer");
let mut sub = consumer
Expand Down
Loading