Skip to content

Commit 2dda860

Browse files
fix: ensure streams have updated subjects
Signed-off-by: Brooks Townsend <brooksmtownsend@gmail.com>
1 parent 9243793 commit 2dda860

File tree

1 file changed

+27
-11
lines changed

1 file changed

+27
-11
lines changed

bin/nats.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use async_nats::{
1111
Client, ConnectOptions,
1212
};
1313

14+
use tracing::warn;
1415
use wadm::DEFAULT_EXPIRY_TIME;
1516

1617
/// Creates a NATS client from the given options
@@ -120,18 +121,33 @@ pub async fn ensure_stream(
120121
subjects: Vec<String>,
121122
description: Option<String>,
122123
) -> Result<Stream> {
124+
let stream_config = StreamConfig {
125+
name: name.clone(),
126+
description,
127+
num_replicas: 1,
128+
retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
129+
subjects,
130+
max_age: DEFAULT_EXPIRY_TIME,
131+
storage: async_nats::jetstream::stream::StorageType::File,
132+
allow_rollup: false,
133+
..Default::default()
134+
};
135+
136+
if let Ok(stream) = context.get_stream(&name).await {
137+
// For now, we only check if the subjects are the same in order to make sure that
138+
// newer versions of wadm adjust subjects appropriately. In the case that developers
139+
// want to alter the storage or replicas of a stream, for example,
140+
// we don't want to override that configuration.
141+
if stream.cached_info().config.subjects == stream_config.subjects {
142+
return Ok(stream);
143+
} else {
144+
warn!("Found stream {name} with different configuration, deleting and recreating");
145+
context.delete_stream(name).await?;
146+
}
147+
}
148+
123149
context
124-
.get_or_create_stream(StreamConfig {
125-
name,
126-
description,
127-
num_replicas: 1,
128-
retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
129-
subjects,
130-
max_age: DEFAULT_EXPIRY_TIME,
131-
storage: async_nats::jetstream::stream::StorageType::File,
132-
allow_rollup: false,
133-
..Default::default()
134-
})
150+
.get_or_create_stream(stream_config)
135151
.await
136152
.map_err(|e| anyhow::anyhow!("{e:?}"))
137153
}

0 commit comments

Comments
 (0)