diff --git a/Cargo.toml b/Cargo.toml index 3b3562c..841e014 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ ciborium = "0.2.2" clap = { version = "4.5.38", features = ["derive"] } caretta-sync-core.path = "core" futures = { version = "0.3.31", features = ["executor"] } -libp2p = { version = "0.55.0", features = ["macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux" ] } serde = { version = "1.0.219", features = ["derive"] } thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt", "rt-multi-thread"] } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 9dddd29..5375f6b 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -15,7 +15,6 @@ ciborium.workspace = true clap.workspace = true dirs = "6.0.0" caretta-sync-core = { workspace = true, features = ["cli"] } -libp2p.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/core/Cargo.toml b/core/Cargo.toml index b3ad9da..ebdfdf0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,12 +19,12 @@ ciborium.workspace = true clap = {workspace = true, optional = true} dirs = "6.0.0" futures.workspace = true -libp2p.workspace = true -libp2p-core = { version = "0.43.0", features = ["serde"] } -libp2p-identity = { version = "0.2.11", features = ["ed25519", "peerid", "rand", "serde"] } +iroh = { version = "0.91.2", features = ["discovery-local-network", "discovery-pkarr-dht"] } prost = "0.14.1" prost-types = "0.14.1" +rusqlite = { version = "0.37.0", features = ["bundled"] } serde.workspace = true +sysinfo = "0.37.0" tempfile = { version = "3.20.0", optional = true } thiserror.workspace = true tokio.workspace = true @@ -35,9 +35,8 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } uuid.workspace = true url.workspace = true -sysinfo = "0.37.0" whoami = "1.6.1" -rusqlite = { version = "0.37.0", features = ["bundled"] } +rand = "0.8.5" [target.'cfg(target_os="android")'.dependencies] jni = "0.21.1" diff --git a/core/src/config/iroh.rs b/core/src/config/iroh.rs new file mode 100644 index 0000000..1bd3b84 --- /dev/null +++ b/core/src/config/iroh.rs @@ -0,0 +1,134 @@ +use std::{net::{IpAddr, Ipv4Addr}, ops, path::{Path, PathBuf}}; + +use base64::{prelude::BASE64_STANDARD, Engine}; +#[cfg(feature="cli")] +use clap::Args; +use futures::StreamExt; +use iroh::{Endpoint, SecretKey}; +use serde::{Deserialize, Serialize}; +use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}}; +use tracing_subscriber::EnvFilter; + + +use crate::{ + config::PartialConfig, + error::Error, p2p, utils::{emptiable::Emptiable, mergeable::Mergeable} +}; + +#[derive(Clone, Debug)] +pub struct IrohConfig { + pub enable: bool, + pub secret_key: SecretKey, + pub use_n0_discovery_service: bool, +} + +impl IrohConfig { + async fn into_endpoint(config: Self) -> Result, crate::error::Error> { + if config.enable { + let mut endpoint = Endpoint::builder() + .secret_key(config.secret_key) + .discovery_dht() + .discovery_local_network(); + if config.use_n0_discovery_service { + endpoint = endpoint.discovery_n0(); + } + Ok(Some(endpoint.bind().await?)) + } else { + Ok(None) + } + } +} + +impl TryFrom for IrohConfig { + type Error = crate::error::Error; + fn try_from(raw: PartialIrohConfig) -> Result { + Ok(IrohConfig { + enable: raw.enable.ok_or(Error::MissingConfig("iroh.enable"))?, + secret_key: raw.secret_key.ok_or(Error::MissingConfig("iroh.secret_key"))?, + use_n0_discovery_service: raw.use_n0_discovery_service.ok_or(Error::MissingConfig("iroh.use_n0_discovery_service"))? + }) + } +} + + + +#[cfg_attr(feature="cli",derive(Args))] +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct PartialIrohConfig { + #[cfg_attr(feature="cli",arg(long="p2p_enable"))] + pub enable: Option, + #[cfg_attr(feature="cli",arg(long))] + pub secret_key: Option, + #[cfg_attr(feature="cli",arg(long))] + pub use_n0_discovery_service: Option, +} + +impl PartialIrohConfig { + pub fn with_new_secret_key(mut self) -> Self { + let mut rng = rand::rngs::OsRng; + self.secret_key = Some(SecretKey::generate(&mut rng)); + self + } +} + +impl From for PartialIrohConfig { + fn from(config: IrohConfig) -> Self { + Self { + enable: Some(config.enable), + secret_key: Some(config.secret_key), + use_n0_discovery_service: Some(config.use_n0_discovery_service) + } + } +} + +impl Default for PartialIrohConfig { + fn default() -> Self { + Self { + enable: Some(true), + secret_key: None, + use_n0_discovery_service: Some(true) + } + } +} + +impl Emptiable for PartialIrohConfig { + fn empty() -> Self { + Self{ + enable: None, + secret_key: None, + use_n0_discovery_service: None + } + } + + fn is_empty(&self) -> bool { + self.enable.is_none() && self.secret_key.is_none() && self.use_n0_discovery_service.is_none() + } +} + +impl Mergeable for PartialIrohConfig { + fn merge(&mut self, mut other: Self) { + if let Some(x) = other.enable.take() { + let _ = self.enable.insert(x); + }; + if let Some(x) = other.secret_key.take() { + let _ = self.secret_key.insert(x); + }; + if let Some(x) = other.use_n0_discovery_service.take() { + let _ = self.use_n0_discovery_service.insert(x); + }; + } +} +impl Mergeable for Option { + fn merge(&mut self, mut other: Self) { + match other.take() { + Some(x) => { + if let Some(y) = self.as_mut() { + y.merge(x); + } else { + let _ = self.insert(x); + } + }, + None => {} + }; + } +} diff --git a/core/src/config/mod.rs b/core/src/config/mod.rs index b7074dc..7b1f94e 100644 --- a/core/src/config/mod.rs +++ b/core/src/config/mod.rs @@ -1,16 +1,16 @@ pub mod error; mod storage; -mod p2p; +mod iroh; mod rpc; -use std::{path::Path, default::Default}; +use std::{default::Default, fs::File, io::{Read, Write}, path::Path}; use crate::{utils::{emptiable::Emptiable, mergeable::Mergeable}}; pub use error::ConfigError; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::{io::{AsyncReadExt, AsyncWriteExt}}; pub use storage::{StorageConfig, PartialStorageConfig}; -pub use p2p::{P2pConfig, PartialP2pConfig}; +pub use iroh::{IrohConfig, PartialIrohConfig}; pub use rpc::*; #[cfg(feature="cli")] @@ -18,7 +18,7 @@ use clap::Args; #[derive(Clone, Debug)] pub struct Config { - pub p2p: P2pConfig, + pub iroh: IrohConfig, pub storage: StorageConfig, pub rpc: RpcConfig, } @@ -29,9 +29,9 @@ impl AsRef for Config { } } -impl AsRef for Config { - fn as_ref(&self) -> &P2pConfig { - &self.p2p +impl AsRef for Config { + fn as_ref(&self) -> &IrohConfig { + &self.iroh } } @@ -46,17 +46,17 @@ impl TryFrom for Config { fn try_from(value: PartialConfig) -> Result { Ok(Self{ rpc: value.rpc.ok_or(crate::error::Error::MissingConfig("rpc"))?.try_into()?, - p2p: value.p2p.ok_or(crate::error::Error::MissingConfig("p2p"))?.try_into()?, + iroh: value.iroh.ok_or(crate::error::Error::MissingConfig("p2p"))?.try_into()?, storage: value.storage.ok_or(crate::error::Error::MissingConfig("storage"))?.try_into()? }) } } #[cfg_attr(feature="cli", derive(Args))] -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct PartialConfig { #[cfg_attr(feature="cli", command(flatten))] - pub p2p: Option, + pub iroh: Option, #[cfg_attr(feature="cli", command(flatten))] pub storage: Option, #[cfg_attr(feature="cli", command(flatten))] @@ -66,7 +66,7 @@ pub struct PartialConfig { impl PartialConfig { pub fn new() -> Self { Self { - p2p : Some(PartialP2pConfig::empty().with_new_private_key()), + iroh : Some(PartialIrohConfig::empty().with_new_secret_key()), storage: Some(PartialStorageConfig::empty()), rpc: Some(PartialRpcConfig::empty()), } @@ -77,16 +77,16 @@ impl PartialConfig { pub fn into_toml(&self) -> Result { toml::to_string(self) } - pub async fn read_or_create(path: T) -> Result + pub fn read_or_create(path: T) -> Result where T: AsRef { if !path.as_ref().exists() { - Self::new().write_to(&path).await?; + Self::new().write_to(&path)?; } - Self::read_from(&path).await + Self::read_from(&path) } - pub async fn read_from(path:T) -> Result + pub fn read_from(path:T) -> Result where T: AsRef { @@ -94,15 +94,15 @@ impl PartialConfig { if let Some(x) = path.as_ref().parent() { std::fs::create_dir_all(x)?; }; - let _ = File::create(&path).await?; + let _ = File::create(&path)?; } - let mut file = File::open(path.as_ref()).await?; + let mut file = File::open(path.as_ref())?; let mut content = String::new(); - file.read_to_string(&mut content).await?; + file.read_to_string(&mut content)?; let config: Self = toml::from_str(&content)?; Ok(config) } - pub async fn write_to(&self, path:T) -> Result<(), ConfigError> + pub fn write_to(&self, path:T) -> Result<(), ConfigError> where T: AsRef { @@ -110,15 +110,15 @@ impl PartialConfig { if let Some(x) = path.as_ref().parent() { std::fs::create_dir_all(x)?; }; - let _ = File::create(&path).await?; + let _ = File::create(&path)?; } - let mut file = File::create(&path).await?; - file.write_all(toml::to_string(self)?.as_bytes()).await?; + let mut file = File::create(&path)?; + file.write_all(toml::to_string(self)?.as_bytes())?; Ok(()) } pub fn default(app_name: &'static str) -> Self { Self { - p2p: Some(PartialP2pConfig::default()), + iroh: Some(PartialIrohConfig::default()), rpc: Some(PartialRpcConfig::default(app_name)), storage: Some(PartialStorageConfig::default(app_name)), } @@ -128,7 +128,7 @@ impl PartialConfig { impl From for PartialConfig { fn from(value: Config) -> Self { Self { - p2p: Some(value.p2p.into()), + iroh: Some(value.iroh.into()), storage: Some(value.storage.into()), rpc: Some(value.rpc.into()) } @@ -138,20 +138,20 @@ impl From for PartialConfig { impl Emptiable for PartialConfig { fn empty() -> Self { Self { - p2p: None, + iroh: None, storage: None, rpc: None, } } fn is_empty(&self) -> bool { - self.p2p.is_empty() && self.rpc.is_empty() && self.storage.is_empty() + self.iroh.is_empty() && self.rpc.is_empty() && self.storage.is_empty() } } impl Mergeable for PartialConfig { fn merge(&mut self, other: Self) { - self.p2p.merge(other.p2p); + self.iroh.merge(other.iroh); self.rpc.merge(other.rpc); self.storage.merge(other.storage); } diff --git a/core/src/config/p2p.rs b/core/src/config/p2p.rs deleted file mode 100644 index 970c144..0000000 --- a/core/src/config/p2p.rs +++ /dev/null @@ -1,181 +0,0 @@ -use std::{net::{IpAddr, Ipv4Addr}, ops, path::{Path, PathBuf}}; - -use base64::{prelude::BASE64_STANDARD, Engine}; -#[cfg(feature="cli")] -use clap::Args; -use futures::StreamExt; -use libp2p::{identity::{self, DecodingError, Keypair}, noise, ping, swarm::SwarmEvent, tcp, yamux, Swarm}; -use serde::{Deserialize, Serialize}; -use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}}; -use tracing_subscriber::EnvFilter; - - -use crate::{ - config::PartialConfig, - error::Error, p2p, utils::{emptiable::Emptiable, mergeable::Mergeable} -}; - -static DEFAULT_P2P_LISTEN_IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))]; -static DEFAULT_P2P_PORT: u16 = 0; - -fn keypair_to_base64(keypair: &Keypair) -> String { - let vec = match keypair.to_protobuf_encoding() { - Ok(x) => x, - Err(_) => unreachable!(), - }; - BASE64_STANDARD.encode(vec) -} - -fn base64_to_keypair(base64: &str) -> Result { - let vec = BASE64_STANDARD.decode(base64)?; - Ok(Keypair::from_protobuf_encoding(&vec)?) -} - -#[derive(Clone, Debug)] -pub struct P2pConfig { - pub private_key: Keypair, - pub listen_ips: Vec, - pub port: u16, -} - -impl P2pConfig { - async fn try_into_swarm (self) -> Result, Error> { - let mut swarm = libp2p::SwarmBuilder::with_existing_identity(self.private_key) - .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - )? - .with_behaviour(|keypair| p2p::Behaviour::try_from(keypair).unwrap())? - .build(); - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - Ok(swarm) - } - pub async fn launch_swarm(self) -> Result<(), Error>{ - let mut swarm = self.try_into_swarm().await?; - loop{ - let swarm_event = swarm.select_next_some().await; - tokio::spawn(async move{ - match swarm_event { - SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"), - SwarmEvent::Behaviour(event) => { - println!("{event:?}"); - event.run().await; - }, - _ => {} - } - }); - } - } -} - -impl TryFrom for P2pConfig { - type Error = crate::error::Error; - fn try_from(raw: PartialP2pConfig) -> Result { - Ok(P2pConfig { - private_key: base64_to_keypair(&raw.private_key.ok_or(Error::MissingConfig("secret"))?)?, - listen_ips: raw.listen_ips.ok_or(Error::MissingConfig("listen_ips"))?, - port: raw.port.ok_or(Error::MissingConfig("port"))? - }) - } -} - -#[cfg_attr(feature="cli",derive(Args))] -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] -pub struct PartialP2pConfig { - #[cfg_attr(feature="cli",arg(long))] - pub private_key: Option, - #[cfg_attr(feature="cli",arg(long))] - pub listen_ips: Option>, - #[cfg_attr(feature="cli",arg(long))] - pub port: Option, -} -impl PartialP2pConfig { - pub fn with_new_private_key(mut self) -> Self { - self.private_key = Some(keypair_to_base64(&Keypair::generate_ed25519())); - self - } - pub fn init_private_key(&mut self) { - let _ = self.private_key.insert(keypair_to_base64(&Keypair::generate_ed25519())); - } -} - -impl From for PartialP2pConfig { - fn from(config: P2pConfig) -> Self { - Self { - private_key: Some(keypair_to_base64(&config.private_key)), - listen_ips: Some(config.listen_ips), - port: Some(config.port) - } - } -} - -impl Default for PartialP2pConfig { - fn default() -> Self { - Self { - private_key: None, - listen_ips: Some(Vec::from(DEFAULT_P2P_LISTEN_IPS)), - port: Some(DEFAULT_P2P_PORT), - } - } -} - -impl Emptiable for PartialP2pConfig { - fn empty() -> Self { - Self{ - private_key: None, - listen_ips: None, - port: None - } - } - - fn is_empty(&self) -> bool { - self.private_key.is_none() && self.listen_ips.is_none() && self.port.is_none() - } -} - -impl Mergeable for PartialP2pConfig { - fn merge(&mut self, mut other: Self) { - if let Some(x) = other.private_key.take() { - let _ = self.private_key.insert(x); - }; - if let Some(x) = other.listen_ips.take() { - let _ = self.listen_ips.insert(x); - }; - if let Some(x) = other.port.take() { - let _ = self.port.insert(x); - }; - } -} -impl Mergeable for Option { - fn merge(&mut self, mut other: Self) { - match other.take() { - Some(x) => { - if let Some(y) = self.as_mut() { - y.merge(x); - } else { - let _ = self.insert(x); - } - }, - None => {} - }; - } -} - - -#[cfg(test)] -mod tests { - use libp2p::identity; - use super::*; - - - #[tokio::test] - async fn parse_keypair() { - let keypair = identity::Keypair::generate_ed25519(); - let keypair2 = base64_to_keypair(&keypair_to_base64(&keypair)).unwrap(); - - assert_eq!(keypair.public(), keypair2.public()); - } - -} diff --git a/core/src/config/rpc.rs b/core/src/config/rpc.rs index 6cc3774..b09a728 100644 --- a/core/src/config/rpc.rs +++ b/core/src/config/rpc.rs @@ -3,7 +3,6 @@ use std::{net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}, path::PathBuf, str:: use clap::Args; use url::Url; use crate::{config::PartialConfig, utils::{emptiable::Emptiable, mergeable::Mergeable}}; -use libp2p::mdns::Config; use serde::{Deserialize, Serialize}; use crate::config::error::ConfigError; diff --git a/core/src/config/storage.rs b/core/src/config/storage.rs index 5ae99bb..eb4a22f 100644 --- a/core/src/config/storage.rs +++ b/core/src/config/storage.rs @@ -3,10 +3,10 @@ use std::path::PathBuf; #[cfg(feature="cli")] use clap::Args; +use rusqlite::Connection; #[cfg(any(test, feature="test"))] use tempfile::tempdir; -use crate::{config::{ConfigError, PartialConfig}, utils::{emptiable::Emptiable, get_binary_name, mergeable::Mergeable}}; -use libp2p::mdns::Config; +use crate::{config::{ConfigError, PartialConfig}, data::local::LocalDatabaseConnection, utils::{emptiable::Emptiable, get_binary_name, mergeable::Mergeable}}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug)] @@ -19,6 +19,9 @@ impl StorageConfig { pub fn get_local_database_path(&self) -> PathBuf { self.data_directory.join("local.sqlite") } + pub fn create_local_database_connection(&self) -> Connection { + Connection::from_storage_config(self) + } } impl TryFrom for StorageConfig { @@ -31,6 +34,7 @@ impl TryFrom for StorageConfig { }) } } + #[cfg_attr(feature="cli", derive(Args))] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PartialStorageConfig { diff --git a/core/src/error.rs b/core/src/error.rs index 025232c..396588d 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -10,20 +10,14 @@ pub enum Error { CiborSerialize(#[from] ciborium::ser::Error), #[error("Config error: {0}")] Config(#[from] crate::config::error::ConfigError), - #[error("Dial Error: {0}")] - Dial(#[from] libp2p::swarm::DialError), - #[error("Decoding identity error: {0}")] - IdentityDecoding(#[from] libp2p::identity::DecodingError), #[error("Infallible: {0}")] Infallible(#[from] std::convert::Infallible), #[error("IO Error: {0}")] Io(#[from]std::io::Error), + #[error("Iroh bind error: {0}")] + IrohBind(#[from] iroh::endpoint::BindError), #[error("mandatory config `{0}` is missing")] MissingConfig(&'static str), - #[error("Multiaddr error: {0}")] - Multiaddr(#[from] libp2p::multiaddr::Error), - #[error("Noise error: {0}")] - Noise(#[from] libp2p::noise::Error), #[error("Parse OsString error: {0:?}")] OsStringConvert(std::ffi::OsString), #[cfg(feature="cli")] @@ -33,8 +27,6 @@ pub enum Error { TomlDe(#[from] toml::de::Error), #[error("toml serialization error: {0}")] TomlSer(#[from] toml::ser::Error), - #[error("Transport error: {0}")] - Transport(#[from]libp2p::TransportError) } impl From for Error { diff --git a/core/src/global/config.rs b/core/src/global/config.rs index 4181eff..1bdc935 100644 --- a/core/src/global/config.rs +++ b/core/src/global/config.rs @@ -2,7 +2,7 @@ use tempfile::TempDir; use tokio::sync::OnceCell; -use crate::{config::{Config, ConfigError, PartialP2pConfig, PartialRpcConfig, PartialStorageConfig, StorageConfig}, error::Error}; +use crate::{config::{Config, ConfigError, PartialIrohConfig, PartialRpcConfig, PartialStorageConfig, StorageConfig}, error::Error}; pub static CONFIG: GlobalConfig = GlobalConfig::const_new(); pub struct GlobalConfig { diff --git a/core/src/global/mod.rs b/core/src/global/mod.rs index 13cbd3a..5baf371 100644 --- a/core/src/global/mod.rs +++ b/core/src/global/mod.rs @@ -1,7 +1,6 @@ use std::{any::type_name, collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::LazyLock}; -use crate::{config::{P2pConfig, PartialP2pConfig, StorageConfig}, error::Error }; -use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId}; +use crate::{config::{StorageConfig}, error::Error }; use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard}; mod config; @@ -12,9 +11,6 @@ pub fn generate_uuid() -> Uuid { Uuid::new_v7(Timestamp::now(ContextV7::new())) } -pub static DEFAULT_LISTEN_IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))]; - - fn uninitialized_message(var: T) -> String { format!("{} is uninitialized!", &stringify!(var)) } diff --git a/core/src/p2p/error.rs b/core/src/p2p/error.rs deleted file mode 100644 index 4b58cad..0000000 --- a/core/src/p2p/error.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[derive(Debug, thiserror::Error)] -pub enum P2pError { - -} \ No newline at end of file diff --git a/core/src/p2p/mod.rs b/core/src/p2p/mod.rs deleted file mode 100644 index 36634e2..0000000 --- a/core/src/p2p/mod.rs +++ /dev/null @@ -1,104 +0,0 @@ -pub mod error; -use chrono::Local; -use libp2p::{ identity::Keypair, mdns, ping, swarm, Multiaddr, PeerId}; -use sea_orm::{prelude::DateTimeUtc, ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, ModelTrait, QueryFilter}; -use tracing::{event, Level}; - -use crate::{cache::entity::{CachedPeerActiveModel, CachedAddressActiveModel, CachedAddressColumn, CachedAddressEntity, CachedAddressModel, CachedPeerColumn, CachedPeerEntity, CachedPeerModel}, data::value::{MultiaddrValue, PeerIdValue}, error::Error, global::DATABASE_CONNECTIONS}; - -#[derive(swarm::NetworkBehaviour)] -#[behaviour(to_swarm = "Event")] -pub struct Behaviour { - pub mdns: mdns::tokio::Behaviour, - pub ping: ping::Behaviour, -} - -impl TryFrom<&Keypair> for Behaviour { - type Error = Error; - fn try_from(keypair: &Keypair) -> Result { - Ok(Self { - mdns: mdns::tokio::Behaviour::new( - mdns::Config::default(), - keypair.public().into(), - )?, - ping: libp2p::ping::Behaviour::new(ping::Config::new()), - }) - } -} - -#[derive(Debug)] -pub enum Event { - Mdns(mdns::Event), - Ping(ping::Event), -} - -impl Event { - pub async fn run(&self) - { - match self { - Self::Mdns(x) => { - match x { - mdns::Event::Discovered(e) => { - for peer in e.iter() { - event!(Level::TRACE, "Peer discovered via mdns: {}, {}", &peer.0, &peer.1); - match try_get_or_insert_cached_peer(&peer.0, &peer.1).await { - Ok(_) => {}, - Err(e) => { - event!(Level::WARN, "{:?}", e); - } - }; - } - }, - _ => {}, - } - }, - _ => {} - } - } -} -impl From for Event { - fn from(event: mdns::Event) -> Self { - Self::Mdns(event) - } -} -impl From for Event { - fn from(event: ping::Event) -> Self { - Self::Ping(event) - } -} - -async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(CachedPeerModel, CachedAddressModel), Error> { - match ( - CachedPeerEntity::find().filter(CachedPeerColumn::PeerId.eq(PeerIdValue::from(peer_id.clone()))).one(DATABASE_CONNECTIONS.get_cache_unchecked()).await?, - CachedAddressEntity::find().filter(CachedAddressColumn::Multiaddress.eq(MultiaddrValue::from(peer_addr.clone()))).one(DATABASE_CONNECTIONS.get_cache_unchecked()).await?, - ) { - (Some(x), Some(y) ) => { - if x.id == y.cached_peer_id { - event!(Level::TRACE, "Known peer: {}, {}", peer_id, peer_addr); - let mut addr: CachedAddressActiveModel = y.into(); - addr.updated_at = Set(Local::now().to_utc()); - let updated = addr.update(DATABASE_CONNECTIONS.get_cache_unchecked()).await?; - Ok((x, updated)) - } else { - y.delete(DATABASE_CONNECTIONS.get_cache().expect("Cache database should initialized beforehand!")).await?; - Ok((x.clone(), CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?)) - } - } - (Some(x), None) => { - event!(Level::INFO, "New address {} for {}", peer_addr, peer_id); - Ok((x.clone(),CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?)) - }, - (None, x) => { - event!(Level::INFO, "Add new peer: {}", peer_id); - let inserted = CachedPeerActiveModel::new(peer_id.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?; - if let Some(y) = x { - event!(Level::INFO, "Remove {} from {}", peer_addr, peer_id); - y.delete(DATABASE_CONNECTIONS.get_cache_unchecked()).await?; - }; - event!(Level::INFO, "Add address {} to {}", peer_addr, peer_id); - Ok((inserted.clone(), CachedAddressActiveModel::new(inserted.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?)) - }, - - - } -} \ No newline at end of file diff --git a/core/src/rpc/service/cached_peer.rs b/core/src/rpc/service/cached_peer.rs index 9603e84..ca49b7e 100644 --- a/core/src/rpc/service/cached_peer.rs +++ b/core/src/rpc/service/cached_peer.rs @@ -1,9 +1,8 @@ -use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{DATABASE_CONNECTIONS}, proto::CachedAddressMessage}; +use crate::{global::{DATABASE_CONNECTION}, proto::CachedAddressMessage}; use futures::future::join_all; use tonic::{Request, Response, Status}; use crate::proto::{cached_peer_service_server::{CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage}; -use sea_orm::prelude::*; #[derive(Debug, Default)] pub struct CachedPeerService {} diff --git a/core/src/server.rs b/core/src/server.rs index 734484c..4def1ad 100644 --- a/core/src/server.rs +++ b/core/src/server.rs @@ -1,13 +1,13 @@ -use crate::{config::{Config, P2pConfig, RpcConfig}, error::Error}; +use crate::{config::{Config, IrohConfig, RpcConfig}, error::Error}; pub trait ServerTrait { async fn serve_p2p(config: &T) -> Result<(), Error> - where T: AsRef; + where T: AsRef; async fn serve_rpc(config: &T) -> Result<(), Error> where T: AsRef; async fn serve_all(config: &T) -> Result<(), Error> where - T: AsRef + AsRef { + T: AsRef + AsRef { tokio::try_join!( Self::serve_p2p(config), Self::serve_rpc(config) diff --git a/core/src/tests.rs b/core/src/tests.rs index ba19fa7..4d4f260 100644 --- a/core/src/tests.rs +++ b/core/src/tests.rs @@ -2,7 +2,7 @@ use std::{path::PathBuf, sync::LazyLock}; use tempfile::TempDir; use url::Url; -use crate::{ config::{Config, PartialConfig, PartialP2pConfig, PartialRpcConfig, RpcConfig, StorageConfig}}; +use crate::{ config::{Config, PartialConfig, PartialIrohConfig, PartialRpcConfig, RpcConfig, StorageConfig}}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -13,7 +13,7 @@ pub static TEST_CONFIG: LazyLock = LazyLock::new(|| { Config { - p2p: PartialP2pConfig::default().with_new_private_key().try_into().unwrap(), + iroh: PartialIrohConfig::default().with_new_secret_key().try_into().unwrap(), storage: StorageConfig { data_directory: data_dir, cache_directory: cache_dir, diff --git a/examples/core/Cargo.toml b/examples/core/Cargo.toml index 10bec3f..804dab1 100644 --- a/examples/core/Cargo.toml +++ b/examples/core/Cargo.toml @@ -9,7 +9,6 @@ repository.workspace = true [dependencies] bevy.workspace = true caretta-sync = { path = "../..", features = ["bevy"] } -libp2p.workspace = true tokio.workspace = true tokio-stream = { version = "0.1.17", features = ["net"] } tonic.workspace = true diff --git a/examples/desktop/Cargo.toml b/examples/desktop/Cargo.toml index 42b4277..14ab4ea 100644 --- a/examples/desktop/Cargo.toml +++ b/examples/desktop/Cargo.toml @@ -10,5 +10,4 @@ repository.workspace = true clap.workspace = true caretta-sync = { path = "../..", features = ["cli", "bevy", "test"] } caretta-sync-example-core.path = "../core" -libp2p.workspace = true tokio.workspace = true