Skip to content

Commit e370ac2

Browse files
feat(linkscaler): add interest in LinkdefSet
Signed-off-by: Brooks Townsend <brooks@cosmonic.com> address PR feedback, fix tests Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
1 parent 2256c8d commit e370ac2

File tree

16 files changed

+111
-55
lines changed

16 files changed

+111
-55
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
/target
22
test/e2e_log/
3+
4+
*.dump

Cargo.lock

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ uuid = "1"
5555
wasmbus-rpc = "0.14"
5656
wasmcloud-control-interface = "0.28.1"
5757
semver = { version = "1.0.16", features = ["serde"] }
58+
regex = "1.9.3"
5859
base64 = "0.21.2"
5960

6061
[dev-dependencies]

bin/main.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ use std::path::PathBuf;
22
use std::sync::Arc;
33
use std::time::Duration;
44

5-
use async_nats::jetstream::{
6-
stream::{RetentionPolicy, Stream},
7-
Context,
8-
};
5+
use async_nats::jetstream::{stream::Stream, Context};
96
use clap::Parser;
107
use tokio::sync::Semaphore;
118
use tracing::log::debug;
@@ -202,9 +199,6 @@ async fn main() -> anyhow::Result<()> {
202199
"A stream that stores all events coming in on the wasmbus.evt topics in a cluster"
203200
.to_string(),
204201
),
205-
RetentionPolicy::WorkQueue,
206-
None,
207-
None,
208202
)
209203
.await?;
210204

@@ -215,20 +209,13 @@ async fn main() -> anyhow::Result<()> {
215209
COMMAND_STREAM_NAME.to_owned(),
216210
vec![DEFAULT_COMMANDS_TOPIC.to_owned()],
217211
Some("A stream that stores all commands for wadm".to_string()),
218-
RetentionPolicy::WorkQueue,
219-
None,
220-
None,
221212
)
222213
.await?;
223214

224-
let status_stream = nats::ensure_stream(
215+
let status_stream = nats::ensure_status_stream(
225216
&context,
226217
STATUS_STREAM_NAME.to_owned(),
227218
vec![DEFAULT_STATUS_TOPIC.to_owned()],
228-
Some("A stream that stores all status updates for wadm applications".to_string()),
229-
RetentionPolicy::Limits,
230-
Some(std::time::Duration::from_nanos(0)),
231-
Some(10),
232219
)
233220
.await?;
234221

@@ -249,9 +236,6 @@ async fn main() -> anyhow::Result<()> {
249236
mirror_stream.to_owned(),
250237
event_stream_topics.clone(),
251238
Some("A stream that publishes all events to the same stream".to_string()),
252-
RetentionPolicy::WorkQueue,
253-
None,
254-
None,
255239
)
256240
.await?;
257241

bin/nats.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use std::path::PathBuf;
2-
use std::time::Duration;
32

43
use anyhow::{anyhow, Result};
54
use async_nats::{
65
jetstream::{
76
self,
87
kv::{Config as KvConfig, Store},
9-
stream::{Config as StreamConfig, RetentionPolicy, Stream},
8+
stream::{Config as StreamConfig, Stream},
109
Context,
1110
},
1211
Client, ConnectOptions,
@@ -111,19 +110,15 @@ pub async fn ensure_stream(
111110
name: String,
112111
subjects: Vec<String>,
113112
description: Option<String>,
114-
retention: RetentionPolicy,
115-
max_age: Option<Duration>,
116-
max_messages_per_subject: Option<i64>,
117113
) -> Result<Stream> {
118114
context
119115
.get_or_create_stream(StreamConfig {
120116
name,
121117
description,
122118
num_replicas: 1,
123-
retention,
119+
retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
124120
subjects,
125-
max_messages_per_subject: max_messages_per_subject.unwrap_or(0),
126-
max_age: max_age.unwrap_or(DEFAULT_EXPIRY_TIME),
121+
max_age: DEFAULT_EXPIRY_TIME,
127122
storage: async_nats::jetstream::stream::StorageType::File,
128123
allow_rollup: false,
129124
..Default::default()
@@ -132,6 +127,30 @@ pub async fn ensure_stream(
132127
.map_err(|e| anyhow::anyhow!("{e:?}"))
133128
}
134129

130+
pub async fn ensure_status_stream(
131+
context: &Context,
132+
name: String,
133+
subjects: Vec<String>,
134+
) -> Result<Stream> {
135+
context
136+
.get_or_create_stream(StreamConfig {
137+
name,
138+
description: Some(
139+
"A stream that stores all status updates for wadm applications".into(),
140+
),
141+
num_replicas: 1,
142+
allow_direct: true,
143+
retention: async_nats::jetstream::stream::RetentionPolicy::Limits,
144+
max_messages_per_subject: 10,
145+
subjects,
146+
max_age: std::time::Duration::from_nanos(0),
147+
storage: async_nats::jetstream::stream::StorageType::File,
148+
..Default::default()
149+
})
150+
.await
151+
.map_err(|e| anyhow::anyhow!("{e:?}"))
152+
}
153+
135154
/// A helper that ensures that the notify stream exists
136155
pub async fn ensure_notify_stream(
137156
context: &Context,

src/scaler/simplescaler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for SimpleActorScaler<S> {
8585
config,
8686
store: self.store.clone(),
8787
id: self.id.clone(),
88-
status: StatusInfo::compensating("Cleaning up"),
88+
status: StatusInfo::compensating(""),
8989
};
9090

9191
cleanerupper.compute_actor_commands(&self.store).await
@@ -112,7 +112,7 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
112112
model_name,
113113
},
114114
id,
115-
status: StatusInfo::compensating("Initializing simplescaler"),
115+
status: StatusInfo::compensating(""),
116116
}
117117
}
118118

src/scaler/spreadscaler/link.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use tracing::{instrument, trace};
77

88
use crate::{
99
commands::{Command, DeleteLinkdef, PutLinkdef},
10-
events::{Event, LinkdefDeleted, ProviderHealthCheckPassed, ProviderHealthCheckStatus},
10+
events::{
11+
Event, LinkdefDeleted, LinkdefSet, ProviderHealthCheckPassed, ProviderHealthCheckStatus,
12+
},
1113
model::TraitProperty,
1214
scaler::Scaler,
1315
server::StatusInfo,
@@ -94,6 +96,7 @@ where
9496
self.reconcile().await
9597
}
9698
Event::LinkdefDeleted(LinkdefDeleted { linkdef })
99+
| Event::LinkdefSet(LinkdefSet { linkdef })
97100
if linkdef.contract_id == self.config.provider_contract_id
98101
&& linkdef.actor_id == self.actor_id().await.unwrap_or_default()
99102
&& linkdef.provider_id == self.provider_id().await.unwrap_or_default()
@@ -152,7 +155,9 @@ where
152155
// };
153156

154157
let commands = if !exists {
155-
*self.status.write().await = StatusInfo::compensating("Creating link definition");
158+
*self.status.write().await = StatusInfo::compensating(&format!(
159+
"Putting link definition between {actor_id} and {provider_id}"
160+
));
156161
vec![Command::PutLinkdef(PutLinkdef {
157162
actor_id: actor_id.to_owned(),
158163
provider_id: provider_id.to_owned(),
@@ -227,7 +232,7 @@ impl<S: ReadStore + Send + Sync, L: LinkSource> LinkScaler<S, L> {
227232
},
228233
ctl_client,
229234
id,
230-
status: RwLock::new(StatusInfo::compensating("Initializing")),
235+
status: RwLock::new(StatusInfo::compensating("")),
231236
}
232237
}
233238

src/scaler/spreadscaler/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorSpreadScaler<S> {
257257
spread_requirements,
258258
actor_id: self.actor_id.clone(),
259259
id: self.id.clone(),
260-
status: RwLock::new(StatusInfo::compensating("Cleaning up")),
260+
status: RwLock::new(StatusInfo::compensating("")),
261261
};
262262

263263
cleanerupper.reconcile().await
@@ -286,7 +286,7 @@ impl<S: ReadStore + Send + Sync> ActorSpreadScaler<S> {
286286
model_name,
287287
},
288288
id,
289-
status: RwLock::new(StatusInfo::compensating("Initializing")),
289+
status: RwLock::new(StatusInfo::compensating("")),
290290
}
291291
}
292292

src/scaler/spreadscaler/provider.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderSpreadScaler<S> {
225225
.collect::<Vec<Command>>();
226226

227227
if commands.len() < num_to_start {
228+
// NOTE(brooksmtownsend): We're reporting status for the entire spreadscaler here, not just this individual
229+
// command, so we want to consider the providers that are already running for the spread over the
230+
// total expected count.
228231
let msg = format!("Could not satisfy spread {} for {}, {}/{} eligible hosts found.", spread.name, self.config.provider_reference, running_for_spread.len(), count);
229232
spread_status.push(StatusInfo::failed(&msg));
230233
}
@@ -268,7 +271,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderSpreadScaler<S> {
268271
spread_requirements,
269272
provider_id: self.provider_id.clone(),
270273
id: self.id.clone(),
271-
status: RwLock::new(StatusInfo::compensating("Cleaning up")),
274+
status: RwLock::new(StatusInfo::compensating("")),
272275
};
273276

274277
cleanerupper.reconcile().await
@@ -288,7 +291,7 @@ impl<S: ReadStore + Send + Sync> ProviderSpreadScaler<S> {
288291
provider_id: OnceCell::new(),
289292
config,
290293
id,
291-
status: RwLock::new(StatusInfo::compensating("Initializing")),
294+
status: RwLock::new(StatusInfo::compensating("")),
292295
}
293296
}
294297

src/server/handlers.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use async_nats::{jetstream::stream::Stream, Client, Message};
22
use base64::{engine::general_purpose::STANDARD as B64decoder, Engine};
3+
use regex::Regex;
34
use serde_json::json;
5+
use tokio::sync::OnceCell;
46
use tracing::{debug, error, instrument, log::warn, trace};
57

68
use crate::{
@@ -19,6 +21,8 @@ use super::{
1921
StatusResponse, StatusResult, UndeployModelRequest, VersionInfo, VersionResponse,
2022
};
2123

24+
static MANIFEST_NAME_REGEX: OnceCell<Regex> = OnceCell::const_new();
25+
2226
pub(crate) struct Handler<P> {
2327
pub(crate) store: ModelStorage,
2428
pub(crate) client: Client,
@@ -79,7 +83,12 @@ impl<P: Publisher> Handler<P> {
7983
}
8084

8185
let manifest_name = manifest.metadata.name.trim().to_string();
82-
if manifest_name.contains(' ') || manifest_name.contains('.') {
86+
if !MANIFEST_NAME_REGEX
87+
// SAFETY: We know this is valid Regex
88+
.get_or_init(|| async { Regex::new(r"^[-\w]+$").unwrap() })
89+
.await
90+
.is_match(&manifest_name)
91+
{
8392
self.send_error(
8493
msg.reply,
8594
format!(
@@ -784,6 +793,8 @@ impl<P: Publisher> Handler<P> {
784793
}
785794

786795
async fn get_manifest_status(&self, lattice_id: &str, name: &str) -> Option<StatusInfo> {
796+
// NOTE(brooksmtownsend): We're getting the last raw message instead of direct get here
797+
// to ensure we fetch the latest message from the cluster leader.
787798
match self
788799
.status_stream
789800
.get_last_raw_message_by_subject(&format!("wadm.status.{lattice_id}.{name}",))
@@ -799,3 +810,30 @@ impl<P: Publisher> Handler<P> {
799810
}
800811
}
801812
}
813+
814+
#[cfg(test)]
815+
mod test {
816+
#[tokio::test]
817+
async fn manifest_name_regex_works() {
818+
let regex = super::MANIFEST_NAME_REGEX
819+
.get_or_init(|| async { regex::Regex::new(r"^[-\w]+$").unwrap() })
820+
.await;
821+
822+
// Acceptable manifest names
823+
let word = "mymanifest";
824+
let word_with_dash = "my-manifest";
825+
let word_with_underscore = "my_manifest";
826+
let word_with_numbers = "mymanifest-v2-v3-final";
827+
828+
assert!(regex.is_match(word));
829+
assert!(regex.is_match(word_with_dash));
830+
assert!(regex.is_match(word_with_underscore));
831+
assert!(regex.is_match(word_with_numbers));
832+
833+
// Not acceptable manifest names
834+
let word_with_period = "my.manifest";
835+
let word_with_space = "my manifest";
836+
assert!(!regex.is_match(word_with_period));
837+
assert!(!regex.is_match(word_with_space));
838+
}
839+
}

src/workers/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ where
724724
.publish_status(data.manifest.metadata.name.as_ref(), status)
725725
.await
726726
{
727-
warn!(error = ?e, "Failed to set status to compensating");
727+
warn!("Failed to set manifest status: {e:}");
728728
};
729729

730730
// Now handle the result from reconciliation

src/workers/event_helpers.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,16 +100,16 @@ impl LinkSource for wasmcloud_control_interface::Client {
100100
pub struct StatusPublisher<Pub> {
101101
publisher: Pub,
102102
// Topic prefix, e.g. wadm.status.default
103-
topic: String,
103+
topic_prefix: String,
104104
}
105105

106106
impl<Pub> StatusPublisher<Pub> {
107107
/// Creates an new status publisher configured with the given publisher that will send to the
108-
/// specified topic
109-
pub fn new(publisher: Pub, topic: &str) -> StatusPublisher<Pub> {
108+
/// manifest status topic using the given prefix
109+
pub fn new(publisher: Pub, topic_prefix: &str) -> StatusPublisher<Pub> {
110110
StatusPublisher {
111111
publisher,
112-
topic: topic.to_owned(),
112+
topic_prefix: topic_prefix.to_owned(),
113113
}
114114
}
115115
}
@@ -120,7 +120,7 @@ impl<Pub: Publisher> StatusPublisher<Pub> {
120120
self.publisher
121121
.publish(
122122
serde_json::to_vec(&status)?,
123-
Some(&format!("{}.{name}", self.topic)),
123+
Some(&format!("{}.{name}", self.topic_prefix)),
124124
)
125125
.await
126126
}

tests/e2e.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ pub fn check_providers(
509509
Ok(())
510510
}
511511

512-
pub(crate) async fn get_manifest_status(
512+
pub async fn get_manifest_status(
513513
stream: &Stream,
514514
lattice_id: &str,
515515
name: &str,

0 commit comments

Comments
 (0)