Skip to content

Commit e2b2101

Browse files
authored
Merge pull request #57 from G8XSU/2025-03-14-more-events
Start publishing more payment events using EventPublisher.
2 parents a30379b + e778bbd commit e2b2101

File tree

7 files changed

+144
-37
lines changed

7 files changed

+144
-37
lines changed

ldk-server-protos/src/events.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#[allow(clippy::derive_partial_eq_without_eq)]
33
#[derive(Clone, PartialEq, ::prost::Message)]
44
pub struct EventEnvelope {
5-
#[prost(oneof = "event_envelope::Event", tags = "2")]
5+
#[prost(oneof = "event_envelope::Event", tags = "2, 3, 4, 6")]
66
pub event: ::core::option::Option<event_envelope::Event>,
77
}
88
/// Nested message and enum types in `EventEnvelope`.
@@ -11,9 +11,39 @@ pub mod event_envelope {
1111
#[derive(Clone, PartialEq, ::prost::Oneof)]
1212
pub enum Event {
1313
#[prost(message, tag = "2")]
14+
PaymentReceived(super::PaymentReceived),
15+
#[prost(message, tag = "3")]
16+
PaymentSuccessful(super::PaymentSuccessful),
17+
#[prost(message, tag = "4")]
18+
PaymentFailed(super::PaymentFailed),
19+
#[prost(message, tag = "6")]
1420
PaymentForwarded(super::PaymentForwarded),
1521
}
1622
}
23+
/// PaymentReceived indicates a payment has been received.
24+
#[allow(clippy::derive_partial_eq_without_eq)]
25+
#[derive(Clone, PartialEq, ::prost::Message)]
26+
pub struct PaymentReceived {
27+
/// The payment details for the payment in event.
28+
#[prost(message, optional, tag = "1")]
29+
pub payment: ::core::option::Option<super::types::Payment>,
30+
}
31+
/// PaymentSuccessful indicates a sent payment was successful.
32+
#[allow(clippy::derive_partial_eq_without_eq)]
33+
#[derive(Clone, PartialEq, ::prost::Message)]
34+
pub struct PaymentSuccessful {
35+
/// The payment details for the payment in event.
36+
#[prost(message, optional, tag = "1")]
37+
pub payment: ::core::option::Option<super::types::Payment>,
38+
}
39+
/// PaymentFailed indicates a sent payment has failed.
40+
#[allow(clippy::derive_partial_eq_without_eq)]
41+
#[derive(Clone, PartialEq, ::prost::Message)]
42+
pub struct PaymentFailed {
43+
/// The payment details for the payment in event.
44+
#[prost(message, optional, tag = "1")]
45+
pub payment: ::core::option::Option<super::types::Payment>,
46+
}
1747
/// PaymentForwarded indicates a payment was forwarded through the node.
1848
#[allow(clippy::derive_partial_eq_without_eq)]
1949
#[derive(Clone, PartialEq, ::prost::Message)]

ldk-server-protos/src/proto/events.proto

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,31 @@ package events;
55
// EventEnvelope wraps different event types in a single message to be used by EventPublisher.
66
message EventEnvelope {
77
oneof event {
8-
PaymentForwarded payment_forwarded = 2;
8+
PaymentReceived payment_received = 2;
9+
PaymentSuccessful payment_successful = 3;
10+
PaymentFailed payment_failed = 4;
11+
PaymentForwarded payment_forwarded = 6;
912
}
1013
}
1114

15+
// PaymentReceived indicates a payment has been received.
16+
message PaymentReceived {
17+
// The payment details for the payment in event.
18+
types.Payment payment = 1;
19+
}
20+
21+
// PaymentSuccessful indicates a sent payment was successful.
22+
message PaymentSuccessful {
23+
// The payment details for the payment in event.
24+
types.Payment payment = 1;
25+
}
26+
27+
// PaymentFailed indicates a sent payment has failed.
28+
message PaymentFailed {
29+
// The payment details for the payment in event.
30+
types.Payment payment = 1;
31+
}
32+
1233
// PaymentForwarded indicates a payment was forwarded through the node.
1334
message PaymentForwarded {
1435
types.ForwardedPayment forwarded_payment = 1;

ldk-server/src/io/events/event_publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use ldk_server_protos::events::EventEnvelope;
1818
/// The underlying messaging system is expected to support durably buffered events,
1919
/// enabling easy decoupling between the LDK Server and event consumers.
2020
#[async_trait]
21-
pub trait EventPublisher {
21+
pub trait EventPublisher: Send + Sync {
2222
/// Publishes an event to the underlying messaging system.
2323
///
2424
/// # Arguments

ldk-server/src/io/events/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,13 @@
11
pub(crate) mod event_publisher;
2+
3+
use ldk_server_protos::events::event_envelope;
4+
5+
/// Event variant to event name mapping.
6+
pub(crate) fn get_event_name(event: &event_envelope::Event) -> &'static str {
7+
match event {
8+
event_envelope::Event::PaymentReceived(_) => "PaymentReceived",
9+
event_envelope::Event::PaymentSuccessful(_) => "PaymentSuccessful",
10+
event_envelope::Event::PaymentFailed(_) => "PaymentFailed",
11+
event_envelope::Event::PaymentForwarded(_) => "PaymentForwarded",
12+
}
13+
}

ldk-server/src/io/persist/paginated_kv_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::io;
2626
/// [`KVStore`]: ldk_node::lightning::util::persist::KVStore
2727
/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`]: ldk_node::lightning::util::persist::KVSTORE_NAMESPACE_KEY_ALPHABET
2828
/// [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]: ldk_node::lightning::util::persist::KVSTORE_NAMESPACE_KEY_MAX_LEN
29-
pub trait PaginatedKVStore {
29+
pub trait PaginatedKVStore: Send + Sync {
3030
/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and `key`.
3131
///
3232
/// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given

ldk-server/src/main.rs

Lines changed: 74 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use hyper::server::conn::http1;
1414
use hyper_util::rt::TokioIo;
1515

1616
use crate::io::events::event_publisher::{EventPublisher, NoopEventPublisher};
17+
use crate::io::events::get_event_name;
1718
use crate::io::persist::paginated_kv_store::PaginatedKVStore;
1819
use crate::io::persist::sqlite_store::SqliteStore;
1920
use crate::io::persist::{
@@ -29,12 +30,14 @@ use ldk_node::lightning::ln::channelmanager::PaymentId;
2930
use ldk_node::logger::LogLevel;
3031
use ldk_server_protos::events;
3132
use ldk_server_protos::events::{event_envelope, EventEnvelope};
33+
use ldk_server_protos::types::Payment;
3234
use prost::Message;
3335
use rand::Rng;
3436
use std::fs;
3537
use std::path::{Path, PathBuf};
3638
use std::sync::Arc;
3739
use std::time::{SystemTime, UNIX_EPOCH};
40+
use tokio::select;
3841

3942
const USAGE_GUIDE: &str = "Usage: ldk-server <config_path>";
4043

@@ -92,7 +95,7 @@ fn main() {
9295
},
9396
};
9497

95-
let paginated_store =
98+
let paginated_store: Arc<dyn PaginatedKVStore> =
9699
Arc::new(match SqliteStore::new(PathBuf::from(config_file.storage_dir_path), None, None) {
97100
Ok(store) => store,
98101
Err(e) => {
@@ -124,14 +127,14 @@ fn main() {
124127
Err(e) => {
125128
println!("Failed to register for SIGTERM stream: {}", e);
126129
std::process::exit(-1);
127-
},
130+
}
128131
};
129132
let event_node = Arc::clone(&node);
130133
let rest_svc_listener = TcpListener::bind(config_file.rest_service_addr)
131134
.await
132135
.expect("Failed to bind listening port");
133136
loop {
134-
tokio::select! {
137+
select! {
135138
event = event_node.next_event_async() => {
136139
match event {
137140
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
@@ -154,18 +157,44 @@ fn main() {
154157
payment_id, payment_hash, amount_msat
155158
);
156159
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
157-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
160+
161+
publish_event_and_upsert_payment(&payment_id,
162+
|payment_ref| event_envelope::Event::PaymentReceived(events::PaymentReceived {
163+
payment: Some(payment_ref.clone()),
164+
}),
165+
&event_node,
166+
Arc::clone(&event_publisher),
167+
Arc::clone(&paginated_store)).await;
158168
},
159169
Event::PaymentSuccessful {payment_id, ..} => {
160170
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
161-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
171+
172+
publish_event_and_upsert_payment(&payment_id,
173+
|payment_ref| event_envelope::Event::PaymentSuccessful(events::PaymentSuccessful {
174+
payment: Some(payment_ref.clone()),
175+
}),
176+
&event_node,
177+
Arc::clone(&event_publisher),
178+
Arc::clone(&paginated_store)).await;
162179
},
163180
Event::PaymentFailed {payment_id, ..} => {
164181
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
165-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
182+
183+
publish_event_and_upsert_payment(&payment_id,
184+
|payment_ref| event_envelope::Event::PaymentFailed(events::PaymentFailed {
185+
payment: Some(payment_ref.clone()),
186+
}),
187+
&event_node,
188+
Arc::clone(&event_publisher),
189+
Arc::clone(&paginated_store)).await;
166190
},
167191
Event::PaymentClaimable {payment_id, ..} => {
168-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
192+
if let Some(payment_details) = event_node.payment(&payment_id) {
193+
let payment = payment_to_proto(payment_details);
194+
upsert_payment_details(&event_node, Arc::clone(&paginated_store), &payment);
195+
} else {
196+
eprintln!("Unable to find payment with paymentId: {}", payment_id.to_string());
197+
}
169198
},
170199
Event::PaymentForwarded {
171200
prev_channel_id,
@@ -234,13 +263,12 @@ fn main() {
234263
event_node.event_handled();
235264
},
236265
}
237-
238266
},
239267
res = rest_svc_listener.accept() => {
240268
match res {
241269
Ok((stream, _)) => {
242270
let io_stream = TokioIo::new(stream);
243-
let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore + Send + Sync>);
271+
let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store));
244272
runtime.spawn(async move {
245273
if let Err(err) = http1::Builder::new().serve_connection(io_stream, node_service).await {
246274
eprintln!("Failed to serve connection: {}", err);
@@ -266,30 +294,48 @@ fn main() {
266294
println!("Shutdown complete..");
267295
}
268296

269-
fn upsert_payment_details(
270-
event_node: &Node, paginated_store: Arc<dyn PaginatedKVStore>, payment_id: &PaymentId,
297+
async fn publish_event_and_upsert_payment(
298+
payment_id: &PaymentId, payment_to_event: fn(&Payment) -> event_envelope::Event,
299+
event_node: &Node, event_publisher: Arc<dyn EventPublisher>,
300+
paginated_store: Arc<dyn PaginatedKVStore>,
271301
) {
272302
if let Some(payment_details) = event_node.payment(payment_id) {
273303
let payment = payment_to_proto(payment_details);
274-
let time =
275-
SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs()
276-
as i64;
277-
278-
match paginated_store.write(
279-
PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
280-
PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
281-
&payment_id.0.to_lower_hex_string(),
282-
time,
283-
&payment.encode_to_vec(),
284-
) {
285-
Ok(_) => {
286-
event_node.event_handled();
287-
},
304+
305+
let event = payment_to_event(&payment);
306+
let event_name = get_event_name(&event);
307+
match event_publisher.publish(EventEnvelope { event: Some(event) }).await {
308+
Ok(_) => {},
288309
Err(e) => {
289-
eprintln!("Failed to write payment to persistence: {}", e);
310+
println!("Failed to publish '{}' event, : {}", event_name, e);
311+
return;
290312
},
291-
}
313+
};
314+
315+
upsert_payment_details(event_node, Arc::clone(&paginated_store), &payment);
292316
} else {
293-
eprintln!("Unable to find payment with paymentId: {}", payment_id.0.to_lower_hex_string());
317+
eprintln!("Unable to find payment with paymentId: {}", payment_id);
318+
}
319+
}
320+
321+
fn upsert_payment_details(
322+
event_node: &Node, paginated_store: Arc<dyn PaginatedKVStore>, payment: &Payment,
323+
) {
324+
let time =
325+
SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as i64;
326+
327+
match paginated_store.write(
328+
PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
329+
PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
330+
&payment.id,
331+
time,
332+
&payment.encode_to_vec(),
333+
) {
334+
Ok(_) => {
335+
event_node.event_handled();
336+
},
337+
Err(e) => {
338+
eprintln!("Failed to write payment to persistence: {}", e);
339+
},
294340
}
295341
}

ldk-server/src/service.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,18 @@ use std::sync::Arc;
3939
#[derive(Clone)]
4040
pub struct NodeService {
4141
node: Arc<Node>,
42-
paginated_kv_store: Arc<dyn PaginatedKVStore + Send + Sync>,
42+
paginated_kv_store: Arc<dyn PaginatedKVStore>,
4343
}
4444

4545
impl NodeService {
46-
pub(crate) fn new(
47-
node: Arc<Node>, paginated_kv_store: Arc<dyn PaginatedKVStore + Send + Sync>,
48-
) -> Self {
46+
pub(crate) fn new(node: Arc<Node>, paginated_kv_store: Arc<dyn PaginatedKVStore>) -> Self {
4947
Self { node, paginated_kv_store }
5048
}
5149
}
5250

5351
pub(crate) struct Context {
5452
pub(crate) node: Arc<Node>,
55-
pub(crate) paginated_kv_store: Arc<dyn PaginatedKVStore + Send + Sync>,
53+
pub(crate) paginated_kv_store: Arc<dyn PaginatedKVStore>,
5654
}
5755

5856
impl Service<Request<Incoming>> for NodeService {

0 commit comments

Comments
 (0)