Skip to content

Commit c21d0d0

Browse files
Merge pull request #130 from wasmCloud/bump/nats-nkeys
Bumped nats and nkeys to fix update bug
2 parents: 640515e + 059773d · commit c21d0d0

File tree

11 files changed

+732
-621
lines changed

11 files changed

+732
-621
lines changed

Cargo.lock

Lines changed: 646 additions & 557 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "wadm"
33
description = "wasmCloud Application Deployment Manager: A tool for running Wasm applications in wasmCloud"
4-
version = "0.4.0"
4+
version = "0.4.1"
55
edition = "2021"
66
authors = ["wasmCloud Team"]
77
keywords = ["webassembly", "wasmcloud", "wadm"]
@@ -24,7 +24,7 @@ _e2e_tests = []
2424

2525
[dependencies]
2626
anyhow = "1"
27-
async-nats = "0.29"
27+
async-nats = "0.30"
2828
async-trait = "0.1"
2929
atty = { version = "0.2", optional = true }
3030
bytes = "1"
@@ -34,7 +34,7 @@ cloudevents-sdk = "0.7"
3434
futures = "0.3"
3535
indexmap = { version = "1", features = ["serde-1"] }
3636
lazy_static = "1"
37-
nkeys = "0.2.0"
37+
nkeys = "0.3.0"
3838
# One version back to avoid clashes with 0.10 of otlp
3939
opentelemetry = { version = "0.17", features = ["rt-tokio"], optional = true }
4040
# 0.10 to avoid protoc dep
@@ -59,8 +59,8 @@ tracing-subscriber = { version = "0.3.7", features = [
5959
"json",
6060
], optional = true }
6161
uuid = "1"
62-
wasmbus-rpc = "0.13"
63-
wasmcloud-control-interface = "0.25"
62+
wasmbus-rpc = "0.14"
63+
wasmcloud-control-interface = "0.27"
6464
semver = { version = "1.0.16", features = ["serde"] }
6565

6666
[dev-dependencies]

src/consumers/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl Stream for CommandConsumer {
7878
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
7979
match self.stream.try_poll_next_unpin(cx) {
8080
Poll::Ready(None) => Poll::Ready(None),
81-
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
81+
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
8282
Poll::Ready(Some(Ok(msg))) => {
8383
// Convert to our event type, skipping if we can't do it (and looping around to
8484
// try the next poll)

src/consumers/events.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl Stream for EventConsumer {
7878
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
7979
match self.stream.try_poll_next_unpin(cx) {
8080
Poll::Ready(None) => Poll::Ready(None),
81-
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
81+
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
8282
Poll::Ready(Some(Ok(msg))) => {
8383
// Parse as a cloud event, skipping if we can't do it (and looping around to try
8484
// the next poll)

src/scaler/spreadscaler/link.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ impl<S: ReadStore + Send + Sync, L: LinkSource> LinkScaler<S, L> {
266266

267267
#[cfg(test)]
268268
mod test {
269-
use wasmbus_rpc::core::LinkDefinition;
269+
use wasmcloud_control_interface::LinkDefinition;
270270

271271
use super::*;
272272

src/scaler/spreadscaler/mod.rs

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorSpreadScaler<S> {
179179
model_name: self.config.model_name.to_owned(),
180180
annotations: spreadscaler_annotations(&spread.name, self.id()),
181181
})])
182-
}
182+
}
183183
// Stop actors to reach desired replicas
184184
Ordering::Greater => {
185185
let count_to_stop = current_count - count;
@@ -1064,7 +1064,7 @@ mod test {
10641064
let real_spread = SpreadScalerProperty {
10651065
// Makes it so we always get at least 2 commands
10661066
replicas: 9,
1067-
spread: Vec::new()
1067+
spread: Vec::new(),
10681068
};
10691069

10701070
let spreadscaler = ActorSpreadScaler::new(
@@ -1081,18 +1081,20 @@ mod test {
10811081
.store_many(
10821082
lattice_id,
10831083
[
1084-
(host_id.to_string(),
1085-
Host {
1086-
actors: HashMap::from_iter([(actor_id.to_string(), 10)]),
1087-
friendly_name: "hey".to_string(),
1088-
labels: HashMap::new(),
1089-
annotations: HashMap::new(),
1090-
providers: HashSet::new(),
1091-
uptime_seconds: 123,
1092-
version: None,
1093-
id: host_id.to_string(),
1094-
last_seen: Utc::now(),
1095-
}),
1084+
(
1085+
host_id.to_string(),
1086+
Host {
1087+
actors: HashMap::from_iter([(actor_id.to_string(), 10)]),
1088+
friendly_name: "hey".to_string(),
1089+
labels: HashMap::new(),
1090+
annotations: HashMap::new(),
1091+
providers: HashSet::new(),
1092+
uptime_seconds: 123,
1093+
version: None,
1094+
id: host_id.to_string(),
1095+
last_seen: Utc::now(),
1096+
},
1097+
),
10961098
(
10971099
host_id2.to_string(),
10981100
Host {
@@ -1105,10 +1107,9 @@ mod test {
11051107
version: None,
11061108
id: host_id2.to_string(),
11071109
last_seen: Utc::now(),
1108-
}
1109-
)
1110-
]
1111-
1110+
},
1111+
),
1112+
],
11121113
)
11131114
.await?;
11141115

@@ -1122,17 +1123,22 @@ mod test {
11221123
capabilities: vec![],
11231124
issuer: "AASDASDASDASD".to_string(),
11241125
call_alias: None,
1125-
instances: HashMap::from_iter([(
1126-
host_id.to_string(),
1127-
HashSet::from_iter((0..10).map(|n| WadmActorInstance {
1128-
instance_id: format!("{n}"),
1129-
annotations: spreadscaler_annotations("default", spreadscaler.id()),
1130-
})),
1131-
),
1132-
(host_id2.to_string(), HashSet::from_iter((0..10).map(|n| WadmActorInstance {
1133-
instance_id: format!("blah{n}"),
1134-
annotations: spreadscaler_annotations("default", spreadscaler.id()),
1135-
})))]),
1126+
instances: HashMap::from_iter([
1127+
(
1128+
host_id.to_string(),
1129+
HashSet::from_iter((0..10).map(|n| WadmActorInstance {
1130+
instance_id: format!("{n}"),
1131+
annotations: spreadscaler_annotations("default", spreadscaler.id()),
1132+
})),
1133+
),
1134+
(
1135+
host_id2.to_string(),
1136+
HashSet::from_iter((0..10).map(|n| WadmActorInstance {
1137+
instance_id: format!("blah{n}"),
1138+
annotations: spreadscaler_annotations("default", spreadscaler.id()),
1139+
})),
1140+
),
1141+
]),
11361142
reference: actor_reference.to_string(),
11371143
},
11381144
)
@@ -1142,20 +1148,26 @@ mod test {
11421148
// which one will have more stopped, but both should show up
11431149
let cmds = spreadscaler.reconcile().await?;
11441150
assert_eq!(cmds.len(), 2);
1145-
assert!(cmds.iter().any(|command| {
1146-
if let Command::StopActor(actor) = command {
1147-
actor.host_id == host_id
1148-
} else {
1149-
false
1150-
}
1151-
}), "Should have found both hosts for stopping commands");
1152-
assert!(cmds.iter().any(|command| {
1153-
if let Command::StopActor(actor) = command {
1154-
actor.host_id == host_id2
1155-
} else {
1156-
false
1157-
}
1158-
}), "Should have found both hosts for stopping commands");
1151+
assert!(
1152+
cmds.iter().any(|command| {
1153+
if let Command::StopActor(actor) = command {
1154+
actor.host_id == host_id
1155+
} else {
1156+
false
1157+
}
1158+
}),
1159+
"Should have found both hosts for stopping commands"
1160+
);
1161+
assert!(
1162+
cmds.iter().any(|command| {
1163+
if let Command::StopActor(actor) = command {
1164+
actor.host_id == host_id2
1165+
} else {
1166+
false
1167+
}
1168+
}),
1169+
"Should have found both hosts for stopping commands"
1170+
);
11591171

11601172
// Now check that cleanup removes everything
11611173
let cmds = spreadscaler.cleanup().await?;

src/server/storage.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,16 @@ impl ModelStorage {
7373
trace!(%key, "Storing manifest at key");
7474
let data = serde_json::to_vec(&model).map_err(anyhow::Error::from)?;
7575
if let Some(revision) = current_revision {
76-
self.store.update(&key, data.into(), revision).await
76+
self.store
77+
.update(&key, data.into(), revision)
78+
.await
79+
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
7780
} else {
78-
self.store.put(&key, data.into()).await
81+
self.store
82+
.put(&key, data.into())
83+
.await
84+
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
7985
}
80-
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
8186

8287
trace!("Adding model to set");
8388
self.retry_model_update(lattice_id, ModelNameOperation::Add(model.name()))

src/storage/nats_kv.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,23 +69,24 @@ impl NatsKvStore {
6969
let key = generate_key::<T>(lattice_id);
7070
tracing::Span::current().record("key", &key);
7171
debug!("Fetching data from store");
72-
match self.store.entry(key).await? {
73-
Some(entry) if !matches!(entry.operation, Operation::Delete | Operation::Purge) => {
72+
match self.store.entry(key).await {
73+
Ok(Some(entry)) if !matches!(entry.operation, Operation::Delete | Operation::Purge) => {
7474
trace!(len = %entry.value.len(), "Fetched bytes from store...deserializing");
7575
serde_json::from_slice::<'_, HashMap<String, T>>(&entry.value)
7676
.map(|d| (d, entry.revision))
7777
.map_err(NatsStoreError::from)
7878
}
7979
// If it was a delete entry, we still need to return the revision
80-
Some(entry) => {
80+
Ok(Some(entry)) => {
8181
trace!("Data was deleted, returning last revision");
8282
debug!("No data found for key, returning empty");
8383
Ok((HashMap::with_capacity(0), entry.revision))
8484
}
85-
None => {
85+
Ok(None) => {
8686
debug!("No data found for key, returning empty");
8787
Ok((HashMap::with_capacity(0), 0))
8888
}
89+
Err(e) => Err(NatsStoreError::Nats(e.into())),
8990
}
9091
}
9192
}
@@ -179,7 +180,7 @@ impl Store for NatsKvStore {
179180
.update(key, serialized.into(), revision)
180181
.await
181182
.map(|_| ())
182-
.map_err(NatsStoreError::from)
183+
.map_err(|e| NatsStoreError::Nats(e.into()))
183184
}
184185

185186
#[instrument(level = "debug", skip(self, data), fields(key = Empty))]
@@ -222,7 +223,7 @@ impl Store for NatsKvStore {
222223
.update(key, serialized.into(), revision)
223224
.await
224225
.map(|_| ())
225-
.map_err(NatsStoreError::from)
226+
.map_err(|e| NatsStoreError::Nats(e.into()))
226227
}
227228
}
228229

src/test_util.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ use std::{collections::HashMap, sync::Arc};
33

44
use serde::{de::DeserializeOwned, Serialize};
55
use tokio::sync::RwLock;
6-
use wasmbus_rpc::core::LinkDefinition;
7-
use wasmcloud_control_interface::HostInventory;
6+
use wasmcloud_control_interface::{HostInventory, LinkDefinition};
87

98
use crate::publisher::Publisher;
109
use crate::storage::StateKind;

src/workers/event_helpers.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use std::collections::HashMap;
22

33
use tracing::{instrument, warn};
4-
use wasmbus_rpc::core::LinkDefinition;
5-
use wasmcloud_control_interface::HostInventory;
4+
use wasmcloud_control_interface::{HostInventory, LinkDefinition};
65

76
use crate::{commands::Command, publisher::Publisher, APP_SPEC_ANNOTATION};
87

tests/event_consumer_integration.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ async fn test_event_stream() -> Result<()> {
116116
provider.public_key
117117
);
118118
} else {
119+
println!("EVT: {:?}", evt);
119120
panic!("Event wasn't an provider started event");
120121
}
121122
evt.ack().await.expect("Should be able to ack event");
@@ -307,6 +308,11 @@ async fn wait_for_event(
307308
Event::HostHeartbeat(_)
308309
| Event::ProviderHealthCheckPassed(_)
309310
| Event::ProviderHealthCheckFailed(_)
311+
// NOTE(brooksmtownsend): Ignoring the plural actor event for now as this test
312+
// is more for the event stream than scalers. When we use plural events to
313+
// synthesize lattice state, this should be changed to the singular event
314+
| Event::ActorsStarted(_)
315+
| Event::ActorsStopped(_)
310316
) {
311317
evt.ack().await.expect("Should be able to ack message");
312318
// Just a copy paste here so we don't have to deal with async recursion

0 commit comments

Comments (0)