Skip to content

Commit 9bd9e9b

Browse files
feat(*): Update to newest control client
This bumps the control client version and removes some of the previous work that was in rc.1 Signed-off-by: Taylor Thomas <taylor@cosmonic.com>
1 parent 9ca8b96 commit 9bd9e9b

11 files changed

+82
-157
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "wadm"
33
description = "wasmCloud Application Deployment Manager: A tool for running Wasm applications in wasmCloud"
4-
version = "0.8.0-rc.1"
4+
version = "0.8.0-rc.2"
55
edition = "2021"
66
authors = ["wasmCloud Team"]
77
keywords = ["webassembly", "wasmcloud", "wadm"]
@@ -63,7 +63,7 @@ tracing-subscriber = { version = "0.3.7", features = [
6363
"json",
6464
], optional = true }
6565
uuid = "1"
66-
wasmcloud-control-interface = "0.30"
66+
wasmcloud-control-interface = "0.31"
6767

6868
[dev-dependencies]
6969
serial_test = "1"

bin/connections.rs

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,44 @@
11
//! A module for connection pools and generators. This is needed because control interface clients
22
//! (and possibly other things like nats connections in the future) are lattice scoped or need
33
//! different credentials
4-
use std::marker::PhantomData;
5-
6-
use wasmcloud_control_interface::{
7-
kv::{Build, KvStore},
8-
Client, ClientBuilder,
9-
};
4+
use wasmcloud_control_interface::{Client, ClientBuilder};
105

116
// Copied from https://github.com/wasmCloud/control-interface-client/blob/main/src/broker.rs#L1, not public
127
const DEFAULT_TOPIC_PREFIX: &str = "wasmbus.ctl";
138

14-
#[derive(Debug, Default, Clone)]
15-
pub struct ControlClientConfig {
16-
/// The jetstream domain to use for the clients
17-
pub js_domain: Option<String>,
18-
/// The topic prefix to use for operations
19-
pub topic_prefix: Option<String>,
20-
}
21-
229
/// A client constructor for wasmCloud control interface clients, identified by a lattice ID
2310
// NOTE: Yes, this sounds java-y. Deal with it.
2411
#[derive(Clone)]
25-
pub struct ControlClientConstructor<T> {
12+
pub struct ControlClientConstructor {
2613
client: async_nats::Client,
27-
config: ControlClientConfig,
28-
marker: PhantomData<T>,
14+
/// The topic prefix to use for operations
15+
topic_prefix: Option<String>,
2916
}
3017

31-
impl<T: KvStore + Build + Clone> ControlClientConstructor<T> {
32-
/// Creates a new client pool that is all backed using the same NATS client. The given NATS
33-
/// client should be using credentials that can access all desired lattices.
18+
impl ControlClientConstructor {
19+
/// Creates a new client pool that is all backed using the same NATS client and an optional
20+
/// topic prefix. The given NATS client should be using credentials that can access all desired
21+
/// lattices.
3422
pub fn new(
3523
client: async_nats::Client,
36-
config: ControlClientConfig,
37-
) -> ControlClientConstructor<T> {
24+
topic_prefix: Option<String>,
25+
) -> ControlClientConstructor {
3826
ControlClientConstructor {
3927
client,
40-
config,
41-
marker: PhantomData,
28+
topic_prefix,
4229
}
4330
}
4431

4532
/// Get the client for the given lattice ID
46-
pub async fn get_connection(
47-
&self,
48-
id: &str,
49-
multitenant_prefix: Option<&str>,
50-
) -> anyhow::Result<Client<T>> {
51-
let builder: ClientBuilder<T> =
52-
ClientBuilder::new_generic(self.client.clone()).lattice_prefix(id);
53-
let builder = if let Some(domain) = self.config.js_domain.as_deref() {
54-
builder.js_domain(domain)
55-
} else {
56-
builder
57-
};
33+
pub fn get_connection(&self, id: &str, multitenant_prefix: Option<&str>) -> Client {
34+
let builder = ClientBuilder::new(self.client.clone()).lattice_prefix(id);
5835

5936
let builder = builder.topic_prefix(topic_prefix(
6037
multitenant_prefix,
61-
self.config.topic_prefix.as_deref(),
38+
self.topic_prefix.as_deref(),
6239
));
6340

64-
builder
65-
.build()
66-
.await
67-
.map_err(|e| anyhow::anyhow!("Error building client for {id}: {e:?}"))
41+
builder.build()
6842
}
6943
}
7044

bin/main.rs

Lines changed: 37 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ mod logging;
2727
mod nats;
2828
mod observer;
2929

30-
use connections::{ControlClientConfig, ControlClientConstructor};
31-
use wasmcloud_control_interface::kv::{CachedKvStore, DirectKvStore};
30+
use connections::ControlClientConstructor;
3231

3332
const EVENT_STREAM_NAME: &str = "wadm_events";
3433
const COMMAND_STREAM_NAME: &str = "wadm_commands";
@@ -181,22 +180,7 @@ async fn main() -> anyhow::Result<()> {
181180
.await?;
182181

183182
// TODO: We will probably need to set up all the flags (like lattice prefix and topic prefix) down the line
184-
let connection_pool: ControlClientConstructor<CachedKvStore> = ControlClientConstructor::new(
185-
client.clone(),
186-
ControlClientConfig {
187-
js_domain: args.domain.clone(),
188-
topic_prefix: None,
189-
},
190-
);
191-
192-
let direct_connection_pool: ControlClientConstructor<DirectKvStore> =
193-
ControlClientConstructor::new(
194-
client.clone(),
195-
ControlClientConfig {
196-
js_domain: args.domain,
197-
topic_prefix: None,
198-
},
199-
);
183+
let connection_pool = ControlClientConstructor::new(client.clone(), None);
200184

201185
let trimmer: &[_] = &['.', '>', '*'];
202186

@@ -286,7 +270,7 @@ async fn main() -> anyhow::Result<()> {
286270
let event_worker_creator = EventWorkerCreator {
287271
state_store: state_storage.clone(),
288272
manifest_store: manifest_storage.clone(),
289-
pool: connection_pool,
273+
pool: connection_pool.clone(),
290274
command_topic_prefix: DEFAULT_COMMANDS_TOPIC.trim_matches(trimmer).to_owned(),
291275
publisher: context.clone(),
292276
notify_stream,
@@ -302,7 +286,7 @@ async fn main() -> anyhow::Result<()> {
302286
debug!("Creating command consumer manager");
303287

304288
let command_worker_creator = CommandWorkerCreator {
305-
pool: direct_connection_pool,
289+
pool: connection_pool,
306290
};
307291
let commands_manager: ConsumerManager<CommandConsumer> = ConsumerManager::new(
308292
permit_pool.clone(),
@@ -363,30 +347,29 @@ async fn main() -> anyhow::Result<()> {
363347

364348
#[derive(Clone)]
365349
struct CommandWorkerCreator {
366-
pool: ControlClientConstructor<DirectKvStore>,
350+
pool: ControlClientConstructor,
367351
}
368352

369353
#[async_trait::async_trait]
370354
impl WorkerCreator for CommandWorkerCreator {
371-
type Output = CommandWorker<DirectKvStore>;
355+
type Output = CommandWorker;
372356

373357
async fn create(
374358
&self,
375359
lattice_id: &str,
376360
multitenant_prefix: Option<&str>,
377361
) -> anyhow::Result<Self::Output> {
378-
self.pool
379-
.get_connection(lattice_id, multitenant_prefix)
380-
.await
381-
.map(CommandWorker::new)
362+
let client = self.pool.get_connection(lattice_id, multitenant_prefix);
363+
364+
Ok(CommandWorker::new(client))
382365
}
383366
}
384367

385368
#[derive(Clone)]
386369
struct EventWorkerCreator<StateStore> {
387370
state_store: StateStore,
388371
manifest_store: async_nats::jetstream::kv::Store,
389-
pool: ControlClientConstructor<CachedKvStore>,
372+
pool: ControlClientConstructor,
390373
command_topic_prefix: String,
391374
publisher: Context,
392375
notify_stream: Stream,
@@ -397,49 +380,38 @@ impl<StateStore> WorkerCreator for EventWorkerCreator<StateStore>
397380
where
398381
StateStore: wadm::storage::Store + Send + Sync + Clone + 'static,
399382
{
400-
type Output =
401-
EventWorker<StateStore, wasmcloud_control_interface::Client<CachedKvStore>, Context>;
383+
type Output = EventWorker<StateStore, wasmcloud_control_interface::Client, Context>;
402384

403385
async fn create(
404386
&self,
405387
lattice_id: &str,
406388
multitenant_prefix: Option<&str>,
407389
) -> anyhow::Result<Self::Output> {
408-
match self
409-
.pool
410-
.get_connection(lattice_id, multitenant_prefix)
411-
.await
412-
{
413-
Ok(client) => {
414-
let command_publisher = CommandPublisher::new(
415-
self.publisher.clone(),
416-
&format!("{}.{lattice_id}", self.command_topic_prefix),
417-
);
418-
let status_publisher = StatusPublisher::new(
419-
self.publisher.clone(),
420-
&format!("wadm.status.{lattice_id}"),
421-
);
422-
let manager = ScalerManager::new(
423-
self.publisher.clone(),
424-
self.notify_stream.clone(),
425-
lattice_id,
426-
multitenant_prefix,
427-
self.state_store.clone(),
428-
self.manifest_store.clone(),
429-
command_publisher.clone(),
430-
status_publisher.clone(),
431-
client.clone(),
432-
)
433-
.await?;
434-
Ok(EventWorker::new(
435-
self.state_store.clone(),
436-
client,
437-
command_publisher,
438-
status_publisher,
439-
manager,
440-
))
441-
}
442-
Err(e) => Err(e),
443-
}
390+
let client = self.pool.get_connection(lattice_id, multitenant_prefix);
391+
let command_publisher = CommandPublisher::new(
392+
self.publisher.clone(),
393+
&format!("{}.{lattice_id}", self.command_topic_prefix),
394+
);
395+
let status_publisher =
396+
StatusPublisher::new(self.publisher.clone(), &format!("wadm.status.{lattice_id}"));
397+
let manager = ScalerManager::new(
398+
self.publisher.clone(),
399+
self.notify_stream.clone(),
400+
lattice_id,
401+
multitenant_prefix,
402+
self.state_store.clone(),
403+
self.manifest_store.clone(),
404+
command_publisher.clone(),
405+
status_publisher.clone(),
406+
client.clone(),
407+
)
408+
.await?;
409+
Ok(EventWorker::new(
410+
self.state_store.clone(),
411+
client,
412+
command_publisher,
413+
status_publisher,
414+
manager,
415+
))
444416
}
445417
}

src/workers/command.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use tracing::{instrument, trace};
2-
use wasmcloud_control_interface::{kv::KvStore, CtlOperationAck};
2+
use wasmcloud_control_interface::CtlOperationAck;
33

44
use crate::{
55
commands::*,
@@ -14,19 +14,19 @@ use super::insert_managed_annotations;
1414

1515
/// A worker implementation for handling incoming commands
1616
#[derive(Clone)]
17-
pub struct CommandWorker<T: Clone> {
18-
client: wasmcloud_control_interface::Client<T>,
17+
pub struct CommandWorker {
18+
client: wasmcloud_control_interface::Client,
1919
}
2020

21-
impl<T: Clone> CommandWorker<T> {
21+
impl CommandWorker {
2222
/// Creates a new command worker with the given connection pool.
23-
pub fn new(ctl_client: wasmcloud_control_interface::Client<T>) -> CommandWorker<T> {
23+
pub fn new(ctl_client: wasmcloud_control_interface::Client) -> CommandWorker {
2424
CommandWorker { client: ctl_client }
2525
}
2626
}
2727

2828
#[async_trait::async_trait]
29-
impl<T: KvStore + Clone + Send + Sync> Worker for CommandWorker<T> {
29+
impl Worker for CommandWorker {
3030
type Message = Command;
3131

3232
#[instrument(level = "trace", skip_all)]

src/workers/event_helpers.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashMap};
22
use std::fmt::Debug;
33

44
use tracing::{instrument, warn};
5-
use wasmcloud_control_interface::{kv::KvStore, HostInventory, LinkDefinition};
5+
use wasmcloud_control_interface::{HostInventory, LinkDefinition};
66

77
use crate::{commands::Command, publisher::Publisher, server::StatusInfo, APP_SPEC_ANNOTATION};
88

@@ -43,7 +43,7 @@ pub trait LinkSource {
4343
}
4444

4545
#[async_trait::async_trait]
46-
impl<T: KvStore + Clone + Send + Sync> ClaimsSource for wasmcloud_control_interface::Client<T> {
46+
impl ClaimsSource for wasmcloud_control_interface::Client {
4747
async fn get_claims(&self) -> anyhow::Result<HashMap<String, Claims>> {
4848
Ok(self
4949
.get_claims()
@@ -72,7 +72,7 @@ impl<T: KvStore + Clone + Send + Sync> ClaimsSource for wasmcloud_control_interf
7272
}
7373

7474
#[async_trait::async_trait]
75-
impl<T: KvStore + Clone + Send + Sync> InventorySource for wasmcloud_control_interface::Client<T> {
75+
impl InventorySource for wasmcloud_control_interface::Client {
7676
async fn get_inventory(&self, host_id: &str) -> anyhow::Result<HostInventory> {
7777
Ok(self
7878
.get_host_inventory(host_id)
@@ -86,7 +86,7 @@ impl<T: KvStore + Clone + Send + Sync> InventorySource for wasmcloud_control_int
8686
// the KV store for updates. This would allow us to not have to fetch every time we need to get
8787
// links
8888
#[async_trait::async_trait]
89-
impl<T: KvStore + Clone + Send + Sync> LinkSource for wasmcloud_control_interface::Client<T> {
89+
impl LinkSource for wasmcloud_control_interface::Client {
9090
async fn get_links(&self) -> anyhow::Result<Vec<LinkDefinition>> {
9191
self.query_links()
9292
.await

test/docker-compose-e2e-multitenant.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ services:
3838

3939
# Have hosts in two different accounts
4040
wasmcloud_east:
41-
image: wasmcloud/wasmcloud:0.78.0
41+
image: wasmcloud/wasmcloud:0.79.0
4242
depends_on:
4343
- nats-leaf-a
4444
deploy:
@@ -51,7 +51,7 @@ services:
5151
WASMCLOUD_CLUSTER_SEED: SCAOGJWX53TGI4233T6GAXWYWBIB5ZDGPTCO6ODJQYELS52YCQCBQSRPA4
5252
HOST_region: us-brooks-east
5353
wasmcloud_west:
54-
image: wasmcloud/wasmcloud:0.78.0
54+
image: wasmcloud/wasmcloud:0.79.0
5555
depends_on:
5656
- nats-leaf-b
5757
deploy:

test/docker-compose-e2e-upgrade.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ services:
55
ports:
66
- 4222:4222
77
wasmcloud:
8-
image: wasmcloud/wasmcloud:0.78.0
8+
image: wasmcloud/wasmcloud:0.79.0
99
depends_on:
1010
- nats
1111
deploy:

0 commit comments

Comments
 (0)