Skip to content

Commit 7feca16

Browse files
committed
Add RabbitMQ based EventPublisher Impl.
RabbitMQ was selected because it is one of the most robust and battle-tested open-source messaging systems, allowing for durable, buffered messages and flexible exchange options. This ensures that events are not lost, even in the case of restarts, network failures, or consumer failures, while keeping consumers decoupled. * Feature-based: Only enabled if we enable `events-rabbitmq` feature, this allows us to support multiple implementations and no strict coupling with a single impl while keep our dependencies to the minimum. * Durable Publishing: Uses publisher confirms and persistent delivery mode to guarantee events are stored by RabbitMQ before returning Ok(()). * Lazy Initialized connection: Wraps Connection and Channel in Option for lazy connection init via ensure_connected. * Reconnection Handling: Implements a single reconnect attempt per publish call. Event publishing is already retried, so this keeps it simple for now.
1 parent e2b2101 commit 7feca16

File tree

3 files changed

+154
-0
lines changed

3 files changed

+154
-0
lines changed

ldk-server/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,10 @@ hex = { package = "hex-conservative", version = "0.2.1", default-features = fals
1818
rusqlite = { version = "0.31.0", features = ["bundled"] }
1919
rand = { version = "0.8.5", default-features = false }
2020
async-trait = { version = "0.1.85", default-features = false }
21+
22+
# Required for RabittMQ based EventPublisher. Only enabled for `events-rabbitmq` feature.
23+
lapin = { version = "2.4.0", features = ["rustls"], default-features = false, optional = true }
24+
25+
[features]
26+
default = []
27+
events-rabbitmq = ["dep:lapin"]

ldk-server/src/io/events/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
pub(crate) mod event_publisher;
22

3+
#[cfg(feature = "events-rabbitmq")]
4+
pub(crate) mod rabbitmq;
5+
36
use ldk_server_protos::events::event_envelope;
47

58
/// Event variant to event name mapping.
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use crate::api::error::LdkServerError;
2+
use crate::api::error::LdkServerErrorCode::InternalServerError;
3+
use crate::io::events::event_publisher::EventPublisher;
4+
use ::prost::Message;
5+
use async_trait::async_trait;
6+
use lapin::options::{BasicPublishOptions, ConfirmSelectOptions, ExchangeDeclareOptions};
7+
use lapin::types::FieldTable;
8+
use lapin::{
9+
BasicProperties, Channel, Connection, ConnectionProperties, ConnectionState, ExchangeKind,
10+
};
11+
use ldk_server_protos::events::EventEnvelope;
12+
use std::sync::Arc;
13+
use tokio::sync::Mutex;
14+
15+
/// A RabbitMQ-based implementation of the EventPublisher trait.
16+
pub struct RabbitMqEventPublisher {
17+
/// The RabbitMQ connection, used for reconnection logic.
18+
connection: Arc<Mutex<Option<Connection>>>,
19+
/// The RabbitMQ channel used for publishing events.
20+
channel: Arc<Mutex<Option<Channel>>>,
21+
/// Configuration details, including connection string and exchange name.
22+
config: RabbitMqConfig,
23+
}
24+
25+
/// Configuration for the RabbitMQ event publisher.
26+
#[derive(Debug, Clone)]
27+
pub struct RabbitMqConfig {
28+
pub connection_string: String,
29+
pub exchange_name: String,
30+
}
31+
32+
/// Delivery mode for persistent messages (written to disk).
33+
const DELIVERY_MODE_PERSISTENT: u8 = 2;
34+
35+
impl RabbitMqEventPublisher {
36+
/// Creates a new RabbitMqEventPublisher instance.
37+
pub fn new(config: RabbitMqConfig) -> Self {
38+
Self { connection: Arc::new(Mutex::new(None)), channel: Arc::new(Mutex::new(None)), config }
39+
}
40+
41+
async fn connect(config: &RabbitMqConfig) -> Result<(Connection, Channel), LdkServerError> {
42+
let conn = Connection::connect(&config.connection_string, ConnectionProperties::default())
43+
.await
44+
.map_err(|e| {
45+
LdkServerError::new(
46+
InternalServerError,
47+
format!("Failed to connect to RabbitMQ: {}", e),
48+
)
49+
})?;
50+
51+
let channel = conn.create_channel().await.map_err(|e| {
52+
LdkServerError::new(InternalServerError, format!("Failed to create channel: {}", e))
53+
})?;
54+
55+
channel.confirm_select(ConfirmSelectOptions::default()).await.map_err(|e| {
56+
LdkServerError::new(InternalServerError, format!("Failed to enable confirms: {}", e))
57+
})?;
58+
59+
channel
60+
.exchange_declare(
61+
&config.exchange_name,
62+
ExchangeKind::Fanout,
63+
ExchangeDeclareOptions { durable: true, ..Default::default() },
64+
FieldTable::default(),
65+
)
66+
.await
67+
.map_err(|e| {
68+
LdkServerError::new(
69+
InternalServerError,
70+
format!("Failed to declare exchange: {}", e),
71+
)
72+
})?;
73+
74+
Ok((conn, channel))
75+
}
76+
77+
async fn ensure_connected(&self) -> Result<(), LdkServerError> {
78+
{
79+
let connection = self.connection.lock().await;
80+
if let Some(connection) = &*connection {
81+
if connection.status().state() == ConnectionState::Connected {
82+
return Ok(());
83+
}
84+
}
85+
}
86+
87+
// Connection is not alive, attempt reconnecting.
88+
let (connection, channel) = Self::connect(&self.config)
89+
.await
90+
.map_err(|e| LdkServerError::new(InternalServerError, e.to_string()))?;
91+
*self.connection.lock().await = Some(connection);
92+
*self.channel.lock().await = Some(channel);
93+
Ok(())
94+
}
95+
}
96+
97+
#[async_trait]
98+
impl EventPublisher for RabbitMqEventPublisher {
99+
/// Publishes an event to RabbitMQ.
100+
///
101+
/// The event is published to a fanout exchange with persistent delivery mode,
102+
/// and the method waits for confirmation from RabbitMQ to ensure durability.
103+
async fn publish(&self, event: EventEnvelope) -> Result<(), LdkServerError> {
104+
// Ensure connection is alive before proceeding
105+
self.ensure_connected().await?;
106+
107+
let channel_guard = self.channel.lock().await;
108+
let channel = channel_guard.as_ref().ok_or_else(|| {
109+
LdkServerError::new(InternalServerError, "Channel not initialized".to_string())
110+
})?;
111+
112+
// Publish the event with persistent delivery mode
113+
let confirm = channel
114+
.basic_publish(
115+
&self.config.exchange_name,
116+
"", // Empty routing key should be used for fanout exchange, since it is ignored.
117+
BasicPublishOptions::default(),
118+
&event.encode_to_vec(),
119+
BasicProperties::default().with_delivery_mode(DELIVERY_MODE_PERSISTENT),
120+
)
121+
.await
122+
.map_err(|e| {
123+
LdkServerError::new(
124+
InternalServerError,
125+
format!("Failed to publish event, error: {}", e),
126+
)
127+
})?;
128+
129+
let confirmation = confirm.await.map_err(|e| {
130+
LdkServerError::new(InternalServerError, format!("Failed to get confirmation: {}", e))
131+
})?;
132+
133+
match confirmation {
134+
lapin::publisher_confirm::Confirmation::Ack(_) => Ok(()),
135+
lapin::publisher_confirm::Confirmation::Nack(_) => Err(LdkServerError::new(
136+
InternalServerError,
137+
"Message not acknowledged".to_string(),
138+
)),
139+
_ => {
140+
Err(LdkServerError::new(InternalServerError, "Unexpected confirmation".to_string()))
141+
},
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)