diff --git a/Cargo.lock b/Cargo.lock index 878a09ae4..16806a41c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3671,7 +3671,6 @@ dependencies = [ "anyhow", "clap", "ctrlc", - "futures-lite", "hex", "pkarr", "rand 0.9.2", diff --git a/pkarr-republisher/Cargo.toml b/pkarr-republisher/Cargo.toml index 7c592e190..fa0a6ead1 100644 --- a/pkarr-republisher/Cargo.toml +++ b/pkarr-republisher/Cargo.toml @@ -15,7 +15,6 @@ anyhow.workspace = true pkarr = { workspace = true, features = ["default"] } tokio = { workspace = true, features = ["full"] } tracing.workspace = true -futures-lite = { version = "2.6.1" } thiserror.workspace = true # bin dependencies diff --git a/pkarr-republisher/examples/publish_and_save.rs b/pkarr-republisher/examples/publish_and_save.rs index 850e5c022..9925569b9 100644 --- a/pkarr-republisher/examples/publish_and_save.rs +++ b/pkarr-republisher/examples/publish_and_save.rs @@ -141,9 +141,9 @@ async fn publish_records(num_records: usize, thread_id: usize) -> Vec { .build(&key) .unwrap(); let elapsed_time = instant.elapsed().as_millis(); - match rclient.publish(packet, None).await { + match rclient.publish(packet).await { Ok(info) => { - tracing::info!("- t{thread_id:<2} {i:>3}/{num_records} Published {} within {elapsed_time}ms to {} nodes {} attempts", key.public_key(), info.published_nodes_count, info.attempts_needed); + tracing::info!("- t{thread_id:<2} {i:>3}/{num_records} Published {} within {elapsed_time}ms stored_at={:?} attempts={}", key.public_key(), info.stored_at, info.attempts_needed); records.push(key); } Err(e) => { diff --git a/pkarr-republisher/examples/read_and_verify.rs b/pkarr-republisher/examples/read_and_verify.rs deleted file mode 100644 index c11e3fcbb..000000000 --- a/pkarr-republisher/examples/read_and_verify.rs +++ /dev/null @@ -1,113 +0,0 @@ -//! -//! Reads `published_secrets.txt` and outputs how many nodes store this public key. -//! Run `publish_and_save` first to publish some packets to verify -//! Freshly stored once should have 15+. -//! <10 is ready for a republish. -//! 0 = Packet unavailable. -//! -//! Run with `cargo run --example read_and_verify -- --num_keys 20` -//! - -use clap::Parser; -use pkarr::{Client, Keypair}; -use pkarr_republisher::{ResilientClient, RetrySettings}; -use rand::rng; -use rand::seq::SliceRandom; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - -use tracing::{info, level_filters::LevelFilter}; -use tracing_subscriber::EnvFilter; - -#[derive(Parser, Debug)] -#[command(author, about = "Verify pkarr packets on the DHT.")] -struct Cli { - /// Verify x keys by checking how many nodes it was stored on. - #[arg(long, default_value_t = 20)] - num_records: usize, -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let cli = Cli::parse(); - println!("read_and_verify started."); - - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into())) - .init(); - - // Set up the Ctrl+C handler - let ctrlc_pressed: Arc = Arc::new(AtomicBool::new(false)); - let r = ctrlc_pressed.clone(); - ctrlc::set_handler(move || { - r.store(true, Ordering::SeqCst); - println!("Ctrl+C detected, shutting down..."); - std::process::exit(0); - }) - .expect("Error setting Ctrl+C handler"); - - println!("Press Ctrl+C to stop..."); - - println!("Read published_secrets.txt"); - let published_keys = read_keys(); - println!("Read {} keys", published_keys.len()); - - let num_verify_keys = cli.num_records; - info!("Randomly verify: {num_verify_keys} keys"); - verify_published(&published_keys, num_verify_keys).await; - Ok(()) -} - -fn read_keys() -> Vec { - let secret_srs = std::fs::read_to_string("published_secrets.txt").expect("File not found"); - let keys = secret_srs - .lines() - .map(|line| line.to_string()) - .collect::>(); - keys.into_iter() - .map(|key| { - let secret = hex::decode(key).expect("invalid hex"); - let secret: [u8; 32] = secret.try_into().unwrap(); - Keypair::from_secret_key(&secret) - }) - .collect::>() -} - -async fn verify_published(keys: &[Keypair], count: usize) { - // Shuffle and take {count} elements to verify. - let mut keys: Vec = keys.to_owned(); - let mut rng = rng(); - keys.shuffle(&mut rng); - let keys: Vec = keys.into_iter().take(count).collect(); - - let client = Client::builder().no_relays().build().unwrap(); - let rclient = ResilientClient::new_with_client(client, RetrySettings::default()).unwrap(); - let mut success = 0; - let mut warn = 0; - let mut error = 0; - for (i, key) in keys.into_iter().enumerate() { - let nodes_count = rclient.verify_node_count(&key.public_key()).await; - if nodes_count == 0 { - tracing::error!( - "- {i}/{count} Verify {} found on {nodes_count} nodes.", - key.public_key() - ); - error += 1; - } else if nodes_count < 5 { - tracing::warn!( - "- {i}/{count} Verify {} found on {nodes_count} nodes.", - key.public_key() - ); - warn += 1; - } else { - tracing::info!( - "- {i}/{count} Verify {} found on {nodes_count} nodes.", - key.public_key() - ); - success += 1; - } - } - println!("Success: {success}, Warn: {warn}, Error: {error}"); -} diff --git a/pkarr-republisher/src/lib.rs b/pkarr-republisher/src/lib.rs index 90e28cdb7..24da6b1a9 100644 --- a/pkarr-republisher/src/lib.rs +++ b/pkarr-republisher/src/lib.rs @@ -2,10 +2,8 @@ mod multi_republisher; mod publisher; mod republisher; mod resilient_client; -mod verify; pub use multi_republisher::*; pub use publisher::*; pub use republisher::*; pub use resilient_client::*; -pub use verify::count_key_on_dht; diff --git a/pkarr-republisher/src/multi_republisher.rs b/pkarr-republisher/src/multi_republisher.rs index c596eb7ef..3d4cb3173 100644 --- a/pkarr-republisher/src/multi_republisher.rs +++ b/pkarr-republisher/src/multi_republisher.rs @@ -1,7 +1,5 @@ -use crate::{ - republisher::{RepublishError, RepublishInfo, RepublisherSettings}, - ResilientClient, ResilientClientBuilderError, -}; +use crate::republisher::{RepublishError, RepublishInfo, RepublisherSettings}; +use crate::{ResilientClient, ResilientClientBuilderError}; use pkarr::PublicKey; use std::collections::HashMap; use tokio::time::Instant; @@ -113,19 +111,14 @@ impl MultiRepublisher { ResilientClient::new_with_client(client, self.settings.retry_settings.clone())?; for key in public_keys { let start = Instant::now(); - let res = rclient - .republish( - key.clone(), - Some(self.settings.min_sufficient_node_publish_count), - ) - .await; + let res = rclient.republish(key.clone()).await; let elapsed = start.elapsed().as_millis(); match &res { Ok(info) => { tracing::info!( - "Republished {key} successfully on {} nodes within {elapsed}ms. attemps={}", - info.published_nodes_count, + "Republished {key} successfully within {elapsed}ms. stored_at={:?} attempts={}", + info.stored_at, info.attempts_needed ) } @@ -185,8 +178,6 @@ impl MultiRepublisher { #[cfg(test)] mod tests { - use std::num::NonZeroU8; - use pkarr::{dns::Name, Keypair, PublicKey}; use crate::{multi_republisher::MultiRepublisher, republisher::RepublisherSettings}; @@ -211,45 +202,23 @@ mod tests { .build() .unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_keys = publish_sample_packets(&pkarr_client, 1).await; let public_key = public_keys.first().unwrap().clone(); let mut settings = RepublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(3).unwrap()); - let publisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder)); - let results = publisher.run_serially(public_keys).await.unwrap(); + settings.pkarr_client(pkarr_client); + let republisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder)); + let results = republisher.run_serially(public_keys).await.unwrap(); let result = results.get(&public_key).unwrap(); if let Err(e) = result { println!("Err {e}"); } assert!(result.is_ok()); } - - #[tokio::test] - async fn single_key_republish_insufficient() { - let dht = pkarr::mainline::Testnet::builder(3) - .seeded(false) - .build() - .unwrap(); - let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); - let pkarr_client = pkarr_builder.clone().build().unwrap(); - - let public_keys = publish_sample_packets(&pkarr_client, 1).await; - let public_key = public_keys.first().unwrap().clone(); - - let mut settings = RepublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(4).unwrap()); - let publisher = MultiRepublisher::new_with_settings(settings, Some(pkarr_builder)); - let results = publisher.run_serially(public_keys).await.unwrap(); - let result = results.get(&public_key).unwrap(); - assert!(result.is_err()); - } } diff --git a/pkarr-republisher/src/publisher.rs b/pkarr-republisher/src/publisher.rs index 5474de26f..a81ee2e2d 100644 --- a/pkarr-republisher/src/publisher.rs +++ b/pkarr-republisher/src/publisher.rs @@ -2,48 +2,28 @@ //! Publishes a single pkarr packet with retries in case it fails. //! -use pkarr::{mainline::async_dht::AsyncDht, PublicKey, SignedPacket}; +use pkarr::SignedPacket; use std::{num::NonZeroU8, time::Duration}; -use crate::verify::count_key_on_dht; - #[derive(thiserror::Error, Debug, Clone)] pub enum PublishError { - #[error("Packet has been republished but to an insufficient number of {published_nodes_count} nodes.")] - InsuffientlyPublished { published_nodes_count: usize }, #[error(transparent)] PublishFailed(#[from] pkarr::errors::PublishError), } -impl PublishError { - pub fn is_insufficiently_published(&self) -> bool { - if let PublishError::InsuffientlyPublished { .. } = self { - return true; - } - false - } - - pub fn is_publish_failed(&self) -> bool { - if let PublishError::PublishFailed { .. } = self { - return true; - } - false - } -} - #[derive(Debug, Clone)] pub struct PublishInfo { - /// How many nodes the key got published on. - pub published_nodes_count: usize, /// Number of publishing attempts needed to successfully publish. pub attempts_needed: usize, + /// Number of DHT nodes that acknowledged storing the packet, or None for relay-only publishes. + pub stored_at: Option, } impl PublishInfo { - pub fn new(published_nodes_count: usize, attempts_needed: usize) -> Self { + pub fn new(attempts_needed: usize, stored_at: Option) -> Self { Self { - published_nodes_count, attempts_needed, + stored_at, } } } @@ -92,11 +72,10 @@ impl Default for RetrySettings { } } -/// Settings for creating a republisher +/// Settings for creating a publisher #[derive(Debug, Clone)] pub struct PublisherSettings { pub(crate) client: Option, - pub(crate) min_sufficient_node_publish_count: NonZeroU8, pub retry_settings: RetrySettings, } @@ -104,7 +83,6 @@ impl Default for PublisherSettings { fn default() -> Self { Self { client: None, - min_sufficient_node_publish_count: NonZeroU8::new(10).expect("Should always be > 0"), retry_settings: RetrySettings::default(), } } @@ -121,13 +99,6 @@ impl PublisherSettings { self } - /// Set the minimum sufficient number of nodes a key needs to be stored in - /// to be considered a success - pub fn min_sufficient_node_publish_count(&mut self, count: NonZeroU8) -> &mut Self { - self.min_sufficient_node_publish_count = count; - self - } - /// Set settings in relation to retries. pub fn retry_settings(&mut self, settings: RetrySettings) -> &mut Self { self.retry_settings = settings; @@ -135,15 +106,11 @@ impl PublisherSettings { } } -/// Tries to publish a single key and verifies the keys has been published to -/// a sufficient number of nodes. -/// Retries in case of errors with an exponential backoff. +/// Tries to publish a single key with retries and exponential backoff. #[derive(Debug, Clone)] pub struct Publisher { pub packet: SignedPacket, client: pkarr::Client, - dht: AsyncDht, - min_sufficient_node_publish_count: NonZeroU8, retry_settings: RetrySettings, } @@ -162,21 +129,13 @@ impl Publisher { Some(c) => c.clone(), None => pkarr::Client::builder().build()?, }; - let dht = client.dht().expect("infallible").as_async(); Ok(Self { packet, client, - dht, - min_sufficient_node_publish_count: settings.min_sufficient_node_publish_count, retry_settings: settings.retry_settings, }) } - /// Get the public key of the signer of the packet - fn get_public_key(&self) -> PublicKey { - self.packet.public_key() - } - /// Exponential backoff delay starting with `INITIAL_DELAY_MS` and maxing out at `MAX_DELAY_MS` fn get_retry_delay(&self, retry_count: u8) -> Duration { let initial_ms = self.retry_settings.initial_retry_delay.as_millis() as u64; @@ -186,23 +145,10 @@ impl Publisher { delay.min(self.retry_settings.max_retry_delay) } - /// Republish a single public key. + /// Publish a single packet once. pub async fn publish_once(&self) -> Result { - if let Err(e) = self.client.publish(&self.packet, None).await { - return Err(e.into()); - } - - // TODO: This counting could really be done with the put response in the mainline library already. It's not exposed though. - // This would really speed up the publishing and reduce the load on the DHT. - // -- Sev April 2025 -- - let published_nodes_count = count_key_on_dht(&self.get_public_key(), &self.dht).await; - if published_nodes_count < self.min_sufficient_node_publish_count.get().into() { - return Err(PublishError::InsuffientlyPublished { - published_nodes_count, - }); - } - - Ok(PublishInfo::new(published_nodes_count, 1)) + let result = self.client.publish_with_info(&self.packet, None).await?; + Ok(PublishInfo::new(1, result.stored_at)) } // Publishes the key with an exponential backoff @@ -219,7 +165,7 @@ impl Publisher { Err(e) => { tracing::debug!( "{human_retry_count}/{max_retries} Failed to publish {}: {e}", - self.get_public_key() + self.packet.public_key() ); last_error = Some(e); } @@ -228,7 +174,7 @@ impl Publisher { let delay = self.get_retry_delay(retry_count); tracing::debug!( "{} {human_retry_count}/{max_retries} Sleep for {delay:?} before trying again.", - self.get_public_key() + self.packet.public_key() ); tokio::time::sleep(delay).await; } @@ -241,85 +187,55 @@ impl Publisher { mod tests { use std::{num::NonZeroU8, time::Duration}; - use pkarr::{dns::Name, Keypair, PublicKey, SignedPacket}; + use pkarr::{dns::Name, Keypair, SignedPacket}; - use crate::publisher::{PublishError, Publisher, PublisherSettings}; + use crate::publisher::{Publisher, PublisherSettings}; - fn sample_packet() -> (PublicKey, SignedPacket) { + fn sample_packet() -> SignedPacket { let key = Keypair::random(); - let packet = pkarr::SignedPacketBuilder::default() + pkarr::SignedPacketBuilder::default() .cname(Name::new("test").unwrap(), Name::new("test2").unwrap(), 600) .build(&key) - .unwrap(); - (key.public_key(), packet) + .unwrap() } #[tokio::test] - async fn single_key_republish_success() { + async fn single_key_publish_success() { let dht = pkarr::mainline::Testnet::builder(3) .seeded(false) .build() .unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); - let (_, packet) = sample_packet(); + let packet = sample_packet(); - let required_nodes = 3; let mut settings = PublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings.pkarr_client(pkarr_client); let publisher = Publisher::new_with_settings(packet, settings).unwrap(); let res = publisher.publish_once().await; assert!(res.is_ok()); - let success = res.unwrap(); - assert_eq!(success.published_nodes_count, 3); - } - - #[tokio::test] - async fn single_key_republish_insufficient() { - let dht = pkarr::mainline::Testnet::builder(3) - .seeded(false) - .build() - .unwrap(); - let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); - let pkarr_client = pkarr_builder.clone().build().unwrap(); - let (_, packet) = sample_packet(); - - let required_nodes = 4; - let mut settings = PublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); - let publisher = Publisher::new_with_settings(packet, settings).unwrap(); - let res = publisher.publish_once().await; - - assert!(res.is_err()); - let err = res.unwrap_err(); - assert!(err.is_insufficiently_published()); - if let PublishError::InsuffientlyPublished { - published_nodes_count, - } = err - { - assert_eq!(published_nodes_count, 3); - }; + let info = res.unwrap(); + assert!(info.stored_at.is_some()); + assert!(info.stored_at.unwrap() > 0); } #[tokio::test] async fn retry_delay() { let dht = pkarr::mainline::Testnet::builder(3).build().unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); - let (_, packet) = sample_packet(); + let packet = sample_packet(); - let required_nodes = 1; let mut settings = PublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings.pkarr_client(pkarr_client); settings .retry_settings .max_retries(NonZeroU8::new(10).unwrap()) diff --git a/pkarr-republisher/src/republisher.rs b/pkarr-republisher/src/republisher.rs index 95eae7c45..843e0892b 100644 --- a/pkarr-republisher/src/republisher.rs +++ b/pkarr-republisher/src/republisher.rs @@ -3,7 +3,7 @@ //! use pkarr::PublicKey; use pkarr::SignedPacket; -use std::{num::NonZeroU8, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use crate::{ publisher::{PublishError, Publisher, PublisherSettings}, @@ -36,8 +36,8 @@ impl RepublishError { #[derive(Debug, Clone)] pub struct RepublishInfo { - /// How many nodes the key got published on. - pub published_nodes_count: usize, + /// Number of DHT nodes that acknowledged storing the packet, or None for relay-only publishes. + pub stored_at: Option, /// Number of publishing attempts needed to successfully republish. pub attempts_needed: usize, /// Whether the `republish_condition` was negative. @@ -46,12 +46,12 @@ pub struct RepublishInfo { impl RepublishInfo { pub fn new( - published_nodes_count: usize, + stored_at: Option, attempts_needed: usize, should_republish_condition_failed: bool, ) -> Self { Self { - published_nodes_count, + stored_at, attempts_needed, condition_failed: should_republish_condition_failed, } @@ -64,7 +64,6 @@ pub type RepublishCondition = dyn Fn(&SignedPacket) -> bool + Send + Sync; #[derive(Clone)] pub struct RepublisherSettings { pub(crate) client: Option, - pub(crate) min_sufficient_node_publish_count: NonZeroU8, pub(crate) retry_settings: RetrySettings, pub(crate) republish_condition: Option>, } @@ -73,10 +72,6 @@ impl std::fmt::Debug for RepublisherSettings { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RepublisherSettings") .field("client", &self.client) - .field( - "min_sufficient_node_publish_count", - &self.min_sufficient_node_publish_count, - ) .field("retry_settings", &self.retry_settings) .finish_non_exhaustive() } @@ -93,13 +88,6 @@ impl RepublisherSettings { self } - /// Set the minimum sufficient number of nodes a key needs to be stored in - /// to be considered a success - pub fn min_sufficient_node_publish_count(&mut self, count: NonZeroU8) -> &mut Self { - self.min_sufficient_node_publish_count = count; - self - } - /// Set settings in relation to retries. pub fn retry_settings(&mut self, settings: RetrySettings) -> &mut Self { self.retry_settings = settings; @@ -120,7 +108,6 @@ impl Default for RepublisherSettings { fn default() -> Self { Self { client: None, - min_sufficient_node_publish_count: NonZeroU8::new(10).expect("Should always be > 0"), retry_settings: RetrySettings::default(), republish_condition: None, } @@ -132,7 +119,6 @@ impl Default for RepublisherSettings { pub struct Republisher { pub public_key: PublicKey, client: pkarr::Client, - min_sufficient_node_publish_count: NonZeroU8, retry_settings: RetrySettings, republish_condition: Arc bool + Send + Sync>, } @@ -142,10 +128,6 @@ impl std::fmt::Debug for Republisher { f.debug_struct("Republisher") .field("public_key", &self.public_key) .field("client", &self.client) - .field( - "min_sufficient_node_publish_count", - &self.min_sufficient_node_publish_count, - ) .field("retry_settings", &self.retry_settings) .finish_non_exhaustive() } @@ -169,7 +151,6 @@ impl Republisher { Ok(Republisher { public_key, client, - min_sufficient_node_publish_count: settings.min_sufficient_node_publish_count, retry_settings: settings.retry_settings, republish_condition: settings .republish_condition @@ -196,17 +177,15 @@ impl Republisher { // Check if the packet should be republished if !(self.republish_condition)(&packet) { - return Ok(RepublishInfo::new(0, 1, true)); + return Ok(RepublishInfo::new(None, 1, true)); } let mut settings = PublisherSettings::default(); - settings - .pkarr_client(self.client.clone()) - .min_sufficient_node_publish_count(self.min_sufficient_node_publish_count); + settings.pkarr_client(self.client.clone()); let publisher = Publisher::new_with_settings(packet, settings) .expect("infallible because pkarr client provided"); match publisher.publish_once().await { - Ok(info) => Ok(RepublishInfo::new(info.published_nodes_count, 1, false)), + Ok(info) => Ok(RepublishInfo::new(info.stored_at, 1, false)), Err(e) => Err(e.into()), } } @@ -277,33 +256,31 @@ mod tests { let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_key = publish_sample_packets(&pkarr_client).await; - let required_nodes = 1; let mut settings = RepublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); - let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); - let res = publisher.republish_once().await; + settings.pkarr_client(pkarr_client); + let republisher = Republisher::new_with_settings(public_key, settings).unwrap(); + let res = republisher.republish_once().await; assert!(res.is_ok()); - let success = res.unwrap(); - assert_eq!(success.published_nodes_count, 1); + let info = res.unwrap(); + assert!(info.stored_at.is_some()); + assert!(info.stored_at.unwrap() > 0); } #[tokio::test] async fn single_key_republish_missing() { let dht = pkarr::mainline::Testnet::builder(1).build().unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_key = Keypair::random().public_key(); - let required_nodes = 1; let mut settings = RepublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); - let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); - let res = publisher.republish_once().await; + settings.pkarr_client(pkarr_client); + let republisher = Republisher::new_with_settings(public_key, settings).unwrap(); + let res = republisher.republish_once().await; assert!(res.is_err()); let err = res.unwrap_err(); @@ -314,29 +291,29 @@ mod tests { async fn retry_delay() { let dht = pkarr::mainline::Testnet::builder(1).build().unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_key = Keypair::random().public_key(); - let required_nodes = 1; let mut settings = RepublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings.pkarr_client(pkarr_client); settings .retry_settings .max_retries(NonZeroU8::new(10).unwrap()) .initial_retry_delay(Duration::from_millis(100)) .max_retry_delay(Duration::from_secs(10)); - let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); + let republisher = Republisher::new_with_settings(public_key, settings).unwrap(); - let first_delay = publisher.get_retry_delay(0); + let first_delay = republisher.get_retry_delay(0); assert_eq!(first_delay.as_millis(), 100); - let second_delay = publisher.get_retry_delay(1); + let second_delay = republisher.get_retry_delay(1); assert_eq!(second_delay.as_millis(), 200); - let third_delay = publisher.get_retry_delay(2); + let third_delay = republisher.get_retry_delay(2); assert_eq!(third_delay.as_millis(), 400); - let ninth_delay = publisher.get_retry_delay(9); + let ninth_delay = republisher.get_retry_delay(9); assert_eq!(ninth_delay.as_millis(), 10_000); } @@ -344,21 +321,21 @@ mod tests { async fn republish_retry_missing() { let dht = pkarr::mainline::Testnet::builder(1).build().unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_key = Keypair::random().public_key(); - let required_nodes = 1; let mut settings = RepublisherSettings::default(); - settings - .pkarr_client(pkarr_client) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()); + settings.pkarr_client(pkarr_client); settings .retry_settings .max_retries(NonZeroU8::new(3).unwrap()) .initial_retry_delay(Duration::from_millis(100)); - let publisher = Republisher::new_with_settings(public_key, settings).unwrap(); - let res = publisher.republish().await; + let republisher = Republisher::new_with_settings(public_key, settings).unwrap(); + let res = republisher.republish().await; assert!(res.is_err()); assert!(res.unwrap_err().is_missing()); @@ -368,23 +345,23 @@ mod tests { async fn republish_with_condition_fail() { let dht = pkarr::mainline::Testnet::builder(1).build().unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_key = publish_sample_packets(&pkarr_client).await; - let required_nodes = 1; let mut settings = RepublisherSettings::default(); settings .pkarr_client(pkarr_client.clone()) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()) - // Only republish if the packet has a TTL greater than 300 .republish_condition(|_| false); - let publisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap(); - let res = publisher.republish_once().await; + let republisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap(); + let res = republisher.republish_once().await; assert!(res.is_ok()); let info = res.unwrap(); - assert_eq!(info.published_nodes_count, 0); + assert!(info.stored_at.is_none()); assert!(info.condition_failed); } @@ -392,23 +369,24 @@ mod tests { async fn republish_with_condition_success() { let dht = pkarr::mainline::Testnet::builder(1).build().unwrap(); let mut pkarr_builder = pkarr::ClientBuilder::default(); - pkarr_builder.bootstrap(&dht.bootstrap).no_relays(); + pkarr_builder + .no_default_network() + .bootstrap(&dht.bootstrap) + .no_relays(); let pkarr_client = pkarr_builder.clone().build().unwrap(); let public_key = publish_sample_packets(&pkarr_client).await; - let required_nodes = 1; let mut settings = RepublisherSettings::default(); settings .pkarr_client(pkarr_client.clone()) - .min_sufficient_node_publish_count(NonZeroU8::new(required_nodes).unwrap()) - // Only republish if the packet has a TTL greater than 300 .republish_condition(|_| true); - let publisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap(); - let res = publisher.republish_once().await; + let republisher = Republisher::new_with_settings(public_key.clone(), settings).unwrap(); + let res = republisher.republish_once().await; assert!(res.is_ok()); let info = res.unwrap(); - assert_eq!(info.published_nodes_count, 1); + assert!(info.stored_at.is_some()); + assert!(info.stored_at.unwrap() > 0); assert!(!info.condition_failed); } } diff --git a/pkarr-republisher/src/resilient_client.rs b/pkarr-republisher/src/resilient_client.rs index c48eda224..4dcd5b317 100644 --- a/pkarr-republisher/src/resilient_client.rs +++ b/pkarr-republisher/src/resilient_client.rs @@ -1,30 +1,22 @@ -use std::num::NonZeroU8; - -use pkarr::{mainline::async_dht::AsyncDht, PublicKey, SignedPacket}; +use pkarr::{PublicKey, SignedPacket}; use crate::{ - count_key_on_dht, PublishError, PublishInfo, Publisher, PublisherSettings, RepublishError, - RepublishInfo, Republisher, RepublisherSettings, RetrySettings, + PublishError, PublishInfo, Publisher, PublisherSettings, RepublishError, RepublishInfo, + Republisher, RepublisherSettings, RetrySettings, }; #[derive(Debug, thiserror::Error)] pub enum ResilientClientBuilderError { - #[error("pkarr client was built without DHT and is only using relays. This is not supported.")] - DhtNotEnabled, #[error(transparent)] BuildError(#[from] pkarr::errors::BuildError), } /// Simple pkarr client that focuses on resilience -/// and verification compared to the regular client that -/// might experience inreliability due to the underlying UDP connection. -/// -/// This client requires a pkarr client that was built with the `dht` feature. -/// Relays only are not supported. +/// compared to the regular client that might experience +/// unreliability due to the underlying UDP connection. #[derive(Debug, Clone)] pub struct ResilientClient { client: pkarr::Client, - dht: AsyncDht, retry_settings: RetrySettings, } @@ -38,54 +30,35 @@ impl ResilientClient { client: pkarr::Client, retry_settings: RetrySettings, ) -> Result { - let dht = client.dht(); - if dht.is_none() { - return Err(ResilientClientBuilderError::DhtNotEnabled); - } - let dht = dht.unwrap().as_async(); Ok(Self { client, - dht, retry_settings, }) } - /// Publishes a pkarr packet with retries. Verifies it's been stored correctly. + /// Publishes a pkarr packet with retries. pub async fn publish( &self, packet: SignedPacket, - min_sufficient_node_publish_count: Option, ) -> Result { let mut settings = PublisherSettings::default(); settings.pkarr_client(self.client.clone()); settings.retry_settings(self.retry_settings.clone()); - if let Some(count) = min_sufficient_node_publish_count { - settings.min_sufficient_node_publish_count = count; - }; let publisher = Publisher::new_with_settings(packet, settings) .expect("infallible because pkarr client provided."); publisher.publish().await } - /// Republishes a pkarr packet with retries. Verifies it's been stored correctly. + /// Republishes a pkarr packet with retries. pub async fn republish( &self, public_key: PublicKey, - min_sufficient_node_publish_count: Option, ) -> Result { let mut settings = RepublisherSettings::default(); settings.pkarr_client(self.client.clone()); - if let Some(count) = min_sufficient_node_publish_count { - settings.min_sufficient_node_publish_count = count; - }; settings.retry_settings(self.retry_settings.clone()); let publisher = Republisher::new_with_settings(public_key, settings) .expect("infallible because pkarr client provided."); publisher.republish().await } - - /// Counts the number of nodes the public key has been stored on. - pub async fn verify_node_count(&self, public_key: &PublicKey) -> usize { - count_key_on_dht(public_key, &self.dht).await - } } diff --git a/pkarr-republisher/src/verify.rs b/pkarr-republisher/src/verify.rs deleted file mode 100644 index a37f87812..000000000 --- a/pkarr-republisher/src/verify.rs +++ /dev/null @@ -1,12 +0,0 @@ -use futures_lite::StreamExt; -use pkarr::{mainline::async_dht::AsyncDht, PublicKey}; - -/// Verifies the number of nodes that store the public key. -pub async fn count_key_on_dht(public_key: &PublicKey, dht: &AsyncDht) -> usize { - let mut response_count = 0; - let mut stream = dht.get_mutable(public_key.as_bytes(), None, None); - while (stream.next().await).is_some() { - response_count += 1; - } - response_count -} diff --git a/pubky-homeserver/src/republishers/user_keys_republisher.rs b/pubky-homeserver/src/republishers/user_keys_republisher.rs index b3fa3474d..d2980f8d8 100644 --- a/pubky-homeserver/src/republishers/user_keys_republisher.rs +++ b/pubky-homeserver/src/republishers/user_keys_republisher.rs @@ -60,8 +60,7 @@ impl UserKeysRepublisher { ); } - let mut pkarr_builder = context.pkarr_builder.clone(); - pkarr_builder.no_relays(); // Disable relays to avoid their rate limiting. + let pkarr_builder = context.pkarr_builder.clone(); let handle = tokio::spawn(async move { tokio::time::sleep(initial_delay).await; Self::run_loop(&db, republish_interval, pkarr_builder).await