Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.

Commit 971641c

Browse files
committed
Handle HTLCs around a successfully forwarded payment
This will: - Queue any additional HTLCs coming in before a successful forward. - Forward queued commits only when the payment is forwarded successfully. - Immediately forward any future HTLCs after the successful forward.
1 parent 0384ccd commit 971641c

File tree

1 file changed

+174
-53
lines changed

1 file changed

+174
-53
lines changed

src/lsps2/service.rs

Lines changed: 174 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ struct FeePayment {
6161
opening_fee_msat: u64,
6262
}
6363

64+
#[derive(Debug)]
6465
struct ChannelStateError(String);
6566

6667
impl From<ChannelStateError> for LightningError {
@@ -69,6 +70,23 @@ impl From<ChannelStateError> for LightningError {
6970
}
7071
}
7172

73+
/// Possible actions that need to be taken when an HTLC is intercepted.
74+
#[derive(Debug, PartialEq)]
75+
enum HTLCInterceptedAction {
76+
/// The opening of the JIT channel.
77+
OpenChannel(OpenChannelParams),
78+
/// The forwarding of the intercepted HTLC.
79+
ForwardHTLC(ChannelId),
80+
}
81+
82+
/// The forwarding of a payment while skimming the JIT channel opening fee.
83+
#[derive(Debug, PartialEq)]
84+
struct ForwardPaymentAction(ChannelId, FeePayment);
85+
86+
/// The forwarding of previously intercepted HTLCs without skimming any further fees.
87+
#[derive(Debug, PartialEq)]
88+
struct ForwardHTLCsAction(ChannelId, Vec<InterceptedHTLC>);
89+
7290
/// The different states a requested JIT channel can be in.
7391
#[derive(Debug)]
7492
enum OutboundJITChannelState {
@@ -79,10 +97,14 @@ enum OutboundJITChannelState {
7997
/// opening of the channel. We are awaiting the completion of the channel establishment.
8098
PendingChannelOpen { payment_queue: Arc<Mutex<PaymentQueue>>, opening_fee_msat: u64 },
8199
/// The channel is open and a payment was forwarded while skimming the JIT channel fee.
82-
PendingPaymentForward { payment_queue: Arc<Mutex<PaymentQueue>>, _opening_fee_msat: u64 },
100+
PendingPaymentForward {
101+
payment_queue: Arc<Mutex<PaymentQueue>>,
102+
opening_fee_msat: u64,
103+
channel_id: ChannelId,
104+
},
83105
/// The channel is open and a payment was successfully forwarded while skimming the JIT channel
84106
/// fee. Any subsequent HTLCs can be forwarded without additional logic.
85-
PaymentForwarded,
107+
PaymentForwarded { channel_id: ChannelId },
86108
}
87109

88110
impl OutboundJITChannelState {
@@ -95,7 +117,7 @@ impl OutboundJITChannelState {
95117
fn htlc_intercepted(
96118
&mut self, opening_fee_params: &OpeningFeeParams, payment_size_msat: &Option<u64>,
97119
htlc: InterceptedHTLC,
98-
) -> Result<(Self, Option<OpenChannelParams>), ChannelStateError> {
120+
) -> Result<(Self, Option<HTLCInterceptedAction>), ChannelStateError> {
99121
match self {
100122
OutboundJITChannelState::PendingInitialPayment { payment_queue } => {
101123
let (total_expected_outbound_amount_msat, num_htlcs) =
@@ -148,9 +170,11 @@ impl OutboundJITChannelState {
148170
payment_queue: Arc::clone(&payment_queue),
149171
opening_fee_msat,
150172
};
151-
let open_channel_params =
152-
OpenChannelParams { opening_fee_msat, amt_to_forward_msat };
153-
Ok((pending_channel_open, Some(open_channel_params)))
173+
let open_channel = HTLCInterceptedAction::OpenChannel(OpenChannelParams {
174+
opening_fee_msat,
175+
amt_to_forward_msat,
176+
});
177+
Ok((pending_channel_open, Some(open_channel)))
154178
} else {
155179
if mpp_mode {
156180
let pending_initial_payment =
@@ -165,14 +189,41 @@ impl OutboundJITChannelState {
165189
}
166190
}
167191
},
168-
state => Err(ChannelStateError(format!(
169-
"Intercepted HTLC when JIT Channel was in state: {:?}",
170-
state
171-
))),
192+
OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
193+
let mut payment_queue_lock = payment_queue.lock().unwrap();
194+
payment_queue_lock.add_htlc(htlc);
195+
let pending_channel_open = OutboundJITChannelState::PendingChannelOpen {
196+
payment_queue: payment_queue.clone(),
197+
opening_fee_msat: *opening_fee_msat,
198+
};
199+
Ok((pending_channel_open, None))
200+
},
201+
OutboundJITChannelState::PendingPaymentForward {
202+
payment_queue,
203+
opening_fee_msat,
204+
channel_id,
205+
} => {
206+
let mut payment_queue_lock = payment_queue.lock().unwrap();
207+
payment_queue_lock.add_htlc(htlc);
208+
let pending_payment_forward = OutboundJITChannelState::PendingPaymentForward {
209+
payment_queue: payment_queue.clone(),
210+
opening_fee_msat: *opening_fee_msat,
211+
channel_id: *channel_id,
212+
};
213+
Ok((pending_payment_forward, None))
214+
},
215+
OutboundJITChannelState::PaymentForwarded { channel_id } => {
216+
let payment_forwarded =
217+
OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
218+
let forward = HTLCInterceptedAction::ForwardHTLC(*channel_id);
219+
Ok((payment_forwarded, Some(forward)))
220+
},
172221
}
173222
}
174223

175-
fn channel_ready(&self) -> Result<(Self, FeePayment), ChannelStateError> {
224+
fn channel_ready(
225+
&self, channel_id: ChannelId,
226+
) -> Result<(Self, ForwardPaymentAction), ChannelStateError> {
176227
match self {
177228
OutboundJITChannelState::PendingChannelOpen { payment_queue, opening_fee_msat } => {
178229
let mut payment_queue_lock = payment_queue.lock().unwrap();
@@ -181,10 +232,14 @@ impl OutboundJITChannelState {
181232
{
182233
let pending_payment_forward = OutboundJITChannelState::PendingPaymentForward {
183234
payment_queue: Arc::clone(&payment_queue),
184-
_opening_fee_msat: *opening_fee_msat,
235+
opening_fee_msat: *opening_fee_msat,
236+
channel_id,
185237
};
186-
let fee_payment = FeePayment { opening_fee_msat: *opening_fee_msat, htlcs };
187-
Ok((pending_payment_forward, fee_payment))
238+
let forward_payment = ForwardPaymentAction(
239+
channel_id,
240+
FeePayment { opening_fee_msat: *opening_fee_msat, htlcs },
241+
);
242+
Ok((pending_payment_forward, forward_payment))
188243
} else {
189244
Err(ChannelStateError(
190245
"No forwardable payment available when moving to channel ready."
@@ -199,11 +254,23 @@ impl OutboundJITChannelState {
199254
}
200255
}
201256

202-
fn payment_forwarded(&mut self) -> Result<(Self, Vec<InterceptedHTLC>), ChannelStateError> {
257+
fn payment_forwarded(
258+
&mut self,
259+
) -> Result<(Self, Option<ForwardHTLCsAction>), ChannelStateError> {
203260
match self {
204-
OutboundJITChannelState::PendingPaymentForward { payment_queue, .. } => {
261+
OutboundJITChannelState::PendingPaymentForward {
262+
payment_queue, channel_id, ..
263+
} => {
205264
let mut payment_queue_lock = payment_queue.lock().unwrap();
206-
Ok((OutboundJITChannelState::PaymentForwarded, payment_queue_lock.clear()))
265+
let payment_forwarded =
266+
OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
267+
let forward_htlcs = ForwardHTLCsAction(*channel_id, payment_queue_lock.clear());
268+
Ok((payment_forwarded, Some(forward_htlcs)))
269+
},
270+
OutboundJITChannelState::PaymentForwarded { channel_id } => {
271+
let payment_forwarded =
272+
OutboundJITChannelState::PaymentForwarded { channel_id: *channel_id };
273+
Ok((payment_forwarded, None))
207274
},
208275
state => Err(ChannelStateError(format!(
209276
"Payment forwarded when JIT Channel was in state: {:?}",
@@ -234,23 +301,25 @@ impl OutboundJITChannel {
234301

235302
fn htlc_intercepted(
236303
&mut self, htlc: InterceptedHTLC,
237-
) -> Result<Option<OpenChannelParams>, LightningError> {
238-
let (new_state, open_channel_params) =
304+
) -> Result<Option<HTLCInterceptedAction>, LightningError> {
305+
let (new_state, action) =
239306
self.state.htlc_intercepted(&self.opening_fee_params, &self.payment_size_msat, htlc)?;
240307
self.state = new_state;
241-
Ok(open_channel_params)
308+
Ok(action)
242309
}
243310

244-
fn channel_ready(&mut self) -> Result<FeePayment, LightningError> {
245-
let (new_state, payment) = self.state.channel_ready()?;
311+
fn channel_ready(
312+
&mut self, channel_id: ChannelId,
313+
) -> Result<ForwardPaymentAction, LightningError> {
314+
let (new_state, action) = self.state.channel_ready(channel_id)?;
246315
self.state = new_state;
247-
Ok(payment)
316+
Ok(action)
248317
}
249318

250-
fn payment_forwarded(&mut self) -> Result<Vec<InterceptedHTLC>, LightningError> {
251-
let (new_state, payments) = self.state.payment_forwarded()?;
319+
fn payment_forwarded(&mut self) -> Result<Option<ForwardHTLCsAction>, LightningError> {
320+
let (new_state, action) = self.state.payment_forwarded()?;
252321
self.state = new_state;
253-
Ok(payments)
322+
Ok(action)
254323
}
255324
}
256325

@@ -482,7 +551,7 @@ where
482551
payment_hash,
483552
};
484553
match jit_channel.htlc_intercepted(htlc) {
485-
Ok(Some(open_channel_params)) => {
554+
Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => {
486555
self.enqueue_event(Event::LSPS2Service(
487556
LSPS2ServiceEvent::OpenChannel {
488557
their_network_key: counterparty_node_id.clone(),
@@ -494,6 +563,14 @@ where
494563
},
495564
));
496565
},
566+
Ok(Some(HTLCInterceptedAction::ForwardHTLC(channel_id))) => {
567+
self.channel_manager.get_cm().forward_intercepted_htlc(
568+
intercept_id,
569+
&channel_id,
570+
*counterparty_node_id,
571+
expected_outbound_amount_msat,
572+
)?;
573+
},
497574
Ok(None) => {},
498575
Err(e) => {
499576
self.channel_manager
@@ -519,6 +596,70 @@ where
519596
Ok(())
520597
}
521598

599+
/// Forward [`Event::PaymentForwarded`] event parameter into this function.
600+
///
601+
/// Will register the forwarded payment as having paid the JIT channel fee, and forward any held
602+
/// and future HTLCs for the SCID of the initial invoice. In the future, this will verify the
603+
/// `skimmed_fee_msat` in [`Event::PaymentForwarded`].
604+
///
605+
/// Note that `next_channel_id` is required to be provided. Therefore, the corresponding
606+
/// [`Event::PaymentForwarded`] events need to be generated and serialized by LDK versions
607+
/// greater or equal to 0.0.107.
608+
///
609+
/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
610+
pub fn payment_forwarded(&self, next_channel_id: ChannelId) -> Result<(), APIError> {
611+
if let Some(counterparty_node_id) =
612+
self.peer_by_channel_id.read().unwrap().get(&next_channel_id)
613+
{
614+
let outer_state_lock = self.per_peer_state.read().unwrap();
615+
match outer_state_lock.get(&counterparty_node_id) {
616+
Some(inner_state_lock) => {
617+
let mut peer_state = inner_state_lock.lock().unwrap();
618+
if let Some(intercept_scid) =
619+
peer_state.intercept_scid_by_channel_id.get(&next_channel_id).copied()
620+
{
621+
if let Some(jit_channel) =
622+
peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
623+
{
624+
match jit_channel.payment_forwarded() {
625+
Ok(Some(ForwardHTLCsAction(channel_id, htlcs))) => {
626+
for htlc in htlcs {
627+
self.channel_manager.get_cm().forward_intercepted_htlc(
628+
htlc.intercept_id,
629+
&channel_id,
630+
*counterparty_node_id,
631+
htlc.expected_outbound_amount_msat,
632+
)?;
633+
}
634+
},
635+
Ok(None) => {},
636+
Err(e) => {
637+
return Err(APIError::APIMisuseError {
638+
err: format!(
639+
"Forwarded payment was not applicable for JIT channel: {}",
640+
e.err
641+
),
642+
})
643+
},
644+
}
645+
}
646+
} else {
647+
return Err(APIError::APIMisuseError {
648+
err: format!("No state for for channel id: {}", next_channel_id),
649+
});
650+
}
651+
},
652+
None => {
653+
return Err(APIError::APIMisuseError {
654+
err: format!("No counterparty state for: {}", counterparty_node_id),
655+
});
656+
},
657+
}
658+
}
659+
660+
Ok(())
661+
}
662+
522663
/// Forward [`Event::ChannelReady`] event parameters into this function.
523664
///
524665
/// Will forward the intercepted HTLC if it matches a channel
@@ -543,8 +684,11 @@ where
543684
if let Some(jit_channel) =
544685
peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid)
545686
{
546-
match jit_channel.channel_ready() {
547-
Ok(FeePayment { opening_fee_msat, htlcs }) => {
687+
match jit_channel.channel_ready(*channel_id) {
688+
Ok(ForwardPaymentAction(
689+
channel_id,
690+
FeePayment { opening_fee_msat, htlcs },
691+
)) => {
548692
let amounts_to_forward_msat =
549693
calculate_amount_to_forward_per_htlc(&htlcs, opening_fee_msat);
550694

@@ -553,34 +697,11 @@ where
553697
{
554698
self.channel_manager.get_cm().forward_intercepted_htlc(
555699
intercept_id,
556-
channel_id,
700+
&channel_id,
557701
*counterparty_node_id,
558702
amount_to_forward_msat,
559703
)?;
560704
}
561-
562-
match jit_channel.payment_forwarded() {
563-
Ok(htlcs) => {
564-
for htlc in htlcs {
565-
self.channel_manager
566-
.get_cm()
567-
.forward_intercepted_htlc(
568-
htlc.intercept_id,
569-
channel_id,
570-
*counterparty_node_id,
571-
htlc.expected_outbound_amount_msat,
572-
)?;
573-
}
574-
},
575-
Err(e) => {
576-
return Err(APIError::APIMisuseError {
577-
err: format!(
578-
"Failed to free queued payments: {}",
579-
e.err
580-
),
581-
})
582-
},
583-
}
584705
},
585706
Err(e) => {
586707
return Err(APIError::APIMisuseError {

0 commit comments

Comments
 (0)