diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs
index 57f0ca87d45..ff499d049d4 100644
--- a/lightning/src/ln/chanmon_update_fail_tests.rs
+++ b/lightning/src/ln/chanmon_update_fail_tests.rs
@@ -2784,17 +2784,13 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
 	}
 
 	// If we finish updating the monitor, we should free the holding cell right away (this did
-	// not occur prior to #756).
+	// not occur prior to #756). This should result in a new monitor update.
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 	let (mon_id, _) = get_latest_mon_update_id(&nodes[0], chan_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id, mon_id);
 	expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
-
-	// New outbound messages should be generated immediately upon a call to
-	// get_and_clear_pending_msg_events (but not before).
-	check_added_monitors(&nodes[0], 0);
-	let mut events = nodes[0].node.get_and_clear_pending_msg_events();
 	check_added_monitors(&nodes[0], 1);
+	let mut events = nodes[0].node.get_and_clear_pending_msg_events();
 	assert_eq!(events.len(), 1);
 
 	// Deliver the pending in-flight CS
@@ -3556,12 +3552,10 @@ fn do_test_blocked_chan_preimage_release(completion_mode: BlockedUpdateComplMode
 	}
 
 	// The event processing should release the last RAA update.
-	check_added_monitors(&nodes[1], 1);
-
-	// When we fetch the next update the message getter will generate the next update for nodes[2],
-	// generating a further monitor update.
+	// It should also generate the next update for nodes[2].
+	check_added_monitors(&nodes[1], 2);
 	let mut bs_htlc_fulfill = get_htlc_update_msgs(&nodes[1], &node_c_id);
-	check_added_monitors(&nodes[1], 1);
+	check_added_monitors(&nodes[1], 0);
 
 	nodes[2]
 		.node
@@ -5142,13 +5136,12 @@ fn test_mpp_claim_to_holding_cell() {
 	nodes[3].chain_monitor.chain_monitor.channel_monitor_updated(chan_4_id, latest_id).unwrap();
 	// Once we process monitor events (in this case by checking for the `PaymentClaimed` event, the
 	// RAA monitor update blocked above will be released.
+	// At the same time, the RAA monitor update completion will allow the C <-> D channel to
+	// generate its fulfill update.
 	expect_payment_claimed!(nodes[3], paymnt_hash_1, 500_000);
-	check_added_monitors(&nodes[3], 1);
-
-	// After the RAA monitor update completes, the C <-> D channel will be able to generate its
-	// fulfill updates as well.
+	check_added_monitors(&nodes[3], 2);
 	let mut c_claim = get_htlc_update_msgs(&nodes[3], &node_c_id);
-	check_added_monitors(&nodes[3], 1);
+	check_added_monitors(&nodes[3], 0);
 
 	// Finally, clear all the pending payments.
 	let path = [&[&nodes[1], &nodes[3]][..], &[&nodes[2], &nodes[3]][..]];
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 1ab87a70c72..778897bc292 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -3223,6 +3223,14 @@ pub struct PhantomRouteHints {
 	pub real_node_pubkey: PublicKey,
 }
 
+/// The return type of [`ChannelManager::check_free_peer_holding_cells`]
+type FreeHoldingCellsResult = Vec<(
+	ChannelId,
+	PublicKey,
+	Option,
+	Vec<(HTLCSource, PaymentHash)>,
+)>;
+
 macro_rules! insert_short_channel_id {
 	($short_to_chan_info: ident, $channel: expr) => {{
 		if let Some(real_scid) = $channel.funding.get_short_channel_id() {
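Aside on the new type alias above: `FreeHoldingCellsResult` exists so that work discovered while a per-peer lock is held can be handed back to the caller and completed only after the lock has been dropped. The standalone sketch below shows the shape of that "collect under the lock, act after releasing it" pattern; the types and names (`Manager`, `DeferredWork`, `FailedHtlc`, etc.) are illustrative stand-ins, not LDK's actual `PeerState` or monitor-update machinery.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Toy stand-ins for the real channel/HTLC types (hypothetical, for illustration only).
type ChannelId = u64;
type FailedHtlc = String;

/// Work discovered while a lock was held, to be completed after it is released.
type DeferredWork = Vec<(ChannelId, Vec<FailedHtlc>)>;

struct Manager {
    channels: Mutex<HashMap<ChannelId, Vec<FailedHtlc>>>,
}

impl Manager {
    /// Collects pending work under the lock but does *not* act on it yet.
    #[must_use]
    fn collect_deferred_work(&self) -> DeferredWork {
        let mut channels = self.channels.lock().unwrap();
        channels
            .iter_mut()
            .filter(|(_, htlcs)| !htlcs.is_empty())
            .map(|(id, htlcs)| (*id, std::mem::take(htlcs)))
            .collect()
    }

    /// Acts on the deferred work; only called once the lock is released, because the
    /// real handlers may need to re-take this lock (or take other locks).
    fn handle_deferred_work(&self, work: DeferredWork) {
        for (chan_id, failed) in work {
            for htlc in failed {
                println!("failing HTLC {htlc} on channel {chan_id}");
            }
        }
    }

    fn sweep(&self) {
        let work = self.collect_deferred_work();
        // The guard taken in collect_deferred_work is already dropped here.
        self.handle_deferred_work(work);
    }
}

fn main() {
    let mgr = Manager { channels: Mutex::new(HashMap::from([(1, vec!["htlc-a".to_string()])])) };
    mgr.sweep();
}
```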
@@ -3430,6 +3438,7 @@ macro_rules! process_events_body {
 			}
 
 			if !post_event_actions.is_empty() {
+				let _read_guard = $self.total_consistency_lock.read().unwrap();
 				$self.handle_post_event_actions(post_event_actions);
 				// If we had some actions, go around again as we may have more events now
 				processed_all_events = false;
@@ -7115,10 +7124,6 @@ where
 		}
 		self.forward_htlcs(&mut phantom_receives);
 
-		// Freeing the holding cell here is relatively redundant - in practice we'll do it when we
-		// next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
-		// nice to do the work now if we can rather than while we're trying to get messages in the
-		// network stack.
 		if self.check_free_holding_cells() {
 			should_persist = NotifyOption::DoPersist;
 		}
@@ -8316,10 +8321,21 @@ where
 		self.check_refresh_async_receive_offer_cache(true);
 
-		// Technically we don't need to do this here, but if we have holding cell entries in a
-		// channel that need freeing, it's better to do that here and block a background task
-		// than block the message queueing pipeline.
 		if self.check_free_holding_cells() {
+			// While we try to ensure we clear holding cells immediately, it's possible we miss
+			// one somewhere. Thus, it's useful to retry regularly so that even if something gets
+			// stuck, it's only stuck for a minute or so. Still, it's good to panic here in debug
+			// builds so that we discover the missing free.
+			// Note that in cases where we had a fee update in the loop above, we expect to
+			// need to free holding cells now, thus we only report an error if `should_persist`
+			// has not been updated to `DoPersist`.
+			if should_persist != NotifyOption::DoPersist {
+				debug_assert!(false, "Holding cells are cleared immediately");
+				log_error!(
+					self.logger,
+					"Holding cells were freed in last-ditch cleanup. Please report this (performance) bug."
+				);
+			}
 			should_persist = NotifyOption::DoPersist;
 		}
 
@@ -10199,10 +10215,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 				chan,
 			);
 
+			let holding_cell_res = self.check_free_peer_holding_cells(peer_state);
+
 			mem::drop(peer_state_lock);
 			mem::drop(per_peer_state);
 
 			self.handle_post_monitor_update_chan_resume(completion_data);
+			self.handle_holding_cell_free_result(holding_cell_res);
 		} else {
 			log_trace!(logger, "Channel is open but not awaiting update");
 		}
@@ -12246,7 +12265,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 
 	#[rustfmt::skip]
 	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
-		let (inferred_splice_locked, need_lnd_workaround) = {
+		let (inferred_splice_locked, need_lnd_workaround, holding_cell_res) = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
 
 			let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@@ -12307,7 +12326,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 					peer_state.pending_msg_events.push(upd);
 				}
 
-				(responses.inferred_splice_locked, need_lnd_workaround)
+				let holding_cell_res = self.check_free_peer_holding_cells(peer_state);
+				(responses.inferred_splice_locked, need_lnd_workaround, holding_cell_res)
 			} else {
 				return try_channel_entry!(self, peer_state, Err(ChannelError::close(
 					"Got a channel_reestablish message for an unfunded channel!".into())), chan_entry);
 			};
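Aside on the background-tick hunk above: the invariant is that holding cells are already freed by the eager code paths, so finding one to free during the periodic sweep is treated as a bug. A common way to express that is to assert loudly in debug builds but only log (and recover) in release builds. The sketch below is a minimal, self-contained illustration of that pattern; the names (`sweep_leftover_work`, `expected_leftovers`) are invented for the example and the `expected_leftovers` flag plays the role of the `should_persist != DoPersist` check in the diff.

```rust
/// Toy "sweep" mirroring the pattern in the background-tick hunk: leftover work here
/// means an eager path forgot to do it. Debug builds panic so tests catch the bug;
/// release builds log, do the work anyway, and keep running.
fn sweep_leftover_work(pending: &mut Vec<u32>, expected_leftovers: bool) {
    if !pending.is_empty() {
        if !expected_leftovers {
            // Loud in debug/test builds...
            debug_assert!(false, "work should have been done eagerly");
            // ...but only a log line in production, where limping along is better than
            // crashing a node that is managing funds.
            eprintln!("leftover work found in last-ditch cleanup; please report this bug");
        }
        pending.clear();
    }
}

fn main() {
    let mut pending = vec![];
    sweep_leftover_work(&mut pending, false); // nothing pending: no complaint
    let mut pending = vec![1, 2, 3];
    sweep_leftover_work(&mut pending, true); // leftovers were expected (e.g. after a fee update)
}
```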
@@ -12350,6 +12370,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 			}
 		};
 
+		self.handle_holding_cell_free_result(holding_cell_res);
+
 		if let Some(channel_ready_msg) = need_lnd_workaround {
 			self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
 		}
@@ -12686,70 +12708,83 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		has_pending_monitor_events
 	}
 
+	fn handle_holding_cell_free_result(&self, result: FreeHoldingCellsResult) {
+		debug_assert_ne!(
+			self.total_consistency_lock.held_by_thread(),
+			LockHeldState::NotHeldByThread
+		);
+		for (chan_id, cp_node_id, post_update_data, failed_htlcs) in result {
+			if let Some(data) = post_update_data {
+				self.handle_post_monitor_update_chan_resume(data);
+			}
+
+			self.fail_holding_cell_htlcs(failed_htlcs, chan_id, &cp_node_id);
+			self.needs_persist_flag.store(true, Ordering::Release);
+			self.event_persist_notifier.notify();
+		}
+	}
+
+	/// Frees all holding cells in all the channels for a peer.
+	///
+	/// Includes elements in the returned Vec only for channels which changed (implying persistence
+	/// is required).
+	#[must_use]
+	fn check_free_peer_holding_cells(
+		&self, peer_state: &mut PeerState<SP>,
+	) -> FreeHoldingCellsResult {
+		debug_assert_ne!(
+			self.total_consistency_lock.held_by_thread(),
+			LockHeldState::NotHeldByThread
+		);
+
+		let mut updates = Vec::new();
+		let funded_chan_iter = peer_state
+			.channel_by_id
+			.iter_mut()
+			.filter_map(|(chan_id, chan)| chan.as_funded_mut().map(|chan| (chan_id, chan)));
+		for (chan_id, chan) in funded_chan_iter {
+			let (monitor_opt, holding_cell_failed_htlcs) = chan.maybe_free_holding_cell_htlcs(
+				&self.fee_estimator,
+				&&WithChannelContext::from(&self.logger, &chan.context, None),
+			);
+			if monitor_opt.is_some() || !holding_cell_failed_htlcs.is_empty() {
+				let update_res = monitor_opt
+					.map(|monitor_update| {
+						self.handle_new_monitor_update(
+							&mut peer_state.in_flight_monitor_updates,
+							&mut peer_state.monitor_update_blocked_actions,
+							&mut peer_state.pending_msg_events,
+							peer_state.is_connected,
+							chan,
+							chan.funding.get_funding_txo().unwrap(),
+							monitor_update,
+						)
+					})
+					.flatten();
+				let cp_node_id = chan.context.get_counterparty_node_id();
+				updates.push((*chan_id, cp_node_id, update_res, holding_cell_failed_htlcs));
+			}
+		}
+		updates
+	}
+
 	/// Check the holding cell in each channel and free any pending HTLCs in them if possible.
 	/// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
 	/// update was applied.
 	fn check_free_holding_cells(&self) -> bool {
-		let mut has_monitor_update = false;
-		let mut failed_htlcs = Vec::new();
+		let mut unlocked_results = Vec::new();
 
-		// Walk our list of channels and find any that need to update. Note that when we do find an
-		// update, if it includes actions that must be taken afterwards, we have to drop the
-		// per-peer state lock as well as the top level per_peer_state lock. Thus, we loop until we
-		// manage to go through all our peers without finding a single channel to update.
-		'peer_loop: loop {
+		{
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
-				'chan_loop: loop {
-					let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-					let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
-					for (channel_id, chan) in
-						peer_state.channel_by_id.iter_mut().filter_map(|(chan_id, chan)| {
-							chan.as_funded_mut().map(|chan| (chan_id, chan))
-						}) {
-						let counterparty_node_id = chan.context.get_counterparty_node_id();
-						let funding_txo = chan.funding.get_funding_txo();
-						let (monitor_opt, holding_cell_failed_htlcs) = chan
-							.maybe_free_holding_cell_htlcs(
-								&self.fee_estimator,
-								&&WithChannelContext::from(&self.logger, &chan.context, None),
-							);
-						if !holding_cell_failed_htlcs.is_empty() {
-							failed_htlcs.push((
-								holding_cell_failed_htlcs,
-								*channel_id,
-								counterparty_node_id,
-							));
-						}
-						if let Some(monitor_update) = monitor_opt {
-							has_monitor_update = true;
-
-							if let Some(data) = self.handle_new_monitor_update(
-								&mut peer_state.in_flight_monitor_updates,
-								&mut peer_state.monitor_update_blocked_actions,
-								&mut peer_state.pending_msg_events,
-								peer_state.is_connected,
-								chan,
-								funding_txo.unwrap(),
-								monitor_update,
-							) {
-								mem::drop(peer_state_lock);
-								mem::drop(per_peer_state);
-								self.handle_post_monitor_update_chan_resume(data);
-							}
-							continue 'peer_loop;
-						}
-					}
-					break 'chan_loop;
-				}
+				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+				let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+				unlocked_results.append(&mut self.check_free_peer_holding_cells(peer_state));
 			}
-			break 'peer_loop;
 		}
 
-		let has_update = has_monitor_update || !failed_htlcs.is_empty();
-		for (failures, channel_id, counterparty_node_id) in failed_htlcs.drain(..) {
-			self.fail_holding_cell_htlcs(failures, channel_id, &counterparty_node_id);
-		}
+		let has_update = !unlocked_results.is_empty();
+		self.handle_holding_cell_free_result(unlocked_results);
 		has_update
 	}
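Aside on the `check_free_holding_cells` rewrite above: the old code had to `continue 'peer_loop` and rescan from the top whenever acting on a channel required dropping the per-peer lock, because dropping the lock invalidates the in-progress iteration. Collecting the per-peer results in one pass and deferring the follow-up work until every guard is dropped removes the restart entirely. The sketch below contrasts the two shapes with toy types (`Peer`, a `Vec<u32>` standing in for channels with queued work); it is an illustration of the control-flow change, not LDK's actual code.

```rust
use std::sync::Mutex;

// Hypothetical stand-in: each "peer" guards a list of channels with queued work.
struct Peer {
    channels: Mutex<Vec<u32>>,
}

/// Old-style approach (sketch): if acting on an item requires dropping the peer lock,
/// the iteration over the guarded Vec is invalidated, so the whole scan restarts.
fn drain_with_restarts(peers: &[Peer]) -> Vec<u32> {
    let mut done = Vec::new();
    'peer_loop: loop {
        for peer in peers {
            let mut channels = peer.channels.lock().unwrap();
            if let Some(chan) = channels.pop() {
                drop(channels); // release the lock before the "post-update" step
                done.push(chan); // stand-in for follow-up work that needs the lock dropped
                continue 'peer_loop; // iteration state is gone, so start the scan over
            }
        }
        break 'peer_loop;
    }
    done
}

/// New-style approach (sketch): collect everything in one pass while holding each lock,
/// then do the follow-up work once, after all guards have been dropped.
fn drain_single_pass(peers: &[Peer]) -> Vec<u32> {
    let mut deferred = Vec::new();
    for peer in peers {
        let mut channels = peer.channels.lock().unwrap();
        deferred.append(&mut channels);
    }
    // All locks released; handle the deferred work here.
    deferred
}

fn main() {
    let peers = [Peer { channels: Mutex::new(vec![1, 2]) }, Peer { channels: Mutex::new(vec![3]) }];
    assert_eq!(drain_with_restarts(&peers).len(), 3);
    let peers = [Peer { channels: Mutex::new(vec![1, 2]) }, Peer { channels: Mutex::new(vec![3]) }];
    assert_eq!(drain_single_pass(&peers).len(), 3);
}
```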
@@ -13081,27 +13116,32 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 
 	#[cfg(any(test, fuzzing))]
 	#[rustfmt::skip]
 	pub fn exit_quiescence(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) -> Result<bool, APIError> {
-		let per_peer_state = self.per_peer_state.read().unwrap();
-		let peer_state_mutex = per_peer_state.get(counterparty_node_id)
-			.ok_or_else(|| APIError::ChannelUnavailable {
-				err: format!("Can't find a peer matching the passed counterparty node_id {counterparty_node_id}")
-			})?;
-		let mut peer_state = peer_state_mutex.lock().unwrap();
-		let initiator = match peer_state.channel_by_id.entry(*channel_id) {
-			hash_map::Entry::Occupied(mut chan_entry) => {
-				if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
-					chan.exit_quiescence()
-				} else {
-					return Err(APIError::APIMisuseError {
-						err: format!("Unfunded channel {} cannot be quiescent", channel_id),
-					})
-				}
-			},
-			hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable {
-				err: format!("Channel with id {} not found for the passed counterparty node_id {}",
-					channel_id, counterparty_node_id),
-			}),
+		let _read_guard = self.total_consistency_lock.read().unwrap();
+
+		let initiator = {
+			let per_peer_state = self.per_peer_state.read().unwrap();
+			let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+				.ok_or_else(|| APIError::ChannelUnavailable {
+					err: format!("Can't find a peer matching the passed counterparty node_id {counterparty_node_id}")
+				})?;
+			let mut peer_state = peer_state_mutex.lock().unwrap();
+			match peer_state.channel_by_id.entry(*channel_id) {
+				hash_map::Entry::Occupied(mut chan_entry) => {
+					if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
+						chan.exit_quiescence()
+					} else {
+						return Err(APIError::APIMisuseError {
+							err: format!("Unfunded channel {} cannot be quiescent", channel_id),
+						})
+					}
+				},
+				hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable {
+					err: format!("Channel with id {} not found for the passed counterparty node_id {}",
+						channel_id, counterparty_node_id),
+				}),
+			}
 		};
+		self.check_free_holding_cells();
 		Ok(initiator)
 	}
@@ -14165,7 +14205,7 @@ where
 						if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() {
 							log_debug!(logger, "Unlocking monitor updating and updating monitor",
 							);
-							if let Some(data) = self.handle_new_monitor_update(
+							let post_update_data = self.handle_new_monitor_update(
 								&mut peer_state.in_flight_monitor_updates,
 								&mut peer_state.monitor_update_blocked_actions,
 								&mut peer_state.pending_msg_events,
@@ -14173,11 +14213,18 @@ where
 								chan,
 								channel_funding_outpoint,
 								monitor_update,
-							) {
-								mem::drop(peer_state_lck);
-								mem::drop(per_peer_state);
+							);
+							let holding_cell_res = self.check_free_peer_holding_cells(peer_state);
+
+							mem::drop(peer_state_lck);
+							mem::drop(per_peer_state);
+
+							if let Some(data) = post_update_data {
 								self.handle_post_monitor_update_chan_resume(data);
 							}
+
+							self.handle_holding_cell_free_result(holding_cell_res);
+
 							if further_update_exists {
 								// If there are more `ChannelMonitorUpdate`s to process, restart at the
 								// top of the loop.
@@ -14596,19 +14643,32 @@ where
 		PersistenceNotifierGuard::optionally_notify(self, || {
 			let mut result = NotifyOption::SkipPersistNoEvents;
 
+			// This method is quite performance-sensitive. Not only is it called very often, but it
+			// *is* the critical path between generating a message for a peer and giving it to the
+			// `PeerManager` to send. Thus, we should avoid adding any more logic here than we
+			// need, especially anything that might end up causing I/O (like a
+			// `ChannelMonitorUpdate`)!
+
 			// TODO: This behavior should be documented. It's unintuitive that we query
 			// ChannelMonitors when clearing other events.
 			if self.process_pending_monitor_events() {
 				result = NotifyOption::DoPersist;
 			}
 
-			if self.check_free_holding_cells() {
-				result = NotifyOption::DoPersist;
-			}
 			if self.maybe_generate_initial_closing_signed() {
 				result = NotifyOption::DoPersist;
 			}
 
+			#[cfg(test)]
+			if self.check_free_holding_cells() {
+				// In tests, we want to ensure that we never forget to free holding cells
+				// immediately, so we check it here.
+				// Note that we can't turn this on for `debug_assertions` because there's a race in
+				// (at least) the fee-update logic in `timer_tick_occurred` which can lead to us
+				// freeing holding cells here while it's running.
+				debug_assert!(false, "Holding cells should always be auto-free'd");
+			}
+
 			// Quiescence is an in-memory protocol, so we don't have to persist because of it.
 			self.maybe_send_stfu();
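Aside on the final hunk: the message getter stays lean in production, while a test-only check verifies that the eager paths already freed every holding cell before we got here. The sketch below shows that "test-only invariant on a hot path" idea in miniature; `MessageQueue` and its fields are invented for the example and are not LDK types.

```rust
/// Toy message queue illustrating a test-only invariant check on a hot path: the
/// production build only drains messages, while test builds additionally insist that
/// whoever queued into the holding cell already freed it.
struct MessageQueue {
    msgs: Vec<String>,
    holding_cell: Vec<String>,
}

impl MessageQueue {
    fn get_and_clear_pending_msgs(&mut self) -> Vec<String> {
        #[cfg(test)]
        {
            // Doing the freeing here would hide performance bugs on the hot path, so in
            // tests we require it to have happened already.
            debug_assert!(self.holding_cell.is_empty(), "holding cell should already be free'd");
        }
        std::mem::take(&mut self.msgs)
    }
}

fn main() {
    let mut q = MessageQueue { msgs: vec!["update_add_htlc".into()], holding_cell: vec![] };
    assert_eq!(q.get_and_clear_pending_msgs().len(), 1);
    assert!(q.holding_cell.is_empty());
}
```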