Skip to content

fix(nats sink): fix vector exiting if nats sink url fails dns resolution or is unavailable without --require-healthy #23167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions changelog.d/22914_fix_nats_sink_ignored_healthcheck.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed crash if nats sink url is unresolvable or unavailable even when --require-healthy is not set.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Fixed crash if nats sink url is unresolvable or unavailable even when --require-healthy is not set.
The `nats` sink now does not return an error when an unresolvable or unavailable URL is provided. Note that if `--require-healthy` is set then Vector will stop on startup.


authors: rdwr-tomers
18 changes: 13 additions & 5 deletions src/sinks/nats/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,10 @@ impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions {
}

impl NatsSinkConfig {
pub(super) async fn connect(&self) -> Result<async_nats::Client, NatsError> {
let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;

pub(super) async fn connect(
&self,
options: async_nats::ConnectOptions,
) -> Result<async_nats::Client, NatsError> {
let urls = self.parse_server_addresses()?;
options.connect(urls).await.context(ConnectSnafu)
}
Expand All @@ -158,7 +159,9 @@ impl NatsSinkConfig {
}

pub(super) async fn publisher(&self) -> Result<NatsPublisher, NatsError> {
let connection = self.connect().await?;
let mut options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;
options = options.retry_on_initial_connect();
let connection = self.connect(options).await?;

if self.jetstream {
Ok(NatsPublisher::JetStream(async_nats::jetstream::new(
Expand All @@ -171,7 +174,12 @@ impl NatsSinkConfig {
}

async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> {
config.connect().map_ok(|_| ()).map_err(|e| e.into()).await
let options: async_nats::ConnectOptions = (&config).try_into().context(ConfigSnafu)?;
config
.connect(options)
.map_ok(|_| ())
.map_err(|e| e.into())
.await
}

pub enum NatsPublisher {
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/nats/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;
use vector_lib::codecs::TextSerializerConfig;

use super::{config::NatsSinkConfig, sink::NatsSink, NatsError};
use super::{config::NatsSinkConfig, sink::NatsSink, ConfigSnafu, NatsError};
use crate::{
nats::{
NatsAuthConfig, NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword,
Expand All @@ -13,6 +13,7 @@ use crate::{
},
tls::TlsEnableableConfig,
};
use snafu::ResultExt;

async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), NatsError> {
// Publish `N` messages to NATS.
Expand All @@ -26,9 +27,10 @@ async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), NatsError> {

// Establish the consumer subscription.
let subject = conf.subject.clone();
let options: async_nats::ConnectOptions = (&conf).try_into().context(ConfigSnafu)?;
let consumer = conf
.clone()
.connect()
.connect(options)
.await
.expect("failed to connect with test consumer");
let mut sub = consumer
Expand Down
Loading