Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ prost = { version = "0.11.6", default-features = false}
#bitcoin-payment-instructions = { version = "0.6" }
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ea50a9d2a8da524b69a2af43233706666cf2ffa5" }

payjoin = { git = "https://github.com/payjoin/rust-payjoin.git", package = "payjoin", default-features = false, features = ["v2", "io"] }

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase"] }

Expand Down
65 changes: 57 additions & 8 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use vss_client::headers::VssHeaderProvider;
use crate::chain::ChainSource;
use crate::config::{
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig,
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, PayjoinConfig,
DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL,
};
use crate::connection::ConnectionManager;
Expand All @@ -56,12 +56,13 @@ use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
read_scorer, write_node_metrics,
read_node_metrics, read_output_sweeper, read_payjoin_sessions, read_payments, read_peer_info,
read_pending_payments, read_scorer, write_node_metrics,
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
self, PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
};
Expand All @@ -71,13 +72,14 @@ use crate::liquidity::{
use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::payment::payjoin::manager::PayjoinManager;
use crate::peer_store::PeerStore;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore,
Persister, SyncAndAsyncKVStore,
KeysManager, MessageRouter, OnionMessenger, PayjoinSessionStore, PaymentStore, PeerManager,
PendingPaymentStore, Persister, SyncAndAsyncKVStore,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
Expand Down Expand Up @@ -547,6 +549,15 @@ impl NodeBuilder {
Ok(self)
}

/// Configures the [`Node`] instance to enable payjoin payments.
///
/// The `payjoin_config` specifies the PayJoin directory and OHTTP relay URLs required
/// for payjoin V2 protocol.
pub fn set_payjoin_config(&mut self, payjoin_config: PayjoinConfig) -> &mut Self {
self.config.payjoin_config = Some(payjoin_config);
self
}

/// Configures the [`Node`] to resync chain data from genesis on first startup, recovering any
/// historical wallet funds.
///
Expand Down Expand Up @@ -933,6 +944,14 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_async_payments_role(role).map(|_| ())
}

/// Configures the [`Node`] instance to enable payjoin payments.
///
/// The `payjoin_config` specifies the PayJoin directory and OHTTP relay URLs required
/// for payjoin V2 protocol.
pub fn set_payjoin_config(&self, payjoin_config: PayjoinConfig) {
self.inner.write().unwrap().set_payjoin_config(payjoin_config);
}

/// Configures the [`Node`] to resync chain data from genesis on first startup, recovering any
/// historical wallet funds.
///
Expand Down Expand Up @@ -1083,12 +1102,13 @@ fn build_with_store_internal(

let kv_store_ref = Arc::clone(&kv_store);
let logger_ref = Arc::clone(&logger);
let (payment_store_res, node_metris_res, pending_payment_store_res) =
let (payment_store_res, node_metris_res, pending_payment_store_res, payjoin_session_store_res) =
runtime.block_on(async move {
tokio::join!(
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
read_payjoin_sessions(&*kv_store_ref, Arc::clone(&logger_ref))
)
});

Expand Down Expand Up @@ -1771,6 +1791,34 @@ fn build_with_store_internal(

let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());

let payjoin_session_store = match payjoin_session_store_res {
Ok(payjoin_sessions) => Arc::new(PayjoinSessionStore::new(
payjoin_sessions,
PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE.to_string(),
PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read payjoin session data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let payjoin_manager = Arc::new(PayjoinManager::new(
Arc::clone(&payjoin_session_store),
Arc::clone(&logger),
Arc::clone(&config),
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&chain_source),
Arc::clone(&channel_manager),
stop_sender.subscribe(),
Arc::clone(&payment_store),
Arc::clone(&pending_payment_store),
Arc::clone(&tx_broadcaster),
));

#[cfg(cycle_tests)]
let mut _leak_checker = crate::LeakChecker(Vec::new());
#[cfg(cycle_tests)]
Expand Down Expand Up @@ -1817,6 +1865,7 @@ fn build_with_store_internal(
hrn_resolver,
#[cfg(cycle_tests)]
_leak_checker,
payjoin_manager,
})
}

Expand Down
91 changes: 91 additions & 0 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,57 @@ impl BitcoindChainSource {
}
}
}

pub(crate) async fn can_broadcast_transaction(&self, tx: &Transaction) -> Result<bool, Error> {
let timeout_fut = tokio::time::timeout(
Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS),
self.api_client.test_mempool_accept(tx),
);

match timeout_fut.await {
Ok(res) => res.map_err(|e| {
log_error!(
self.logger,
"Failed to test mempool accept for transaction {}: {}",
tx.compute_txid(),
e
);
Error::TxBroadcastFailed
}),
Err(e) => {
log_error!(
self.logger,
"Failed to test mempool accept for transaction {} due to timeout: {}",
tx.compute_txid(),
e
);
log_trace!(
self.logger,
"Failed test mempool accept transaction bytes: {}",
log_bytes!(tx.encode())
);
Err(Error::TxBroadcastFailed)
},
}
}

pub(crate) async fn get_transaction(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
let timeout_fut = tokio::time::timeout(
Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS),
self.api_client.get_raw_transaction(txid),
);

match timeout_fut.await {
Ok(res) => res.map_err(|e| {
log_error!(self.logger, "Failed to get transaction {}: {}", txid, e);
Error::TxSyncFailed
}),
Err(e) => {
log_error!(self.logger, "Failed to get transaction {} due to timeout: {}", txid, e);
Err(Error::TxSyncTimeout)
},
}
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -1229,6 +1280,46 @@ impl BitcoindClient {
.collect();
Ok(evicted_txids)
}

/// Tests whether the provided transaction would be accepted by the mempool.
pub(crate) async fn test_mempool_accept(&self, tx: &Transaction) -> std::io::Result<bool> {
match self {
BitcoindClient::Rpc { rpc_client, .. } => {
Self::test_mempool_accept_inner(Arc::clone(rpc_client), tx).await
},
BitcoindClient::Rest { rpc_client, .. } => {
// We rely on the internal RPC client to make this call, as this
// operation is not supported by Bitcoin Core's REST interface.
Self::test_mempool_accept_inner(Arc::clone(rpc_client), tx).await
},
}
}

async fn test_mempool_accept_inner(
rpc_client: Arc<RpcClient>, tx: &Transaction,
) -> std::io::Result<bool> {
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
let tx_array = serde_json::json!([tx_serialized]);

let resp =
rpc_client.call_method::<serde_json::Value>("testmempoolaccept", &[tx_array]).await?;

if let Some(array) = resp.as_array() {
if let Some(first_result) = array.first() {
Ok(first_result.get("allowed").and_then(|v| v.as_bool()).unwrap_or(false))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Empty array response from testmempoolaccept",
))
}
} else {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"testmempoolaccept did not return an array",
))
}
}
}

impl BlockSource for BitcoindClient {
Expand Down
57 changes: 57 additions & 0 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,21 @@ impl ElectrumChainSource {
electrum_client.broadcast(tx).await;
}
}

pub(crate) async fn get_transaction(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
let electrum_client: Arc<ElectrumRuntimeClient> =
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
Arc::clone(client)
} else {
debug_assert!(
false,
"We should have started the chain source before getting transactions"
);
return Err(Error::TxSyncFailed);
};

electrum_client.get_transaction(txid).await
}
}

impl Filter for ElectrumChainSource {
Expand Down Expand Up @@ -652,6 +667,48 @@ impl ElectrumRuntimeClient {

Ok(new_fee_rate_cache)
}

async fn get_transaction(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
let electrum_client = Arc::clone(&self.electrum_client);
let txid_copy = *txid;

let spawn_fut =
self.runtime.spawn_blocking(move || electrum_client.transaction_get(&txid_copy));
let timeout_fut = tokio::time::timeout(
Duration::from_secs(
self.sync_config.timeouts_config.lightning_wallet_sync_timeout_secs,
),
spawn_fut,
);

match timeout_fut.await {
Ok(res) => match res {
Ok(inner_res) => match inner_res {
Ok(tx) => Ok(Some(tx)),
Err(e) => {
// Check if it's a "not found" error
let error_str = e.to_string();
if error_str.contains("No such mempool or blockchain transaction")
|| error_str.contains("not found")
{
Ok(None)
} else {
log_error!(self.logger, "Failed to get transaction {}: {}", txid, e);
Err(Error::TxSyncFailed)
}
},
},
Err(e) => {
log_error!(self.logger, "Failed to get transaction {}: {}", txid, e);
Err(Error::TxSyncFailed)
},
},
Err(e) => {
log_error!(self.logger, "Failed to get transaction {} due to timeout: {}", txid, e);
Err(Error::TxSyncTimeout)
},
}
}
}

impl Filter for ElectrumRuntimeClient {
Expand Down
7 changes: 7 additions & 0 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ impl EsploraChainSource {
}
}
}

pub(crate) async fn get_transaction(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
self.esplora_client.get_tx(txid).await.map_err(|e| {
log_error!(self.logger, "Failed to get transaction {}: {}", txid, e);
Error::TxSyncFailed
})
}
}

impl Filter for EsploraChainSource {
Expand Down
34 changes: 33 additions & 1 deletion src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use bitcoin::{Script, Txid};
use bitcoin::{Script, Transaction, Txid};
use lightning::chain::{BestBlock, Filter};

use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient};
Expand Down Expand Up @@ -468,6 +468,38 @@ impl ChainSource {
}
}
}

pub(crate) fn can_broadcast_transaction(&self, tx: &Transaction) -> Result<bool, Error> {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
match &self.kind {
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source.can_broadcast_transaction(tx).await
},
ChainSourceKind::Esplora{..} => {
// Esplora doesn't support testmempoolaccept equivalent.
unreachable!("Mempool accept testing is not supported with Esplora backend. Use BitcoindRpc for this functionality.")
},
ChainSourceKind::Electrum{..} => {
// Electrum doesn't support testmempoolaccept equivalent.
unreachable!("Mempool accept testing is not supported with Electrum backend. Use BitcoindRpc for this functionality.")
},
}
})
})
}

pub(crate) fn get_transaction(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
match &self.kind {
ChainSourceKind::Bitcoind(bitcoind) => bitcoind.get_transaction(txid).await,
ChainSourceKind::Esplora(esplora) => esplora.get_transaction(txid).await,
ChainSourceKind::Electrum(electrum) => electrum.get_transaction(txid).await,
}
})
})
}
}

impl Filter for ChainSource {
Expand Down
Loading
Loading