From 9b09bb1e5a3ccddf1903fc6f114ca350bf22806f Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Fri, 5 Sep 2025 13:39:26 +0100 Subject: [PATCH 1/2] Implement background job for transaction rebroadcasting Introduces a `RebroadcastPolicy` to manage the automatic rebroadcasting of unconfirmed transactions with exponential backoff. This prevents excessive network spam while systematically retrying stuck transactions. The feature is enabled by default but can be disabled via the builder: `builder.set_auto_rebroadcast_unconfirmed(false)`. Configuration options: - min_rebroadcast_interval: Base delay between attempts (seconds) - max_broadcast_attempts: Total attempts before abandonment - backoff_factor: Multiplier for exponential delay increase Sensible defaults are provided (300s, 24 attempts, 1.5x backoff). --- bindings/ldk_node.udl | 6 + src/config.rs | 61 ++++++++++ src/error.rs | 3 + src/lib.rs | 28 +++++ src/payment/onchain.rs | 13 +++ src/payment/pending_payment_store.rs | 50 +++++++-- src/wallet/mod.rs | 160 +++++++++++++++++++++++++-- tests/integration_tests_rust.rs | 4 +- 8 files changed, 305 insertions(+), 20 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c881dbe09..b3b3e9c41 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,7 @@ dictionary Config { u64 probing_liquidity_limit_multiplier; AnchorChannelsConfig? anchor_channels_config; RouteParametersConfig? route_parameters; + boolean auto_rebroadcast_unconfirmed_tx; }; dictionary AnchorChannelsConfig { @@ -266,6 +267,10 @@ interface OnchainPayment { Txid send_to_address([ByRef]Address address, u64 amount_sats, FeeRate? fee_rate); [Throws=NodeError] Txid send_all_to_address([ByRef]Address address, boolean retain_reserve, FeeRate? fee_rate); + [Throws=NodeError] + void rebroadcast_transaction(PaymentId payment_id); + [Throws=NodeError] + Txid bump_fee_rbf(PaymentId payment_id); }; interface FeeRate { @@ -351,6 +356,7 @@ enum NodeError { "InvalidBlindedPaths", "AsyncPaymentServicesDisabled", "HrnParsingFailed", + "InvalidTransaction", }; dictionary NodeStatus { diff --git a/src/config.rs b/src/config.rs index 6c9d1640a..cf70e87d1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,9 @@ const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; +const DEFAULT_MIN_REBROADCAST_INTERVAL_SECS: u64 = 300; +const DEFAULT_MAX_BROADCAST_ATTEMPTS: u32 = 24; +const DEFAULT_BACKOFF_FACTOR: f32 = 1.5; /// The default log level. pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Debug; @@ -107,6 +110,9 @@ pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort a parsing/looking up an HRN resolution. pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; +// The time in-between unconfirmed transaction broadcasts. +pub(crate) const UNCONFIRMED_TX_BROADCAST_INTERVAL: Duration = Duration::from_secs(300); + #[derive(Debug, Clone)] /// Represents the configuration of an [`Node`] instance. /// @@ -128,6 +134,7 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; /// | `log_level` | Debug | /// | `anchor_channels_config` | Some(..) | /// | `route_parameters` | None | +/// | `auto_rebroadcast_unconfirmed_tx` | true | /// /// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their /// respective default values. @@ -192,6 +199,16 @@ pub struct Config { /// **Note:** If unset, default parameters will be used, and you will be able to override the /// parameters on a per-payment basis in the corresponding method calls. pub route_parameters: Option, + /// This will determine whether to automatically rebroadcast unconfirmed transactions + /// (e.g., channel funding or sweep transactions). + /// + /// If enabled, the node will periodically attempt to rebroadcast any unconfirmed transactions to + /// increase propagation and confirmation likelihood. This is helpful in cases where transactions + /// were dropped by the mempool or not widely propagated. + /// + /// Defaults to `true`. Disabling this may be desired for privacy-sensitive use cases or low-bandwidth + /// environments, but may result in slower or failed confirmations if transactions are not re-announced. + pub auto_rebroadcast_unconfirmed_tx: bool, } impl Default for Config { @@ -206,6 +223,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), route_parameters: None, node_alias: None, + auto_rebroadcast_unconfirmed_tx: true, } } } @@ -561,6 +579,49 @@ pub enum AsyncPaymentsRole { Server, } +/// Policy for controlling transaction rebroadcasting behavior. +/// +/// Determines the strategy for resending unconfirmed transactions to the network +/// to ensure they remain in mempools and eventually get confirmed. +#[derive(Clone, Debug)] +pub struct RebroadcastPolicy { + /// Minimum time between rebroadcast attempts in seconds. + /// + /// This prevents excessive network traffic by ensuring a minimum delay + /// between consecutive rebroadcast attempts. + /// + /// **Recommended values**: 60-600 seconds (1-10 minutes) + pub min_rebroadcast_interval_secs: u64, + /// Maximum number of broadcast attempts before giving up. + /// + /// After reaching this limit, the transaction will no longer be rebroadcast + /// automatically. Manual intervention may be required. + /// + /// **Recommended values**: 12-48 attempts + pub max_broadcast_attempts: u32, + /// Exponential backoff factor for increasing intervals between attempts. + /// + /// Each subsequent rebroadcast wait time is multiplied by this factor, + /// creating an exponential backoff pattern. + /// + /// - `1.0`: No backoff (constant interval) + /// - `1.5`: 50% increase each attempt + /// - `2.0`: 100% increase (doubling) each attempt + /// + /// **Recommended values**: 1.2-2.0 + pub backoff_factor: f32, +} + +impl Default for RebroadcastPolicy { + fn default() -> Self { + Self { + min_rebroadcast_interval_secs: DEFAULT_MIN_REBROADCAST_INTERVAL_SECS, + max_broadcast_attempts: DEFAULT_MAX_BROADCAST_ATTEMPTS, + backoff_factor: DEFAULT_BACKOFF_FACTOR, + } + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/src/error.rs b/src/error.rs index ea0bcca3b..928dfb47a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -131,6 +131,8 @@ pub enum Error { AsyncPaymentServicesDisabled, /// Parsing a Human-Readable Name has failed. HrnParsingFailed, + /// The given transaction is invalid. + InvalidTransaction, } impl fmt::Display for Error { @@ -213,6 +215,7 @@ impl fmt::Display for Error { Self::HrnParsingFailed => { write!(f, "Failed to parse a human-readable name.") }, + Self::InvalidTransaction => write!(f, "The given transaction is invalid."), } } } diff --git a/src/lib.rs b/src/lib.rs index d2222d949..e81d12715 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,6 +125,7 @@ use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + UNCONFIRMED_TX_BROADCAST_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -459,6 +460,33 @@ impl Node { } }); + // Regularly rebroadcast unconfirmed transactions. + let rebroadcast_wallet = Arc::clone(&self.wallet); + let rebroadcast_logger = Arc::clone(&self.logger); + let mut stop_rebroadcast = self.stop_sender.subscribe(); + if self.config.auto_rebroadcast_unconfirmed_tx { + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = tokio::time::interval(UNCONFIRMED_TX_BROADCAST_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_rebroadcast.changed() => { + log_debug!( + rebroadcast_logger, + "Stopping rebroadcasting unconfirmed transactions." + ); + return; + } + _ = interval.tick() => { + if let Err(e) = rebroadcast_wallet.rebroadcast_unconfirmed_transactions() { + log_error!(rebroadcast_logger, "Background rebroadcast failed: {}", e); + } + } + } + } + }); + } + // Regularly broadcast node announcements. let bcast_cm = Arc::clone(&self.channel_manager); let bcast_pm = Arc::clone(&self.peer_manager); diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index 695f96d43..b8fea3224 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -17,6 +17,8 @@ use crate::logger::{log_info, LdkLogger, Logger}; use crate::types::{ChannelManager, Wallet}; use crate::wallet::OnchainSendAmount; +use lightning::ln::channelmanager::PaymentId; + #[cfg(not(feature = "uniffi"))] type FeeRate = bitcoin::FeeRate; #[cfg(feature = "uniffi")] @@ -120,4 +122,15 @@ impl OnchainPayment { let fee_rate_opt = maybe_map_fee_rate_opt!(fee_rate); self.wallet.send_to_address(address, send_amount, fee_rate_opt) } + + /// Manually trigger a rebroadcast of a specific transaction according to the default policy. + /// + /// This is useful if you suspect a transaction may not have propagated properly through the + /// network and you want to attempt to rebroadcast it immediately rather than waiting for the + /// automatic background job to handle it. + /// + /// updating the attempt count and last broadcast time for the transaction in the payment store. + pub fn rebroadcast_transaction(&self, payment_id: PaymentId) -> Result<(), Error> { + self.wallet.rebroadcast_transaction(payment_id) + } } diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index 580bdcbcc..3bfd971d0 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use bitcoin::Txid; +use bitcoin::{Transaction, Txid}; use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId}; use crate::{ @@ -20,11 +20,20 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// The raw transaction for rebroadcasting + pub raw_tx: Option, + /// Last broadcast attempt timestamp (UNIX seconds) + pub last_broadcast_time: Option, + /// Number of broadcast attempts + pub broadcast_attempts: Option, } impl PendingPaymentDetails { - pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + pub(crate) fn new( + details: PaymentDetails, conflicting_txids: Vec, raw_tx: Option, + last_broadcast_time: Option, broadcast_attempts: Option, + ) -> Self { + Self { details, conflicting_txids, raw_tx, last_broadcast_time, broadcast_attempts } } /// Convert to finalized payment for the main payment store @@ -36,6 +45,9 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (3, raw_tx, option), + (5, last_broadcast_time, option), + (7, broadcast_attempts, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +55,9 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub raw_tx: Option>, + pub last_broadcast_time: Option>, + pub broadcast_attempts: Option>, } impl StorableObject for PendingPaymentDetails { @@ -56,16 +71,34 @@ impl StorableObject for PendingPaymentDetails { fn update(&mut self, update: &Self::Update) -> bool { let mut updated = false; + macro_rules! update_if_necessary { + ($val:expr, $update:expr) => { + if $val != $update { + $val = $update; + updated = true; + } + }; + } + // Update the underlying payment details if present if let Some(payment_update) = &update.payment_update { updated |= self.details.update(payment_update); } if let Some(new_conflicting_txids) = &update.conflicting_txids { - if &self.conflicting_txids != new_conflicting_txids { - self.conflicting_txids = new_conflicting_txids.clone(); - updated = true; - } + update_if_necessary!(self.conflicting_txids, new_conflicting_txids.clone()); + } + + if let Some(new_raw_tx) = &update.raw_tx { + update_if_necessary!(self.raw_tx, new_raw_tx.clone()); + } + + if let Some(new_last_broadcast_time) = update.last_broadcast_time { + update_if_necessary!(self.last_broadcast_time, new_last_broadcast_time); + } + + if let Some(new_broadcast_attempts) = update.broadcast_attempts { + update_if_necessary!(self.broadcast_attempts, new_broadcast_attempts); } updated @@ -88,6 +121,9 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids: Some(value.conflicting_txids.clone()), + raw_tx: Some(value.raw_tx.clone()), + last_broadcast_time: Some(value.last_broadcast_time), + broadcast_attempts: Some(value.broadcast_attempts), } } } diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 05c743bd9..4ed85ecf6 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -29,6 +29,7 @@ use bitcoin::{ Address, Amount, FeeRate, OutPoint, ScriptBuf, Transaction, TxOut, Txid, WPubkeyHash, Weight, WitnessProgram, WitnessVersion, }; + use lightning::chain::chaininterface::BroadcasterInterface; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BestBlock, Listen}; @@ -46,7 +47,7 @@ use lightning::util::message_signing; use lightning_invoice::RawBolt11Invoice; use persist::KVStoreWalletPersister; -use crate::config::Config; +use crate::config::{Config, RebroadcastPolicy}; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::payment::store::ConfirmationStatus; @@ -56,6 +57,7 @@ use crate::payment::{ use crate::types::{Broadcaster, PaymentStore, PendingPaymentStore}; use crate::Error; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; pub(crate) enum OnchainSendAmount { ExactRetainingReserve { amount_sats: u64, cur_anchor_reserve_sats: u64 }, AllRetainingReserve { cur_anchor_reserve_sats: u64 }, @@ -237,8 +239,13 @@ impl Wallet { confirmation_status, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); + let pending_payment = self.create_pending_payment_from_tx( + payment.clone(), + Vec::new(), + Some(&tx), + None, + None, + ); self.payment_store.insert_or_update(payment)?; self.pending_payment_store.insert_or_update(pending_payment)?; @@ -285,8 +292,13 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); + let pending_payment = self.create_pending_payment_from_tx( + payment.clone(), + Vec::new(), + Some(&tx), + None, + None, + ); self.payment_store.insert_or_update(payment)?; self.pending_payment_store.insert_or_update(pending_payment)?; }, @@ -307,9 +319,15 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment_details = self - .create_pending_payment_from_tx(payment.clone(), conflict_txids.clone()); + let pending_payment_details = self.create_pending_payment_from_tx( + payment.clone(), + conflict_txids.clone(), + None, + None, + None, + ); + self.payment_store.insert_or_update(payment)?; self.pending_payment_store.insert_or_update(pending_payment_details)?; }, WalletEvent::TxDropped { txid, tx } => { @@ -324,8 +342,13 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); + let pending_payment = self.create_pending_payment_from_tx( + payment.clone(), + Vec::new(), + Some(&tx), + None, + None, + ); self.payment_store.insert_or_update(payment)?; self.pending_payment_store.insert_or_update(pending_payment)?; }, @@ -957,9 +980,16 @@ impl Wallet { } fn create_pending_payment_from_tx( - &self, payment: PaymentDetails, conflicting_txids: Vec, + &self, payment: PaymentDetails, conflicting_txids: Vec, tx: Option<&Transaction>, + last_broadcast_time: Option, broadcast_attempts: Option, ) -> PendingPaymentDetails { - PendingPaymentDetails::new(payment, conflicting_txids) + PendingPaymentDetails::new( + payment, + conflicting_txids, + tx.cloned(), + last_broadcast_time, + broadcast_attempts, + ) } fn find_payment_by_txid(&self, target_txid: Txid) -> Option { @@ -978,6 +1008,114 @@ impl Wallet { None } + + pub(crate) fn rebroadcast_unconfirmed_transactions(&self) -> Result<(), Error> { + let policy = RebroadcastPolicy::default(); + let unconfirmed_payments = self.pending_payment_store.list_filter(|p| { + matches!( + p.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Unconfirmed, .. } + ) && matches!(p.details.direction, PaymentDirection::Outbound) + }); + + log_debug!(self.logger, "Found {} unconfirmed transactions", unconfirmed_payments.len()); + + if unconfirmed_payments.is_empty() { + log_info!(self.logger, "No unconfirmed transactions to rebroadcast"); + return Ok(()); + } + + let mut rebroadcast_count = 0; + + for mut pending_payment in unconfirmed_payments { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + let Some(ref raw_tx) = pending_payment.raw_tx else { + log_info!( + self.logger, + "No raw transaction data for {}", + pending_payment.details.id + ); + continue; + }; + + let should_rebroadcast = match pending_payment.last_broadcast_time { + Some(last_time) => { + let next_allowed_time = last_time + + self.calculate_backoff_interval( + (pending_payment.broadcast_attempts).unwrap_or(0), + &policy, + ); + now >= next_allowed_time + && (pending_payment.broadcast_attempts).unwrap_or(0) + < policy.max_broadcast_attempts + }, + None => true, + }; + + if !should_rebroadcast { + continue; + } + + pending_payment.last_broadcast_time = Some(now); + pending_payment.broadcast_attempts = + Some(pending_payment.broadcast_attempts.unwrap_or(0) + 1); + + self.broadcaster.broadcast_transactions(&[&raw_tx]); + rebroadcast_count += 1; + + log_info!( + self.logger, + "Rebroadcast unconfirmed transaction {}", + pending_payment.details.id + ); + + self.pending_payment_store.insert_or_update(pending_payment)?; + } + + if rebroadcast_count > 0 { + log_info!(self.logger, "Successfully rebroadcast {} transactions", rebroadcast_count); + } + + Ok(()) + } + + fn calculate_backoff_interval(&self, attempt: u32, policy: &RebroadcastPolicy) -> u64 { + let base_interval = policy.min_rebroadcast_interval_secs as f32; + let interval = base_interval * policy.backoff_factor.powi(attempt as i32); + interval.round() as u64 + } + + pub(crate) fn rebroadcast_transaction(&self, payment_id: PaymentId) -> Result<(), Error> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + if let Some(mut pending_payment) = self.pending_payment_store.get(&payment_id) { + if let Some(ref raw_tx) = pending_payment.raw_tx { + pending_payment.last_broadcast_time = Some(now); + pending_payment.broadcast_attempts = + Some(pending_payment.broadcast_attempts.unwrap_or(0) + 1); + + self.broadcaster.broadcast_transactions(&[&raw_tx]); + log_info!(self.logger, "Rebroadcast transaction {}", payment_id); + + self.pending_payment_store.insert_or_update(pending_payment)?; + + return Ok(()); + } else { + log_info!(self.logger, "No details found for payment {} in store", payment_id); + return Err(Error::InvalidPaymentId); + } + } + + log_info!(self.logger, "No details found for payment {} in store", payment_id); + return Err(Error::InvalidPaymentId); + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 4e94dd044..09afe1e61 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -455,7 +455,7 @@ async fn onchain_send_receive() { let payment_a = node_a.payment(&payment_id).unwrap(); match payment_a.kind { - PaymentKind::Onchain { txid: _txid, status } => { + PaymentKind::Onchain { txid: _txid, status, .. } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, @@ -464,7 +464,7 @@ async fn onchain_send_receive() { let payment_b = node_a.payment(&payment_id).unwrap(); match payment_b.kind { - PaymentKind::Onchain { txid: _txid, status } => { + PaymentKind::Onchain { txid: _txid, status, .. } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, From aa1558afd6d3bfa03e1c552d43fe10f242f81cdb Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Fri, 30 Jan 2026 10:10:09 +0100 Subject: [PATCH 2/2] Implement RBF fee bumping for unconfirmed transactions Add `Replace-by-Fee` functionality to allow users to increase fees on pending outbound transactions, improving confirmation likelihood during network congestion. - Uses BDK's `build_fee_bump` for transaction replacement - Validates transaction eligibility: must be outbound and unconfirmed - Implements fee rate estimation with safety limits - Maintains payment history consistency across wallet updates - Includes integration tests for various RBF scenarios --- src/payment/onchain.rs | 14 +++ src/payment/pending_payment_store.rs | 5 +- src/payment/store.rs | 18 +++- src/wallet/mod.rs | 144 ++++++++++++++++++++++++++- tests/integration_tests_rust.rs | 109 +++++++++++++++++++- 5 files changed, 282 insertions(+), 8 deletions(-) diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index b8fea3224..4cf91d528 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -133,4 +133,18 @@ impl OnchainPayment { pub fn rebroadcast_transaction(&self, payment_id: PaymentId) -> Result<(), Error> { self.wallet.rebroadcast_transaction(payment_id) } + + /// Attempt to bump the fee of an unconfirmed transaction using Replace-by-Fee (RBF). + /// + /// This creates a new transaction that replaces the original one, increasing the fee by the + /// specified increment to improve its chances of confirmation. The original transaction must + /// be signaling RBF replaceability for this to succeed. + /// + /// The new transaction will have the same outputs as the original but with a + /// higher fee, resulting in faster confirmation potential. + /// + /// Returns the Txid of the new replacement transaction if successful. + pub fn bump_fee_rbf(&self, payment_id: PaymentId) -> Result { + self.wallet.bump_fee_rbf(payment_id) + } } diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index 3bfd971d0..9981a61c1 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -86,7 +86,10 @@ impl StorableObject for PendingPaymentDetails { } if let Some(new_conflicting_txids) = &update.conflicting_txids { - update_if_necessary!(self.conflicting_txids, new_conflicting_txids.clone()); + // Don't overwrite existing conflicts with an empty list + if !new_conflicting_txids.is_empty() { + update_if_necessary!(self.conflicting_txids, new_conflicting_txids.clone()); + } } if let Some(new_raw_tx) = &update.raw_tx { diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..f16b1f32e 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -291,6 +291,15 @@ impl StorableObject for PaymentDetails { } } + if let Some(tx_id) = update.txid { + match self.kind { + PaymentKind::Onchain { ref mut txid, .. } => { + update_if_necessary!(*txid, tx_id); + }, + _ => {}, + } + } + if updated { self.latest_update_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -540,6 +549,7 @@ pub(crate) struct PaymentDetailsUpdate { pub direction: Option, pub status: Option, pub confirmation_status: Option, + pub txid: Option, } impl PaymentDetailsUpdate { @@ -555,6 +565,7 @@ impl PaymentDetailsUpdate { direction: None, status: None, confirmation_status: None, + txid: None, } } } @@ -570,9 +581,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { _ => (None, None, None), }; - let confirmation_status = match value.kind { - PaymentKind::Onchain { status, .. } => Some(status), - _ => None, + let (confirmation_status, txid) = match &value.kind { + PaymentKind::Onchain { status, txid, .. } => (Some(*status), Some(*txid)), + _ => (None, None), }; let counterparty_skimmed_fee_msat = match value.kind { @@ -593,6 +604,7 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { direction: Some(value.direction), status: Some(value.status), confirmation_status, + txid, } } } diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 4ed85ecf6..a6cd5aee5 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex}; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_wallet::descriptor::ExtendedDescriptor; +use bdk_wallet::error::{BuildFeeBumpError, CreateTxError}; use bdk_wallet::event::WalletEvent; #[allow(deprecated)] use bdk_wallet::SignOptions; @@ -311,9 +312,11 @@ impl Wallet { let conflict_txids: Vec = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); + // Use the last transaction id in the conflicts as the new txid + let new_txid = conflicts.last().map(|(_, new_tx)| *new_tx).unwrap_or(txid); let payment = self.create_payment_from_tx( locked_wallet, - txid, + new_txid, payment_id, &tx, PaymentStatus::Pending, @@ -1116,6 +1119,145 @@ impl Wallet { log_info!(self.logger, "No details found for payment {} in store", payment_id); return Err(Error::InvalidPaymentId); } + + #[allow(deprecated)] + pub(crate) fn bump_fee_rbf(&self, payment_id: PaymentId) -> Result { + let payment = self.payment_store.get(&payment_id).ok_or(Error::InvalidPaymentId)?; + + let mut locked_wallet = self.inner.lock().unwrap(); + + if payment.direction != PaymentDirection::Outbound { + log_error!(self.logger, "Transaction {} is not an outbound payment", payment_id); + return Err(Error::InvalidPaymentId); + } + + if let PaymentKind::Onchain { status, .. } = &payment.kind { + match status { + ConfirmationStatus::Confirmed { .. } => { + log_error!( + self.logger, + "Transaction {} is already confirmed and cannot be fee bumped", + payment_id + ); + return Err(Error::InvalidPaymentId); + }, + ConfirmationStatus::Unconfirmed => {}, + } + } + + let txid = match &payment.kind { + PaymentKind::Onchain { txid, .. } => *txid, + _ => return Err(Error::InvalidPaymentId), + }; + + let confirmation_target = ConfirmationTarget::OnchainPayment; + let estimated_fee_rate = self.fee_estimator.estimate_fee_rate(confirmation_target); + + log_info!(self.logger, "Bumping fee to {}", estimated_fee_rate); + + let mut psbt = { + let mut builder = locked_wallet.build_fee_bump(txid).map_err(|e| { + log_error!(self.logger, "BDK fee bump failed for {}: {:?}", txid, e); + match e { + BuildFeeBumpError::TransactionNotFound(_) => Error::InvalidPaymentId, + BuildFeeBumpError::TransactionConfirmed(_) => Error::InvalidPaymentId, + BuildFeeBumpError::IrreplaceableTransaction(_) => Error::InvalidPaymentId, + BuildFeeBumpError::FeeRateUnavailable => Error::InvalidPaymentId, + _ => Error::InvalidFeeRate, + } + })?; + + builder.fee_rate(estimated_fee_rate); + + match builder.finish() { + Ok(psbt) => Ok(psbt), + Err(CreateTxError::FeeRateTooLow { required }) => { + log_info!(self.logger, "BDK requires higher fee rate: {}", required); + + // Safety check + const MAX_REASONABLE_FEE_RATE_SAT_VB: u64 = 1000; + if required.to_sat_per_vb_ceil() > MAX_REASONABLE_FEE_RATE_SAT_VB { + log_error!( + self.logger, + "BDK requires unreasonably high fee rate: {} sat/vB", + required.to_sat_per_vb_ceil() + ); + return Err(Error::InvalidFeeRate); + } + + let mut builder = locked_wallet.build_fee_bump(txid).map_err(|e| { + log_error!(self.logger, "BDK fee bump retry failed for {}: {:?}", txid, e); + Error::InvalidFeeRate + })?; + + builder.fee_rate(required); + builder.finish().map_err(|e| { + log_error!( + self.logger, + "Failed to finish PSBT with required fee rate: {:?}", + e + ); + Error::InvalidFeeRate + }) + }, + Err(e) => { + log_error!(self.logger, "Failed to create fee bump PSBT: {:?}", e); + Err(Error::InvalidFeeRate) + }, + }? + }; + + match locked_wallet.sign(&mut psbt, SignOptions::default()) { + Ok(finalized) => { + if !finalized { + return Err(Error::OnchainTxCreationFailed); + } + }, + Err(err) => { + log_error!(self.logger, "Failed to create transaction: {}", err); + return Err(err.into()); + }, + } + + let mut locked_persister = self.persister.lock().unwrap(); + locked_wallet.persist(&mut locked_persister).map_err(|e| { + log_error!(self.logger, "Failed to persist wallet: {}", e); + Error::PersistenceFailed + })?; + + let fee_bumped_tx = psbt.extract_tx().map_err(|e| { + log_error!(self.logger, "Failed to extract transaction: {}", e); + e + })?; + + let new_txid = fee_bumped_tx.compute_txid(); + + self.broadcaster.broadcast_transactions(&[&fee_bumped_tx]); + + let new_payment = self.create_payment_from_tx( + &locked_wallet, + new_txid, + payment.id, + &fee_bumped_tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + ); + + let pending_payment_store = self.create_pending_payment_from_tx( + new_payment.clone(), + Vec::new(), + Some(&fee_bumped_tx), + Some(0), + Some(0), + ); + + self.pending_payment_store.insert_or_update(pending_payment_store)?; + self.payment_store.insert_or_update(new_payment)?; + + log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); + + Ok(new_txid) + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 09afe1e61..6089ad810 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use bitcoin::address::NetworkUnchecked; use bitcoin::hashes::sha256::Hash as Sha256Hash; use bitcoin::hashes::Hash; -use bitcoin::{Address, Amount, ScriptBuf}; +use bitcoin::{Address, Amount, ScriptBuf, Txid}; use common::logging::{init_log_logger, validate_log_entry, MultiNodeLogger, TestLogWriter}; use common::{ bump_fee_and_broadcast, distribute_funds_unconfirmed, do_channel_full_cycle, @@ -685,8 +685,111 @@ async fn onchain_wallet_recovery() { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn test_rbf_via_mempool() { - run_rbf_test(false).await; +async fn onchain_fee_bump_rbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + // Fund both nodes + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + let premine_amount_sat = 500_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a.clone(), addr_b.clone()], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Send a transaction from node_b to node_a that we'll later bump + let amount_to_send_sats = 100_000; + let txid = + node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let payment_id = PaymentId(txid.to_byte_array()); + let original_payment = node_b.payment(&payment_id).unwrap(); + let original_fee = original_payment.fee_paid_msat.unwrap(); + + // Non-existent payment id + let fake_txid = + Txid::from_str("0000000000000000000000000000000000000000000000000000000000000000").unwrap(); + let invalid_payment_id = PaymentId(fake_txid.to_byte_array()); + assert_eq!( + Err(NodeError::InvalidPaymentId), + node_b.onchain_payment().bump_fee_rbf(invalid_payment_id) + ); + + // Bump an inbound payment + assert_eq!(Err(NodeError::InvalidPaymentId), node_a.onchain_payment().bump_fee_rbf(payment_id)); + + // Successful fee bump + let new_txid = node_b.onchain_payment().bump_fee_rbf(payment_id).unwrap(); + wait_for_tx(&electrsd.client, new_txid).await; + + // Sleep to allow for transaction propagation + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify fee increased + let new_payment = node_b.payment(&payment_id).unwrap(); + assert!( + new_payment.fee_paid_msat > Some(original_fee), + "Fee should increase after RBF bump. Original: {}, New: {}", + original_fee, + new_payment.fee_paid_msat.unwrap() + ); + + // Multiple consecutive bumps + let second_bump_txid = node_b.onchain_payment().bump_fee_rbf(payment_id).unwrap(); + wait_for_tx(&electrsd.client, second_bump_txid).await; + + // Sleep to allow for transaction propagation + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify second bump payment exists + let second_payment = node_b.payment(&payment_id).unwrap(); + assert!( + second_payment.fee_paid_msat > new_payment.fee_paid_msat, + "Second bump should have higher fee than first bump" + ); + + // Confirm the transaction and try to bump again (should fail) + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + assert_eq!(Err(NodeError::InvalidPaymentId), node_b.onchain_payment().bump_fee_rbf(payment_id)); + + // Verify final payment is confirmed + let final_payment = node_b.payment(&payment_id).unwrap(); + assert_eq!(final_payment.status, PaymentStatus::Succeeded); + match final_payment.kind { + PaymentKind::Onchain { status, .. } => { + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + + // Verify node A received the funds correctly + let node_a_received_payment = node_a.list_payments_with_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == second_bump_txid), + ); + assert_eq!(node_a_received_payment.len(), 1); + assert_eq!(node_a_received_payment[0].amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(node_a_received_payment[0].status, PaymentStatus::Succeeded); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)]