Skip to content

Commit b61af2e

Browse files
authored
Merge pull request #53 from G8XSU/2025-03-12-rabbitmq-impl
Add RabbitMQ-based Implementation of EventPublisher Trait
2 parents e2b2101 + d6c3618 commit b61af2e

File tree

7 files changed

+321
-1
lines changed

7 files changed

+321
-1
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
name: RabbitMQ Integration Tests
2+
3+
on: [ push, pull_request ]
4+
5+
concurrency:
6+
group: ${{ github.workflow }}-${{ github.ref }}
7+
cancel-in-progress: true
8+
9+
jobs:
10+
integration-tests:
11+
runs-on: ubuntu-latest
12+
13+
services:
14+
rabbitmq:
15+
image: rabbitmq:3
16+
env:
17+
RABBITMQ_DEFAULT_USER: guest
18+
RABBITMQ_DEFAULT_PASS: guest
19+
ports:
20+
- 5672:5672
21+
options: >-
22+
--health-cmd "rabbitmqctl node_health_check"
23+
--health-interval 10s
24+
--health-timeout 5s
25+
--health-retries 5
26+
27+
steps:
28+
- name: Checkout code
29+
uses: actions/checkout@v4
30+
31+
- name: Run RabbitMQ integration tests
32+
run: cargo test --features integration-tests-events-rabbitmq --verbose --color=always -- --nocapture
33+
env:
34+
RUST_BACKTRACE: 1

ldk-server/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,16 @@ hex = { package = "hex-conservative", version = "0.2.1", default-features = fals
1818
rusqlite = { version = "0.31.0", features = ["bundled"] }
1919
rand = { version = "0.8.5", default-features = false }
2020
async-trait = { version = "0.1.85", default-features = false }
21+
22+
# Required for RabittMQ based EventPublisher. Only enabled for `events-rabbitmq` feature.
23+
lapin = { version = "2.4.0", features = ["rustls"], default-features = false, optional = true }
24+
25+
[features]
26+
default = []
27+
events-rabbitmq = ["dep:lapin"]
28+
29+
# Feature-flags related to integration tests.
30+
integration-tests-events-rabbitmq = ["events-rabbitmq"]
31+
32+
[dev-dependencies]
33+
futures-util = "0.3.31"

ldk-server/ldk-server.config

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,12 @@
1818
"bitcoind_rpc_user": "polaruser",
1919

2020
// Bitcoin Core's RPC password.
21-
"bitcoind_rpc_password": "polarpass"
21+
"bitcoind_rpc_user": "polarpass",
22+
23+
// RabbitMQ connection string. (only required if using RabbitMQ based events using `events-rabbitmq` feature)
24+
"rabbitmq_connection_string": "",
25+
26+
// RabbitMQ exchange name. (only required if using RabbitMQ based events using `events-rabbitmq` feature)
27+
"rabbitmq_exchange_name": ""
28+
2229
}

ldk-server/src/io/events/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
pub(crate) mod event_publisher;
22

3+
#[cfg(feature = "events-rabbitmq")]
4+
pub(crate) mod rabbitmq;
5+
36
use ldk_server_protos::events::event_envelope;
47

58
/// Event variant to event name mapping.
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
use crate::api::error::LdkServerError;
2+
use crate::api::error::LdkServerErrorCode::InternalServerError;
3+
use crate::io::events::event_publisher::EventPublisher;
4+
use ::prost::Message;
5+
use async_trait::async_trait;
6+
use lapin::options::{BasicPublishOptions, ConfirmSelectOptions, ExchangeDeclareOptions};
7+
use lapin::types::FieldTable;
8+
use lapin::{
9+
BasicProperties, Channel, Connection, ConnectionProperties, ConnectionState, ExchangeKind,
10+
};
11+
use ldk_server_protos::events::EventEnvelope;
12+
use std::sync::Arc;
13+
use tokio::sync::Mutex;
14+
15+
/// A RabbitMQ-based implementation of the EventPublisher trait.
16+
pub struct RabbitMqEventPublisher {
17+
/// The RabbitMQ connection, used for reconnection logic.
18+
connection: Arc<Mutex<Option<Connection>>>,
19+
/// The RabbitMQ channel used for publishing events.
20+
channel: Arc<Mutex<Option<Channel>>>,
21+
/// Configuration details, including connection string and exchange name.
22+
config: RabbitMqConfig,
23+
}
24+
25+
/// Configuration for the RabbitMQ event publisher.
26+
#[derive(Debug, Clone)]
27+
pub struct RabbitMqConfig {
28+
pub connection_string: String,
29+
pub exchange_name: String,
30+
}
31+
32+
/// Delivery mode for persistent messages (written to disk).
33+
const DELIVERY_MODE_PERSISTENT: u8 = 2;
34+
35+
impl RabbitMqEventPublisher {
36+
/// Creates a new RabbitMqEventPublisher instance.
37+
pub fn new(config: RabbitMqConfig) -> Self {
38+
Self { connection: Arc::new(Mutex::new(None)), channel: Arc::new(Mutex::new(None)), config }
39+
}
40+
41+
async fn connect(config: &RabbitMqConfig) -> Result<(Connection, Channel), LdkServerError> {
42+
let conn = Connection::connect(&config.connection_string, ConnectionProperties::default())
43+
.await
44+
.map_err(|e| {
45+
LdkServerError::new(
46+
InternalServerError,
47+
format!("Failed to connect to RabbitMQ: {}", e),
48+
)
49+
})?;
50+
51+
let channel = conn.create_channel().await.map_err(|e| {
52+
LdkServerError::new(InternalServerError, format!("Failed to create channel: {}", e))
53+
})?;
54+
55+
channel.confirm_select(ConfirmSelectOptions::default()).await.map_err(|e| {
56+
LdkServerError::new(InternalServerError, format!("Failed to enable confirms: {}", e))
57+
})?;
58+
59+
channel
60+
.exchange_declare(
61+
&config.exchange_name,
62+
ExchangeKind::Fanout,
63+
ExchangeDeclareOptions { durable: true, ..Default::default() },
64+
FieldTable::default(),
65+
)
66+
.await
67+
.map_err(|e| {
68+
LdkServerError::new(
69+
InternalServerError,
70+
format!("Failed to declare exchange: {}", e),
71+
)
72+
})?;
73+
74+
Ok((conn, channel))
75+
}
76+
77+
async fn ensure_connected(&self) -> Result<(), LdkServerError> {
78+
{
79+
let connection = self.connection.lock().await;
80+
if let Some(connection) = &*connection {
81+
if connection.status().state() == ConnectionState::Connected {
82+
return Ok(());
83+
}
84+
}
85+
}
86+
87+
// Connection is not alive, attempt reconnecting.
88+
let (connection, channel) = Self::connect(&self.config)
89+
.await
90+
.map_err(|e| LdkServerError::new(InternalServerError, e.to_string()))?;
91+
*self.connection.lock().await = Some(connection);
92+
*self.channel.lock().await = Some(channel);
93+
Ok(())
94+
}
95+
}
96+
97+
#[async_trait]
98+
impl EventPublisher for RabbitMqEventPublisher {
99+
/// Publishes an event to RabbitMQ.
100+
///
101+
/// The event is published to a fanout exchange with persistent delivery mode,
102+
/// and the method waits for confirmation from RabbitMQ to ensure durability.
103+
async fn publish(&self, event: EventEnvelope) -> Result<(), LdkServerError> {
104+
// Ensure connection is alive before proceeding
105+
self.ensure_connected().await?;
106+
107+
let channel_guard = self.channel.lock().await;
108+
let channel = channel_guard.as_ref().ok_or_else(|| {
109+
LdkServerError::new(InternalServerError, "Channel not initialized".to_string())
110+
})?;
111+
112+
// Publish the event with persistent delivery mode
113+
let confirm = channel
114+
.basic_publish(
115+
&self.config.exchange_name,
116+
"", // Empty routing key should be used for fanout exchange, since it is ignored.
117+
BasicPublishOptions::default(),
118+
&event.encode_to_vec(),
119+
BasicProperties::default().with_delivery_mode(DELIVERY_MODE_PERSISTENT),
120+
)
121+
.await
122+
.map_err(|e| {
123+
LdkServerError::new(
124+
InternalServerError,
125+
format!("Failed to publish event, error: {}", e),
126+
)
127+
})?;
128+
129+
let confirmation = confirm.await.map_err(|e| {
130+
LdkServerError::new(InternalServerError, format!("Failed to get confirmation: {}", e))
131+
})?;
132+
133+
match confirmation {
134+
lapin::publisher_confirm::Confirmation::Ack(_) => Ok(()),
135+
lapin::publisher_confirm::Confirmation::Nack(_) => Err(LdkServerError::new(
136+
InternalServerError,
137+
"Message not acknowledged".to_string(),
138+
)),
139+
_ => {
140+
Err(LdkServerError::new(InternalServerError, "Unexpected confirmation".to_string()))
141+
},
142+
}
143+
}
144+
}
145+
146+
#[cfg(test)]
147+
#[cfg(feature = "integration-tests-events-rabbitmq")]
148+
mod integration_tests_events_rabbitmq {
149+
use super::*;
150+
use lapin::{
151+
options::{BasicAckOptions, BasicConsumeOptions, QueueBindOptions, QueueDeclareOptions},
152+
types::FieldTable,
153+
Channel, Connection,
154+
};
155+
use ldk_server_protos::events::event_envelope::Event;
156+
use ldk_server_protos::events::PaymentForwarded;
157+
use std::io;
158+
use std::time::Duration;
159+
use tokio;
160+
161+
use futures_util::stream::StreamExt;
162+
#[tokio::test]
163+
async fn test_publish_and_consume_event() {
164+
let config = RabbitMqConfig {
165+
connection_string: "amqp://guest:guest@localhost:5672/%2f".to_string(),
166+
exchange_name: "test_exchange".to_string(),
167+
};
168+
169+
let publisher = RabbitMqEventPublisher::new(config.clone());
170+
171+
let conn = Connection::connect(&config.connection_string, ConnectionProperties::default())
172+
.await
173+
.expect("Failed make rabbitmq connection");
174+
let channel = conn.create_channel().await.expect("Failed to create rabbitmq channel");
175+
176+
let queue_name = "test_queue";
177+
setup_queue(&queue_name, &channel, &config).await;
178+
179+
let event =
180+
EventEnvelope { event: Some(Event::PaymentForwarded(PaymentForwarded::default())) };
181+
publisher.publish(event.clone()).await.expect("Failed to publish event");
182+
183+
consume_event(&queue_name, &channel, &event).await.expect("Failed to consume event");
184+
}
185+
186+
async fn setup_queue(queue_name: &str, channel: &Channel, config: &RabbitMqConfig) {
187+
channel
188+
.queue_declare(queue_name, QueueDeclareOptions::default(), FieldTable::default())
189+
.await
190+
.unwrap();
191+
channel
192+
.exchange_declare(
193+
&config.exchange_name,
194+
ExchangeKind::Fanout,
195+
ExchangeDeclareOptions { durable: true, ..Default::default() },
196+
FieldTable::default(),
197+
)
198+
.await
199+
.unwrap();
200+
201+
channel
202+
.queue_bind(
203+
queue_name,
204+
&config.exchange_name,
205+
"",
206+
QueueBindOptions::default(),
207+
FieldTable::default(),
208+
)
209+
.await
210+
.unwrap();
211+
}
212+
213+
async fn consume_event(
214+
queue_name: &str, channel: &Channel, expected_event: &EventEnvelope,
215+
) -> io::Result<()> {
216+
let mut consumer = channel
217+
.basic_consume(
218+
queue_name,
219+
"test_consumer",
220+
BasicConsumeOptions::default(),
221+
FieldTable::default(),
222+
)
223+
.await
224+
.unwrap();
225+
let delivery =
226+
tokio::time::timeout(Duration::from_secs(10), consumer.next()).await?.unwrap().unwrap();
227+
let received_event = EventEnvelope::decode(&*delivery.data)?;
228+
assert_eq!(received_event, *expected_event, "Event mismatch");
229+
channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).await.unwrap();
230+
Ok(())
231+
}
232+
}

ldk-server/src/main.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use hyper_util::rt::TokioIo;
1515

1616
use crate::io::events::event_publisher::{EventPublisher, NoopEventPublisher};
1717
use crate::io::events::get_event_name;
18+
#[cfg(feature = "events-rabbitmq")]
19+
use crate::io::events::rabbitmq::{RabbitMqConfig, RabbitMqEventPublisher};
1820
use crate::io::persist::paginated_kv_store::PaginatedKVStore;
1921
use crate::io::persist::sqlite_store::SqliteStore;
2022
use crate::io::persist::{
@@ -106,6 +108,15 @@ fn main() {
106108

107109
let event_publisher: Arc<dyn EventPublisher> = Arc::new(NoopEventPublisher);
108110

111+
#[cfg(feature = "events-rabbitmq")]
112+
let event_publisher: Arc<dyn EventPublisher> = {
113+
let rabbitmq_config = RabbitMqConfig {
114+
connection_string: config_file.rabbitmq_connection_string,
115+
exchange_name: config_file.rabbitmq_exchange_name,
116+
};
117+
Arc::new(RabbitMqEventPublisher::new(rabbitmq_config))
118+
};
119+
109120
println!("Starting up...");
110121
match node.start_with_runtime(Arc::clone(&runtime)) {
111122
Ok(()) => {},

ldk-server/src/util/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub struct Config {
1616
pub bitcoind_rpc_addr: SocketAddr,
1717
pub bitcoind_rpc_user: String,
1818
pub bitcoind_rpc_password: String,
19+
pub rabbitmq_connection_string: String,
20+
pub rabbitmq_exchange_name: String,
1921
}
2022

2123
impl TryFrom<JsonConfig> for Config {
@@ -45,6 +47,16 @@ impl TryFrom<JsonConfig> for Config {
4547
)
4648
})?;
4749

50+
#[cfg(feature = "events-rabbitmq")]
51+
if json_config.rabbitmq_connection_string.as_deref().map_or(true, |s| s.is_empty())
52+
|| json_config.rabbitmq_exchange_name.as_deref().map_or(true, |s| s.is_empty())
53+
{
54+
return Err(io::Error::new(
55+
io::ErrorKind::InvalidInput,
56+
"Both `rabbitmq_connection_string` and `rabbitmq_exchange_name` must be configured if enabling `events-rabbitmq` feature.".to_string(),
57+
));
58+
}
59+
4860
Ok(Config {
4961
listening_addr,
5062
network: json_config.network,
@@ -53,6 +65,8 @@ impl TryFrom<JsonConfig> for Config {
5365
bitcoind_rpc_addr,
5466
bitcoind_rpc_user: json_config.bitcoind_rpc_user,
5567
bitcoind_rpc_password: json_config.bitcoind_rpc_password,
68+
rabbitmq_connection_string: json_config.rabbitmq_connection_string.unwrap_or_default(),
69+
rabbitmq_exchange_name: json_config.rabbitmq_exchange_name.unwrap_or_default(),
5670
})
5771
}
5872
}
@@ -67,6 +81,8 @@ pub struct JsonConfig {
6781
bitcoind_rpc_address: String,
6882
bitcoind_rpc_user: String,
6983
bitcoind_rpc_password: String,
84+
rabbitmq_connection_string: Option<String>,
85+
rabbitmq_exchange_name: Option<String>,
7086
}
7187

7288
/// Loads the configuration from a JSON file at the given path.
@@ -114,6 +130,8 @@ mod tests {
114130
"bitcoind_rpc_address":"127.0.0.1:8332", // comment-1
115131
"bitcoind_rpc_user": "bitcoind-testuser",
116132
"bitcoind_rpc_password": "bitcoind-testpassword",
133+
"rabbitmq_connection_string": "rabbitmq_connection_string",
134+
"rabbitmq_exchange_name": "rabbitmq_exchange_name",
117135
"unknown_key": "random-value"
118136
// comment-2
119137
}"#;
@@ -130,6 +148,8 @@ mod tests {
130148
bitcoind_rpc_addr: SocketAddr::from_str("127.0.0.1:8332").unwrap(),
131149
bitcoind_rpc_user: "bitcoind-testuser".to_string(),
132150
bitcoind_rpc_password: "bitcoind-testpassword".to_string(),
151+
rabbitmq_connection_string: "rabbitmq_connection_string".to_string(),
152+
rabbitmq_exchange_name: "rabbitmq_exchange_name".to_string(),
133153
}
134154
)
135155
}

0 commit comments

Comments
 (0)