Skip to content

Commit 72fc61c

Browse files
Merge pull request #687 from FrankYang0529/reuse-redis-connection
feat(outbound-redis): reuse connecton
2 parents d0c3d8d + 873d085 commit 72fc61c

File tree

5 files changed

+94
-20
lines changed

5 files changed

+94
-20
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/outbound-redis/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ redis = { version = "0.21", features = [ "tokio-comp" ] }
1212
spin-engine = { path = "../engine" }
1313
spin-manifest = { path = "../manifest" }
1414
wit-bindgen-wasmtime = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "dde4694aaa6acf9370206527a798ac4ba6a8c5b8" }
15+
tracing = { version = "0.1", features = [ "log" ] }
16+
tracing-futures = "0.2"
17+
owning_ref = "0.4.1"

crates/outbound-redis/src/lib.rs

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
use outbound_redis::*;
2+
use owning_ref::RwLockReadGuardRef;
23
use redis::Commands;
4+
use std::{
5+
collections::HashMap,
6+
sync::{Arc, Mutex, RwLock},
7+
};
38

49
pub use outbound_redis::add_to_linker;
510
use spin_engine::{
@@ -11,8 +16,9 @@ use wit_bindgen_wasmtime::wasmtime::Linker;
1116
wit_bindgen_wasmtime::export!("../../wit/ephemeral/outbound-redis.wit");
1217

1318
/// A simple implementation to support outbound Redis commands.
14-
#[derive(Default, Clone)]
15-
pub struct OutboundRedis;
19+
pub struct OutboundRedis {
20+
pub connections: Arc<RwLock<HashMap<String, Mutex<redis::Connection>>>>,
21+
}
1622

1723
impl HostComponent for OutboundRedis {
1824
type State = Self;
@@ -24,42 +30,87 @@ impl HostComponent for OutboundRedis {
2430
add_to_linker(linker, move |ctx| state_handle.get_mut(ctx))
2531
}
2632

27-
fn build_state(
28-
&self,
29-
_component: &spin_manifest::CoreComponent,
30-
) -> anyhow::Result<Self::State> {
31-
Ok(Self)
33+
fn build_state(&self, component: &spin_manifest::CoreComponent) -> anyhow::Result<Self::State> {
34+
let mut conn_map = HashMap::new();
35+
if let Some(address) = component.wasm.environment.get("REDIS_ADDRESS") {
36+
let client = redis::Client::open(address.to_string())?;
37+
let conn = client.get_connection()?;
38+
conn_map.insert(address.to_owned(), Mutex::new(conn));
39+
}
40+
Ok(Self {
41+
connections: Arc::new(RwLock::new(conn_map)),
42+
})
3243
}
3344
}
3445

3546
impl outbound_redis::OutboundRedis for OutboundRedis {
3647
fn publish(&mut self, address: &str, channel: &str, payload: &[u8]) -> Result<(), Error> {
37-
let client = redis::Client::open(address).map_err(|_| Error::Error)?;
38-
let mut pubsub_conn = client.get_connection().map_err(|_| Error::Error)?;
39-
pubsub_conn
40-
.publish(channel, payload)
48+
let conn_map = self.get_reused_conn_map(address)?;
49+
let mut conn = conn_map
50+
.get(address)
51+
.unwrap()
52+
.lock()
4153
.map_err(|_| Error::Error)?;
54+
conn.publish(channel, payload).map_err(|_| Error::Error)?;
4255
Ok(())
4356
}
4457

4558
fn get(&mut self, address: &str, key: &str) -> Result<Vec<u8>, Error> {
46-
let client = redis::Client::open(address).map_err(|_| Error::Error)?;
47-
let mut conn = client.get_connection().map_err(|_| Error::Error)?;
59+
let conn_map = self.get_reused_conn_map(address)?;
60+
let mut conn = conn_map
61+
.get(address)
62+
.unwrap()
63+
.lock()
64+
.map_err(|_| Error::Error)?;
4865
let value = conn.get(key).map_err(|_| Error::Error)?;
4966
Ok(value)
5067
}
5168

5269
fn set(&mut self, address: &str, key: &str, value: &[u8]) -> Result<(), Error> {
53-
let client = redis::Client::open(address).map_err(|_| Error::Error)?;
54-
let mut conn = client.get_connection().map_err(|_| Error::Error)?;
70+
let conn_map = self.get_reused_conn_map(address)?;
71+
let mut conn = conn_map
72+
.get(address)
73+
.unwrap()
74+
.lock()
75+
.map_err(|_| Error::Error)?;
5576
conn.set(key, value).map_err(|_| Error::Error)?;
5677
Ok(())
5778
}
5879

5980
fn incr(&mut self, address: &str, key: &str) -> Result<i64, Error> {
60-
let client = redis::Client::open(address).map_err(|_| Error::Error)?;
61-
let mut conn = client.get_connection().map_err(|_| Error::Error)?;
81+
let conn_map = self.get_reused_conn_map(address)?;
82+
let mut conn = conn_map
83+
.get(address)
84+
.unwrap()
85+
.lock()
86+
.map_err(|_| Error::Error)?;
6287
let value = conn.incr(key, 1).map_err(|_| Error::Error)?;
6388
Ok(value)
6489
}
6590
}
91+
92+
impl OutboundRedis {
93+
fn get_reused_conn_map<'ret, 'me: 'ret, 'c>(
94+
&'me mut self,
95+
address: &'c str,
96+
) -> Result<RwLockReadGuardRef<'ret, HashMap<String, Mutex<redis::Connection>>>, Error> {
97+
let conn_map = self.connections.read().map_err(|_| Error::Error)?;
98+
if conn_map.get(address).is_some() {
99+
tracing::debug!("Reuse connection: {:?}", address);
100+
return Ok(RwLockReadGuardRef::new(conn_map));
101+
}
102+
// Get rid of our read lock
103+
drop(conn_map);
104+
105+
let mut conn_map = self.connections.write().map_err(|_| Error::Error)?;
106+
let client = redis::Client::open(address).map_err(|_| Error::Error)?;
107+
let conn = client.get_connection().map_err(|_| Error::Error)?;
108+
tracing::debug!("Build new connection: {:?}", address);
109+
conn_map.insert(address.to_string(), Mutex::new(conn));
110+
// Get rid of our write lock
111+
drop(conn_map);
112+
113+
let conn_map = self.connections.read().map_err(|_| Error::Error)?;
114+
Ok(RwLockReadGuardRef::new(conn_map))
115+
}
116+
}

crates/trigger/src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
use std::{error::Error, marker::PhantomData, path::PathBuf};
1+
use std::{
2+
collections::HashMap,
3+
error::Error,
4+
marker::PhantomData,
5+
path::PathBuf,
6+
sync::{Arc, RwLock},
7+
};
28

39
use anyhow::Result;
410
use async_trait::async_trait;
@@ -119,7 +125,9 @@ impl<Executor: TriggerExecutor> TriggerExecutorBuilder<Executor> {
119125
/// Add the default set of host components to the given builder.
120126
pub fn add_default_host_components<T: Default + 'static>(builder: &mut Builder<T>) -> Result<()> {
121127
builder.add_host_component(wasi_outbound_http::OutboundHttpComponent)?;
122-
builder.add_host_component(outbound_redis::OutboundRedis)?;
128+
builder.add_host_component(outbound_redis::OutboundRedis {
129+
connections: Arc::new(RwLock::new(HashMap::new())),
130+
})?;
123131
builder.add_host_component(outbound_pg::OutboundPg)?;
124132
Ok(())
125133
}

tests/integration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ mod integration_tests {
790790
async fn wait_tcp(url: &str, process: &mut Child, target: &str) -> Result<()> {
791791
let mut wait_count = 0;
792792
loop {
793-
if wait_count >= 180 {
793+
if wait_count >= 240 {
794794
panic!(
795795
"Ran out of retries waiting for {} to start on URL {}",
796796
target, url

0 commit comments

Comments
 (0)