Skip to content

Commit e778bbd

Browse files
committed
Start publishing more payment events using EventPublisher.
1 parent bc83901 commit e778bbd

File tree

2 files changed

+84
-26
lines changed

2 files changed

+84
-26
lines changed

ldk-server/src/io/events/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,13 @@
11
pub(crate) mod event_publisher;
2+
3+
use ldk_server_protos::events::event_envelope;
4+
5+
/// Event variant to event name mapping.
6+
pub(crate) fn get_event_name(event: &event_envelope::Event) -> &'static str {
7+
match event {
8+
event_envelope::Event::PaymentReceived(_) => "PaymentReceived",
9+
event_envelope::Event::PaymentSuccessful(_) => "PaymentSuccessful",
10+
event_envelope::Event::PaymentFailed(_) => "PaymentFailed",
11+
event_envelope::Event::PaymentForwarded(_) => "PaymentForwarded",
12+
}
13+
}

ldk-server/src/main.rs

Lines changed: 72 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use hyper::server::conn::http1;
1414
use hyper_util::rt::TokioIo;
1515

1616
use crate::io::events::event_publisher::{EventPublisher, NoopEventPublisher};
17+
use crate::io::events::get_event_name;
1718
use crate::io::persist::paginated_kv_store::PaginatedKVStore;
1819
use crate::io::persist::sqlite_store::SqliteStore;
1920
use crate::io::persist::{
@@ -29,12 +30,14 @@ use ldk_node::lightning::ln::channelmanager::PaymentId;
2930
use ldk_node::logger::LogLevel;
3031
use ldk_server_protos::events;
3132
use ldk_server_protos::events::{event_envelope, EventEnvelope};
33+
use ldk_server_protos::types::Payment;
3234
use prost::Message;
3335
use rand::Rng;
3436
use std::fs;
3537
use std::path::{Path, PathBuf};
3638
use std::sync::Arc;
3739
use std::time::{SystemTime, UNIX_EPOCH};
40+
use tokio::select;
3841

3942
const USAGE_GUIDE: &str = "Usage: ldk-server <config_path>";
4043

@@ -124,14 +127,14 @@ fn main() {
124127
Err(e) => {
125128
println!("Failed to register for SIGTERM stream: {}", e);
126129
std::process::exit(-1);
127-
},
130+
}
128131
};
129132
let event_node = Arc::clone(&node);
130133
let rest_svc_listener = TcpListener::bind(config_file.rest_service_addr)
131134
.await
132135
.expect("Failed to bind listening port");
133136
loop {
134-
tokio::select! {
137+
select! {
135138
event = event_node.next_event_async() => {
136139
match event {
137140
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
@@ -154,18 +157,44 @@ fn main() {
154157
payment_id, payment_hash, amount_msat
155158
);
156159
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
157-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
160+
161+
publish_event_and_upsert_payment(&payment_id,
162+
|payment_ref| event_envelope::Event::PaymentReceived(events::PaymentReceived {
163+
payment: Some(payment_ref.clone()),
164+
}),
165+
&event_node,
166+
Arc::clone(&event_publisher),
167+
Arc::clone(&paginated_store)).await;
158168
},
159169
Event::PaymentSuccessful {payment_id, ..} => {
160170
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
161-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
171+
172+
publish_event_and_upsert_payment(&payment_id,
173+
|payment_ref| event_envelope::Event::PaymentSuccessful(events::PaymentSuccessful {
174+
payment: Some(payment_ref.clone()),
175+
}),
176+
&event_node,
177+
Arc::clone(&event_publisher),
178+
Arc::clone(&paginated_store)).await;
162179
},
163180
Event::PaymentFailed {payment_id, ..} => {
164181
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");
165-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
182+
183+
publish_event_and_upsert_payment(&payment_id,
184+
|payment_ref| event_envelope::Event::PaymentFailed(events::PaymentFailed {
185+
payment: Some(payment_ref.clone()),
186+
}),
187+
&event_node,
188+
Arc::clone(&event_publisher),
189+
Arc::clone(&paginated_store)).await;
166190
},
167191
Event::PaymentClaimable {payment_id, ..} => {
168-
upsert_payment_details(&event_node, Arc::clone(&paginated_store) as Arc<dyn PaginatedKVStore>, &payment_id);
192+
if let Some(payment_details) = event_node.payment(&payment_id) {
193+
let payment = payment_to_proto(payment_details);
194+
upsert_payment_details(&event_node, Arc::clone(&paginated_store), &payment);
195+
} else {
196+
eprintln!("Unable to find payment with paymentId: {}", payment_id.to_string());
197+
}
169198
},
170199
Event::PaymentForwarded {
171200
prev_channel_id,
@@ -234,7 +263,6 @@ fn main() {
234263
event_node.event_handled();
235264
},
236265
}
237-
238266
},
239267
res = rest_svc_listener.accept() => {
240268
match res {
@@ -266,30 +294,48 @@ fn main() {
266294
println!("Shutdown complete..");
267295
}
268296

269-
fn upsert_payment_details(
270-
event_node: &Node, paginated_store: Arc<dyn PaginatedKVStore>, payment_id: &PaymentId,
297+
async fn publish_event_and_upsert_payment(
298+
payment_id: &PaymentId, payment_to_event: fn(&Payment) -> event_envelope::Event,
299+
event_node: &Node, event_publisher: Arc<dyn EventPublisher>,
300+
paginated_store: Arc<dyn PaginatedKVStore>,
271301
) {
272302
if let Some(payment_details) = event_node.payment(payment_id) {
273303
let payment = payment_to_proto(payment_details);
274-
let time =
275-
SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs()
276-
as i64;
277-
278-
match paginated_store.write(
279-
PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
280-
PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
281-
&payment_id.0.to_lower_hex_string(),
282-
time,
283-
&payment.encode_to_vec(),
284-
) {
285-
Ok(_) => {
286-
event_node.event_handled();
287-
},
304+
305+
let event = payment_to_event(&payment);
306+
let event_name = get_event_name(&event);
307+
match event_publisher.publish(EventEnvelope { event: Some(event) }).await {
308+
Ok(_) => {},
288309
Err(e) => {
289-
eprintln!("Failed to write payment to persistence: {}", e);
310+
println!("Failed to publish '{}' event, : {}", event_name, e);
311+
return;
290312
},
291-
}
313+
};
314+
315+
upsert_payment_details(event_node, Arc::clone(&paginated_store), &payment);
292316
} else {
293-
eprintln!("Unable to find payment with paymentId: {}", payment_id.0.to_lower_hex_string());
317+
eprintln!("Unable to find payment with paymentId: {}", payment_id);
318+
}
319+
}
320+
321+
fn upsert_payment_details(
322+
event_node: &Node, paginated_store: Arc<dyn PaginatedKVStore>, payment: &Payment,
323+
) {
324+
let time =
325+
SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as i64;
326+
327+
match paginated_store.write(
328+
PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
329+
PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
330+
&payment.id,
331+
time,
332+
&payment.encode_to_vec(),
333+
) {
334+
Ok(_) => {
335+
event_node.event_handled();
336+
},
337+
Err(e) => {
338+
eprintln!("Failed to write payment to persistence: {}", e);
339+
},
294340
}
295341
}

0 commit comments

Comments
 (0)