Skip to content

Commit 01e752e

Browse files
committed
Migrate outbound-redis to new core
Signed-off-by: Lann Martin <lann.martin@fermyon.com>
1 parent 2966bfe commit 01e752e

File tree

5 files changed

+55
-110
lines changed

5 files changed

+55
-110
lines changed

Cargo.lock

Lines changed: 2 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/outbound-redis/Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ doctest = false
88

99
[dependencies]
1010
anyhow = "1.0"
11-
owning_ref = "0.4.1"
12-
redis = { version = "0.21", features = [ "tokio-comp" ] }
13-
spin-engine = { path = "../engine" }
14-
spin-manifest = { path = "../manifest" }
15-
tracing = { version = "0.1", features = [ "log" ] }
11+
redis = { version = "0.21", features = ["tokio-comp"] }
12+
spin-core = { path = "../core" }
13+
tokio = { version = "1", features = ["sync"] }
14+
tracing = { version = "0.1", features = ["log"] }
1615

1716
[dependencies.wit-bindgen-wasmtime]
1817
git = "https://github.com/bytecodealliance/wit-bindgen"

crates/outbound-redis/src/lib.rs

Lines changed: 46 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,118 +1,84 @@
1-
use outbound_redis::*;
2-
use owning_ref::RwLockReadGuardRef;
3-
use redis::Commands;
4-
use std::{
5-
collections::HashMap,
6-
sync::{Arc, Mutex, RwLock},
7-
};
1+
use std::{collections::HashMap, sync::Arc};
82

9-
pub use outbound_redis::add_to_linker;
10-
use spin_engine::{
11-
host_component::{HostComponent, HostComponentsStateHandle},
12-
RuntimeContext,
13-
};
14-
use wit_bindgen_wasmtime::{async_trait, wasmtime::Linker};
3+
use anyhow::Result;
4+
use redis::{aio::Connection, AsyncCommands};
5+
use spin_core::{HostComponent, Linker};
6+
use tokio::sync::{Mutex, RwLock};
7+
use wit_bindgen_wasmtime::async_trait;
158

169
wit_bindgen_wasmtime::export!({paths: ["../../wit/ephemeral/outbound-redis.wit"], async: *});
10+
use outbound_redis::Error;
1711

18-
/// A simple implementation to support outbound Redis commands.
12+
#[derive(Clone, Default)]
1913
pub struct OutboundRedis {
20-
pub connections: Arc<RwLock<HashMap<String, Mutex<redis::Connection>>>>,
14+
connections: Arc<RwLock<HashMap<String, Arc<Mutex<Connection>>>>>,
2115
}
2216

2317
impl HostComponent for OutboundRedis {
24-
type State = Self;
18+
type Data = Self;
2519

2620
fn add_to_linker<T: Send>(
27-
linker: &mut Linker<RuntimeContext<T>>,
28-
state_handle: HostComponentsStateHandle<Self::State>,
21+
linker: &mut Linker<T>,
22+
get: impl Fn(&mut spin_core::Data<T>) -> &mut Self::Data + Send + Sync + Copy + 'static,
2923
) -> anyhow::Result<()> {
30-
add_to_linker(linker, move |ctx| state_handle.get_mut(ctx))
24+
crate::outbound_redis::add_to_linker(linker, get)
3125
}
3226

33-
fn build_state(&self, component: &spin_manifest::CoreComponent) -> anyhow::Result<Self::State> {
34-
let mut conn_map = HashMap::new();
35-
if let Some(address) = component.wasm.environment.get("REDIS_ADDRESS") {
36-
let client = redis::Client::open(address.to_string())?;
37-
let conn = client.get_connection()?;
38-
conn_map.insert(address.to_owned(), Mutex::new(conn));
39-
}
40-
Ok(Self {
41-
connections: Arc::new(RwLock::new(conn_map)),
42-
})
27+
fn build_data(&self) -> Self::Data {
28+
self.clone()
4329
}
4430
}
4531

46-
// TODO: use spawn_blocking or async client methods (redis::aio)
4732
#[async_trait]
4833
impl outbound_redis::OutboundRedis for OutboundRedis {
4934
async fn publish(&mut self, address: &str, channel: &str, payload: &[u8]) -> Result<(), Error> {
50-
let conn_map = self.get_reused_conn_map(address)?;
51-
let mut conn = conn_map
52-
.get(address)
53-
.unwrap()
54-
.lock()
55-
.map_err(|_| Error::Error)?;
56-
conn.publish(channel, payload).map_err(|_| Error::Error)?;
35+
let conn = self.get_conn(address).await.map_err(log_error)?;
36+
conn.lock()
37+
.await
38+
.publish(channel, payload)
39+
.await
40+
.map_err(log_error)?;
5741
Ok(())
5842
}
5943

6044
async fn get(&mut self, address: &str, key: &str) -> Result<Vec<u8>, Error> {
61-
let conn_map = self.get_reused_conn_map(address)?;
62-
let mut conn = conn_map
63-
.get(address)
64-
.unwrap()
65-
.lock()
66-
.map_err(|_| Error::Error)?;
67-
let value = conn.get(key).map_err(|_| Error::Error)?;
45+
let conn = self.get_conn(address).await.map_err(log_error)?;
46+
let value = conn.lock().await.get(key).await.map_err(log_error)?;
6847
Ok(value)
6948
}
7049

7150
async fn set(&mut self, address: &str, key: &str, value: &[u8]) -> Result<(), Error> {
72-
let conn_map = self.get_reused_conn_map(address)?;
73-
let mut conn = conn_map
74-
.get(address)
75-
.unwrap()
76-
.lock()
77-
.map_err(|_| Error::Error)?;
78-
conn.set(key, value).map_err(|_| Error::Error)?;
51+
let conn = self.get_conn(address).await.map_err(log_error)?;
52+
conn.lock().await.set(key, value).await.map_err(log_error)?;
7953
Ok(())
8054
}
8155

8256
async fn incr(&mut self, address: &str, key: &str) -> Result<i64, Error> {
83-
let conn_map = self.get_reused_conn_map(address)?;
84-
let mut conn = conn_map
85-
.get(address)
86-
.unwrap()
87-
.lock()
88-
.map_err(|_| Error::Error)?;
89-
let value = conn.incr(key, 1).map_err(|_| Error::Error)?;
57+
let conn = self.get_conn(address).await.map_err(log_error)?;
58+
let value = conn.lock().await.incr(key, 1).await.map_err(log_error)?;
9059
Ok(value)
9160
}
9261
}
9362

9463
impl OutboundRedis {
95-
fn get_reused_conn_map<'ret, 'me: 'ret, 'c>(
96-
&'me mut self,
97-
address: &'c str,
98-
) -> Result<RwLockReadGuardRef<'ret, HashMap<String, Mutex<redis::Connection>>>, Error> {
99-
let conn_map = self.connections.read().map_err(|_| Error::Error)?;
100-
if conn_map.get(address).is_some() {
101-
tracing::debug!("Reuse connection: {:?}", address);
102-
return Ok(RwLockReadGuardRef::new(conn_map));
103-
}
104-
// Get rid of our read lock
105-
drop(conn_map);
106-
107-
let mut conn_map = self.connections.write().map_err(|_| Error::Error)?;
108-
let client = redis::Client::open(address).map_err(|_| Error::Error)?;
109-
let conn = client.get_connection().map_err(|_| Error::Error)?;
110-
tracing::debug!("Build new connection: {:?}", address);
111-
conn_map.insert(address.to_string(), Mutex::new(conn));
112-
// Get rid of our write lock
113-
drop(conn_map);
114-
115-
let conn_map = self.connections.read().map_err(|_| Error::Error)?;
116-
Ok(RwLockReadGuardRef::new(conn_map))
64+
async fn get_conn(&self, address: &str) -> Result<Arc<Mutex<Connection>>> {
65+
let conn_map = self.connections.read().await;
66+
let conn = if let Some(conn) = conn_map.get(address) {
67+
conn.clone()
68+
} else {
69+
let conn = redis::Client::open(address)?.get_async_connection().await?;
70+
let conn = Arc::new(Mutex::new(conn));
71+
self.connections
72+
.write()
73+
.await
74+
.insert(address.to_string(), conn.clone());
75+
conn
76+
};
77+
Ok(conn)
11778
}
11879
}
80+
81+
fn log_error(err: impl std::fmt::Debug) -> Error {
82+
tracing::warn!("Outbound Redis error: {err:?}");
83+
Error::Error
84+
}

crates/trigger-new/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,7 @@ impl<Executor: TriggerExecutor> TriggerExecutorBuilder<Executor> {
9393
let mut builder = Engine::builder(&self.config)?;
9494

9595
if !self.disable_default_host_components {
96-
// FIXME(lann): migrate host components from prototype
97-
// builder.add_host_component(outbound_redis::OutboundRedis::default())?;
96+
builder.add_host_component(outbound_redis::OutboundRedis::default())?;
9897
builder.add_host_component(outbound_pg::OutboundPg)?;
9998
self.loader.add_dynamic_host_component(
10099
&mut builder,

crates/trigger/src/lib.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
use std::{
2-
collections::HashMap,
3-
error::Error,
4-
marker::PhantomData,
5-
path::PathBuf,
6-
sync::{Arc, RwLock},
7-
};
1+
use std::{error::Error, marker::PhantomData, path::PathBuf};
82

93
use anyhow::Result;
104
use async_trait::async_trait;
@@ -124,10 +118,7 @@ impl<Executor: TriggerExecutor> TriggerExecutorBuilder<Executor> {
124118

125119
/// Add the default set of host components to the given builder.
126120
pub fn add_default_host_components<T: Default + Send + 'static>(
127-
builder: &mut Builder<T>,
121+
_builder: &mut Builder<T>,
128122
) -> Result<()> {
129-
builder.add_host_component(outbound_redis::OutboundRedis {
130-
connections: Arc::new(RwLock::new(HashMap::new())),
131-
})?;
132123
Ok(())
133124
}

0 commit comments

Comments
 (0)