Replace p2p config to iroh config

This commit is contained in:
fluo10 2025-09-03 13:22:31 +09:00
parent 7fe348e803
commit b53c7170eb
18 changed files with 181 additions and 351 deletions

View file

@ -43,7 +43,6 @@ ciborium = "0.2.2"
clap = { version = "4.5.38", features = ["derive"] } clap = { version = "4.5.38", features = ["derive"] }
caretta-sync-core.path = "core" caretta-sync-core.path = "core"
futures = { version = "0.3.31", features = ["executor"] } futures = { version = "0.3.31", features = ["executor"] }
libp2p = { version = "0.55.0", features = ["macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux" ] }
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }
thiserror = "2.0.12" thiserror = "2.0.12"
tokio = { version = "1.45.0", features = ["macros", "rt", "rt-multi-thread"] } tokio = { version = "1.45.0", features = ["macros", "rt", "rt-multi-thread"] }

View file

@ -15,7 +15,6 @@ ciborium.workspace = true
clap.workspace = true clap.workspace = true
dirs = "6.0.0" dirs = "6.0.0"
caretta-sync-core = { workspace = true, features = ["cli"] } caretta-sync-core = { workspace = true, features = ["cli"] }
libp2p.workspace = true
serde.workspace = true serde.workspace = true
thiserror.workspace = true thiserror.workspace = true
tokio.workspace = true tokio.workspace = true

View file

@ -19,12 +19,12 @@ ciborium.workspace = true
clap = {workspace = true, optional = true} clap = {workspace = true, optional = true}
dirs = "6.0.0" dirs = "6.0.0"
futures.workspace = true futures.workspace = true
libp2p.workspace = true iroh = { version = "0.91.2", features = ["discovery-local-network", "discovery-pkarr-dht"] }
libp2p-core = { version = "0.43.0", features = ["serde"] }
libp2p-identity = { version = "0.2.11", features = ["ed25519", "peerid", "rand", "serde"] }
prost = "0.14.1" prost = "0.14.1"
prost-types = "0.14.1" prost-types = "0.14.1"
rusqlite = { version = "0.37.0", features = ["bundled"] }
serde.workspace = true serde.workspace = true
sysinfo = "0.37.0"
tempfile = { version = "3.20.0", optional = true } tempfile = { version = "3.20.0", optional = true }
thiserror.workspace = true thiserror.workspace = true
tokio.workspace = true tokio.workspace = true
@ -35,9 +35,8 @@ tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
uuid.workspace = true uuid.workspace = true
url.workspace = true url.workspace = true
sysinfo = "0.37.0"
whoami = "1.6.1" whoami = "1.6.1"
rusqlite = { version = "0.37.0", features = ["bundled"] } rand = "0.8.5"
[target.'cfg(target_os="android")'.dependencies] [target.'cfg(target_os="android")'.dependencies]
jni = "0.21.1" jni = "0.21.1"

134
core/src/config/iroh.rs Normal file
View file

@ -0,0 +1,134 @@
use std::{net::{IpAddr, Ipv4Addr}, ops, path::{Path, PathBuf}};
use base64::{prelude::BASE64_STANDARD, Engine};
#[cfg(feature="cli")]
use clap::Args;
use futures::StreamExt;
use iroh::{Endpoint, SecretKey};
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}};
use tracing_subscriber::EnvFilter;
use crate::{
config::PartialConfig,
error::Error, p2p, utils::{emptiable::Emptiable, mergeable::Mergeable}
};
#[derive(Clone, Debug)]
pub struct IrohConfig {
pub enable: bool,
pub secret_key: SecretKey,
pub use_n0_discovery_service: bool,
}
impl IrohConfig {
async fn into_endpoint(config: Self) -> Result<Option<Endpoint>, crate::error::Error> {
if config.enable {
let mut endpoint = Endpoint::builder()
.secret_key(config.secret_key)
.discovery_dht()
.discovery_local_network();
if config.use_n0_discovery_service {
endpoint = endpoint.discovery_n0();
}
Ok(Some(endpoint.bind().await?))
} else {
Ok(None)
}
}
}
impl TryFrom<PartialIrohConfig> for IrohConfig {
type Error = crate::error::Error;
fn try_from(raw: PartialIrohConfig) -> Result<IrohConfig, Self::Error> {
Ok(IrohConfig {
enable: raw.enable.ok_or(Error::MissingConfig("iroh.enable"))?,
secret_key: raw.secret_key.ok_or(Error::MissingConfig("iroh.secret_key"))?,
use_n0_discovery_service: raw.use_n0_discovery_service.ok_or(Error::MissingConfig("iroh.use_n0_discovery_service"))?
})
}
}
#[cfg_attr(feature="cli",derive(Args))]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PartialIrohConfig {
#[cfg_attr(feature="cli",arg(long="p2p_enable"))]
pub enable: Option<bool>,
#[cfg_attr(feature="cli",arg(long))]
pub secret_key: Option<SecretKey>,
#[cfg_attr(feature="cli",arg(long))]
pub use_n0_discovery_service: Option<bool>,
}
impl PartialIrohConfig {
pub fn with_new_secret_key(mut self) -> Self {
let mut rng = rand::rngs::OsRng;
self.secret_key = Some(SecretKey::generate(&mut rng));
self
}
}
impl From<IrohConfig> for PartialIrohConfig {
fn from(config: IrohConfig) -> Self {
Self {
enable: Some(config.enable),
secret_key: Some(config.secret_key),
use_n0_discovery_service: Some(config.use_n0_discovery_service)
}
}
}
impl Default for PartialIrohConfig {
fn default() -> Self {
Self {
enable: Some(true),
secret_key: None,
use_n0_discovery_service: Some(true)
}
}
}
impl Emptiable for PartialIrohConfig {
fn empty() -> Self {
Self{
enable: None,
secret_key: None,
use_n0_discovery_service: None
}
}
fn is_empty(&self) -> bool {
self.enable.is_none() && self.secret_key.is_none() && self.use_n0_discovery_service.is_none()
}
}
impl Mergeable for PartialIrohConfig {
fn merge(&mut self, mut other: Self) {
if let Some(x) = other.enable.take() {
let _ = self.enable.insert(x);
};
if let Some(x) = other.secret_key.take() {
let _ = self.secret_key.insert(x);
};
if let Some(x) = other.use_n0_discovery_service.take() {
let _ = self.use_n0_discovery_service.insert(x);
};
}
}
impl Mergeable for Option<PartialIrohConfig> {
fn merge(&mut self, mut other: Self) {
match other.take() {
Some(x) => {
if let Some(y) = self.as_mut() {
y.merge(x);
} else {
let _ = self.insert(x);
}
},
None => {}
};
}
}

View file

@ -1,16 +1,16 @@
pub mod error; pub mod error;
mod storage; mod storage;
mod p2p; mod iroh;
mod rpc; mod rpc;
use std::{path::Path, default::Default}; use std::{default::Default, fs::File, io::{Read, Write}, path::Path};
use crate::{utils::{emptiable::Emptiable, mergeable::Mergeable}}; use crate::{utils::{emptiable::Emptiable, mergeable::Mergeable}};
pub use error::ConfigError; pub use error::ConfigError;
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}}; use tokio::{io::{AsyncReadExt, AsyncWriteExt}};
pub use storage::{StorageConfig, PartialStorageConfig}; pub use storage::{StorageConfig, PartialStorageConfig};
pub use p2p::{P2pConfig, PartialP2pConfig}; pub use iroh::{IrohConfig, PartialIrohConfig};
pub use rpc::*; pub use rpc::*;
#[cfg(feature="cli")] #[cfg(feature="cli")]
@ -18,7 +18,7 @@ use clap::Args;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Config { pub struct Config {
pub p2p: P2pConfig, pub iroh: IrohConfig,
pub storage: StorageConfig, pub storage: StorageConfig,
pub rpc: RpcConfig, pub rpc: RpcConfig,
} }
@ -29,9 +29,9 @@ impl AsRef<StorageConfig> for Config {
} }
} }
impl AsRef<P2pConfig> for Config { impl AsRef<IrohConfig> for Config {
fn as_ref(&self) -> &P2pConfig { fn as_ref(&self) -> &IrohConfig {
&self.p2p &self.iroh
} }
} }
@ -46,17 +46,17 @@ impl TryFrom<PartialConfig> for Config {
fn try_from(value: PartialConfig) -> Result<Self, Self::Error> { fn try_from(value: PartialConfig) -> Result<Self, Self::Error> {
Ok(Self{ Ok(Self{
rpc: value.rpc.ok_or(crate::error::Error::MissingConfig("rpc"))?.try_into()?, rpc: value.rpc.ok_or(crate::error::Error::MissingConfig("rpc"))?.try_into()?,
p2p: value.p2p.ok_or(crate::error::Error::MissingConfig("p2p"))?.try_into()?, iroh: value.iroh.ok_or(crate::error::Error::MissingConfig("p2p"))?.try_into()?,
storage: value.storage.ok_or(crate::error::Error::MissingConfig("storage"))?.try_into()? storage: value.storage.ok_or(crate::error::Error::MissingConfig("storage"))?.try_into()?
}) })
} }
} }
#[cfg_attr(feature="cli", derive(Args))] #[cfg_attr(feature="cli", derive(Args))]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PartialConfig { pub struct PartialConfig {
#[cfg_attr(feature="cli", command(flatten))] #[cfg_attr(feature="cli", command(flatten))]
pub p2p: Option<PartialP2pConfig>, pub iroh: Option<PartialIrohConfig>,
#[cfg_attr(feature="cli", command(flatten))] #[cfg_attr(feature="cli", command(flatten))]
pub storage: Option<PartialStorageConfig>, pub storage: Option<PartialStorageConfig>,
#[cfg_attr(feature="cli", command(flatten))] #[cfg_attr(feature="cli", command(flatten))]
@ -66,7 +66,7 @@ pub struct PartialConfig {
impl PartialConfig { impl PartialConfig {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
p2p : Some(PartialP2pConfig::empty().with_new_private_key()), iroh : Some(PartialIrohConfig::empty().with_new_secret_key()),
storage: Some(PartialStorageConfig::empty()), storage: Some(PartialStorageConfig::empty()),
rpc: Some(PartialRpcConfig::empty()), rpc: Some(PartialRpcConfig::empty()),
} }
@ -77,16 +77,16 @@ impl PartialConfig {
pub fn into_toml(&self) -> Result<String, toml::ser::Error> { pub fn into_toml(&self) -> Result<String, toml::ser::Error> {
toml::to_string(self) toml::to_string(self)
} }
pub async fn read_or_create<T>(path: T) -> Result<Self, ConfigError> pub fn read_or_create<T>(path: T) -> Result<Self, ConfigError>
where where
T: AsRef<Path> T: AsRef<Path>
{ {
if !path.as_ref().exists() { if !path.as_ref().exists() {
Self::new().write_to(&path).await?; Self::new().write_to(&path)?;
} }
Self::read_from(&path).await Self::read_from(&path)
} }
pub async fn read_from<T>(path:T) -> Result<Self, ConfigError> pub fn read_from<T>(path:T) -> Result<Self, ConfigError>
where where
T: AsRef<Path> T: AsRef<Path>
{ {
@ -94,15 +94,15 @@ impl PartialConfig {
if let Some(x) = path.as_ref().parent() { if let Some(x) = path.as_ref().parent() {
std::fs::create_dir_all(x)?; std::fs::create_dir_all(x)?;
}; };
let _ = File::create(&path).await?; let _ = File::create(&path)?;
} }
let mut file = File::open(path.as_ref()).await?; let mut file = File::open(path.as_ref())?;
let mut content = String::new(); let mut content = String::new();
file.read_to_string(&mut content).await?; file.read_to_string(&mut content)?;
let config: Self = toml::from_str(&content)?; let config: Self = toml::from_str(&content)?;
Ok(config) Ok(config)
} }
pub async fn write_to<T>(&self, path:T) -> Result<(), ConfigError> pub fn write_to<T>(&self, path:T) -> Result<(), ConfigError>
where where
T: AsRef<Path> T: AsRef<Path>
{ {
@ -110,15 +110,15 @@ impl PartialConfig {
if let Some(x) = path.as_ref().parent() { if let Some(x) = path.as_ref().parent() {
std::fs::create_dir_all(x)?; std::fs::create_dir_all(x)?;
}; };
let _ = File::create(&path).await?; let _ = File::create(&path)?;
} }
let mut file = File::create(&path).await?; let mut file = File::create(&path)?;
file.write_all(toml::to_string(self)?.as_bytes()).await?; file.write_all(toml::to_string(self)?.as_bytes())?;
Ok(()) Ok(())
} }
pub fn default(app_name: &'static str) -> Self { pub fn default(app_name: &'static str) -> Self {
Self { Self {
p2p: Some(PartialP2pConfig::default()), iroh: Some(PartialIrohConfig::default()),
rpc: Some(PartialRpcConfig::default(app_name)), rpc: Some(PartialRpcConfig::default(app_name)),
storage: Some(PartialStorageConfig::default(app_name)), storage: Some(PartialStorageConfig::default(app_name)),
} }
@ -128,7 +128,7 @@ impl PartialConfig {
impl From<Config> for PartialConfig { impl From<Config> for PartialConfig {
fn from(value: Config) -> Self { fn from(value: Config) -> Self {
Self { Self {
p2p: Some(value.p2p.into()), iroh: Some(value.iroh.into()),
storage: Some(value.storage.into()), storage: Some(value.storage.into()),
rpc: Some(value.rpc.into()) rpc: Some(value.rpc.into())
} }
@ -138,20 +138,20 @@ impl From<Config> for PartialConfig {
impl Emptiable for PartialConfig { impl Emptiable for PartialConfig {
fn empty() -> Self { fn empty() -> Self {
Self { Self {
p2p: None, iroh: None,
storage: None, storage: None,
rpc: None, rpc: None,
} }
} }
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.p2p.is_empty() && self.rpc.is_empty() && self.storage.is_empty() self.iroh.is_empty() && self.rpc.is_empty() && self.storage.is_empty()
} }
} }
impl Mergeable for PartialConfig { impl Mergeable for PartialConfig {
fn merge(&mut self, other: Self) { fn merge(&mut self, other: Self) {
self.p2p.merge(other.p2p); self.iroh.merge(other.iroh);
self.rpc.merge(other.rpc); self.rpc.merge(other.rpc);
self.storage.merge(other.storage); self.storage.merge(other.storage);
} }

View file

@ -1,181 +0,0 @@
use std::{net::{IpAddr, Ipv4Addr}, ops, path::{Path, PathBuf}};
use base64::{prelude::BASE64_STANDARD, Engine};
#[cfg(feature="cli")]
use clap::Args;
use futures::StreamExt;
use libp2p::{identity::{self, DecodingError, Keypair}, noise, ping, swarm::SwarmEvent, tcp, yamux, Swarm};
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}};
use tracing_subscriber::EnvFilter;
use crate::{
config::PartialConfig,
error::Error, p2p, utils::{emptiable::Emptiable, mergeable::Mergeable}
};
static DEFAULT_P2P_LISTEN_IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))];
static DEFAULT_P2P_PORT: u16 = 0;
fn keypair_to_base64(keypair: &Keypair) -> String {
let vec = match keypair.to_protobuf_encoding() {
Ok(x) => x,
Err(_) => unreachable!(),
};
BASE64_STANDARD.encode(vec)
}
fn base64_to_keypair(base64: &str) -> Result<Keypair, Error> {
let vec = BASE64_STANDARD.decode(base64)?;
Ok(Keypair::from_protobuf_encoding(&vec)?)
}
#[derive(Clone, Debug)]
pub struct P2pConfig {
pub private_key: Keypair,
pub listen_ips: Vec<IpAddr>,
pub port: u16,
}
impl P2pConfig {
async fn try_into_swarm (self) -> Result<Swarm<p2p::Behaviour>, Error> {
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(self.private_key)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|keypair| p2p::Behaviour::try_from(keypair).unwrap())?
.build();
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}
pub async fn launch_swarm(self) -> Result<(), Error>{
let mut swarm = self.try_into_swarm().await?;
loop{
let swarm_event = swarm.select_next_some().await;
tokio::spawn(async move{
match swarm_event {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
SwarmEvent::Behaviour(event) => {
println!("{event:?}");
event.run().await;
},
_ => {}
}
});
}
}
}
impl TryFrom<PartialP2pConfig> for P2pConfig {
type Error = crate::error::Error;
fn try_from(raw: PartialP2pConfig) -> Result<P2pConfig, Self::Error> {
Ok(P2pConfig {
private_key: base64_to_keypair(&raw.private_key.ok_or(Error::MissingConfig("secret"))?)?,
listen_ips: raw.listen_ips.ok_or(Error::MissingConfig("listen_ips"))?,
port: raw.port.ok_or(Error::MissingConfig("port"))?
})
}
}
#[cfg_attr(feature="cli",derive(Args))]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct PartialP2pConfig {
#[cfg_attr(feature="cli",arg(long))]
pub private_key: Option<String>,
#[cfg_attr(feature="cli",arg(long))]
pub listen_ips: Option<Vec<IpAddr>>,
#[cfg_attr(feature="cli",arg(long))]
pub port: Option<u16>,
}
impl PartialP2pConfig {
pub fn with_new_private_key(mut self) -> Self {
self.private_key = Some(keypair_to_base64(&Keypair::generate_ed25519()));
self
}
pub fn init_private_key(&mut self) {
let _ = self.private_key.insert(keypair_to_base64(&Keypair::generate_ed25519()));
}
}
impl From<P2pConfig> for PartialP2pConfig {
fn from(config: P2pConfig) -> Self {
Self {
private_key: Some(keypair_to_base64(&config.private_key)),
listen_ips: Some(config.listen_ips),
port: Some(config.port)
}
}
}
impl Default for PartialP2pConfig {
fn default() -> Self {
Self {
private_key: None,
listen_ips: Some(Vec::from(DEFAULT_P2P_LISTEN_IPS)),
port: Some(DEFAULT_P2P_PORT),
}
}
}
impl Emptiable for PartialP2pConfig {
fn empty() -> Self {
Self{
private_key: None,
listen_ips: None,
port: None
}
}
fn is_empty(&self) -> bool {
self.private_key.is_none() && self.listen_ips.is_none() && self.port.is_none()
}
}
impl Mergeable for PartialP2pConfig {
fn merge(&mut self, mut other: Self) {
if let Some(x) = other.private_key.take() {
let _ = self.private_key.insert(x);
};
if let Some(x) = other.listen_ips.take() {
let _ = self.listen_ips.insert(x);
};
if let Some(x) = other.port.take() {
let _ = self.port.insert(x);
};
}
}
impl Mergeable for Option<PartialP2pConfig> {
fn merge(&mut self, mut other: Self) {
match other.take() {
Some(x) => {
if let Some(y) = self.as_mut() {
y.merge(x);
} else {
let _ = self.insert(x);
}
},
None => {}
};
}
}
#[cfg(test)]
mod tests {
use libp2p::identity;
use super::*;
#[tokio::test]
async fn parse_keypair() {
let keypair = identity::Keypair::generate_ed25519();
let keypair2 = base64_to_keypair(&keypair_to_base64(&keypair)).unwrap();
assert_eq!(keypair.public(), keypair2.public());
}
}

View file

@ -3,7 +3,6 @@ use std::{net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}, path::PathBuf, str::
use clap::Args; use clap::Args;
use url::Url; use url::Url;
use crate::{config::PartialConfig, utils::{emptiable::Emptiable, mergeable::Mergeable}}; use crate::{config::PartialConfig, utils::{emptiable::Emptiable, mergeable::Mergeable}};
use libp2p::mdns::Config;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::config::error::ConfigError; use crate::config::error::ConfigError;

View file

@ -3,10 +3,10 @@ use std::path::PathBuf;
#[cfg(feature="cli")] #[cfg(feature="cli")]
use clap::Args; use clap::Args;
use rusqlite::Connection;
#[cfg(any(test, feature="test"))] #[cfg(any(test, feature="test"))]
use tempfile::tempdir; use tempfile::tempdir;
use crate::{config::{ConfigError, PartialConfig}, utils::{emptiable::Emptiable, get_binary_name, mergeable::Mergeable}}; use crate::{config::{ConfigError, PartialConfig}, data::local::LocalDatabaseConnection, utils::{emptiable::Emptiable, get_binary_name, mergeable::Mergeable}};
use libp2p::mdns::Config;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -19,6 +19,9 @@ impl StorageConfig {
pub fn get_local_database_path(&self) -> PathBuf { pub fn get_local_database_path(&self) -> PathBuf {
self.data_directory.join("local.sqlite") self.data_directory.join("local.sqlite")
} }
pub fn create_local_database_connection(&self) -> Connection {
Connection::from_storage_config(self)
}
} }
impl TryFrom<PartialStorageConfig> for StorageConfig { impl TryFrom<PartialStorageConfig> for StorageConfig {
@ -31,6 +34,7 @@ impl TryFrom<PartialStorageConfig> for StorageConfig {
}) })
} }
} }
#[cfg_attr(feature="cli", derive(Args))] #[cfg_attr(feature="cli", derive(Args))]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct PartialStorageConfig { pub struct PartialStorageConfig {

View file

@ -10,20 +10,14 @@ pub enum Error {
CiborSerialize(#[from] ciborium::ser::Error<std::io::Error>), CiborSerialize(#[from] ciborium::ser::Error<std::io::Error>),
#[error("Config error: {0}")] #[error("Config error: {0}")]
Config(#[from] crate::config::error::ConfigError), Config(#[from] crate::config::error::ConfigError),
#[error("Dial Error: {0}")]
Dial(#[from] libp2p::swarm::DialError),
#[error("Decoding identity error: {0}")]
IdentityDecoding(#[from] libp2p::identity::DecodingError),
#[error("Infallible: {0}")] #[error("Infallible: {0}")]
Infallible(#[from] std::convert::Infallible), Infallible(#[from] std::convert::Infallible),
#[error("IO Error: {0}")] #[error("IO Error: {0}")]
Io(#[from]std::io::Error), Io(#[from]std::io::Error),
#[error("Iroh bind error: {0}")]
IrohBind(#[from] iroh::endpoint::BindError),
#[error("mandatory config `{0}` is missing")] #[error("mandatory config `{0}` is missing")]
MissingConfig(&'static str), MissingConfig(&'static str),
#[error("Multiaddr error: {0}")]
Multiaddr(#[from] libp2p::multiaddr::Error),
#[error("Noise error: {0}")]
Noise(#[from] libp2p::noise::Error),
#[error("Parse OsString error: {0:?}")] #[error("Parse OsString error: {0:?}")]
OsStringConvert(std::ffi::OsString), OsStringConvert(std::ffi::OsString),
#[cfg(feature="cli")] #[cfg(feature="cli")]
@ -33,8 +27,6 @@ pub enum Error {
TomlDe(#[from] toml::de::Error), TomlDe(#[from] toml::de::Error),
#[error("toml serialization error: {0}")] #[error("toml serialization error: {0}")]
TomlSer(#[from] toml::ser::Error), TomlSer(#[from] toml::ser::Error),
#[error("Transport error: {0}")]
Transport(#[from]libp2p::TransportError<std::io::Error>)
} }
impl From<std::ffi::OsString> for Error { impl From<std::ffi::OsString> for Error {

View file

@ -2,7 +2,7 @@
use tempfile::TempDir; use tempfile::TempDir;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use crate::{config::{Config, ConfigError, PartialP2pConfig, PartialRpcConfig, PartialStorageConfig, StorageConfig}, error::Error}; use crate::{config::{Config, ConfigError, PartialIrohConfig, PartialRpcConfig, PartialStorageConfig, StorageConfig}, error::Error};
pub static CONFIG: GlobalConfig = GlobalConfig::const_new(); pub static CONFIG: GlobalConfig = GlobalConfig::const_new();
pub struct GlobalConfig { pub struct GlobalConfig {

View file

@ -1,7 +1,6 @@
use std::{any::type_name, collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::LazyLock}; use std::{any::type_name, collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::LazyLock};
use crate::{config::{P2pConfig, PartialP2pConfig, StorageConfig}, error::Error }; use crate::{config::{StorageConfig}, error::Error };
use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId};
use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard};
mod config; mod config;
@ -12,9 +11,6 @@ pub fn generate_uuid() -> Uuid {
Uuid::new_v7(Timestamp::now(ContextV7::new())) Uuid::new_v7(Timestamp::now(ContextV7::new()))
} }
pub static DEFAULT_LISTEN_IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))];
fn uninitialized_message<T>(var: T) -> String { fn uninitialized_message<T>(var: T) -> String {
format!("{} is uninitialized!", &stringify!(var)) format!("{} is uninitialized!", &stringify!(var))
} }

View file

@ -1,4 +0,0 @@
#[derive(Debug, thiserror::Error)]
pub enum P2pError {
}

View file

@ -1,104 +0,0 @@
pub mod error;
use chrono::Local;
use libp2p::{ identity::Keypair, mdns, ping, swarm, Multiaddr, PeerId};
use sea_orm::{prelude::DateTimeUtc, ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, ModelTrait, QueryFilter};
use tracing::{event, Level};
use crate::{cache::entity::{CachedPeerActiveModel, CachedAddressActiveModel, CachedAddressColumn, CachedAddressEntity, CachedAddressModel, CachedPeerColumn, CachedPeerEntity, CachedPeerModel}, data::value::{MultiaddrValue, PeerIdValue}, error::Error, global::DATABASE_CONNECTIONS};
#[derive(swarm::NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
pub struct Behaviour {
pub mdns: mdns::tokio::Behaviour,
pub ping: ping::Behaviour,
}
impl TryFrom<&Keypair> for Behaviour {
type Error = Error;
fn try_from(keypair: &Keypair) -> Result<Self, Error> {
Ok(Self {
mdns: mdns::tokio::Behaviour::new(
mdns::Config::default(),
keypair.public().into(),
)?,
ping: libp2p::ping::Behaviour::new(ping::Config::new()),
})
}
}
#[derive(Debug)]
pub enum Event {
Mdns(mdns::Event),
Ping(ping::Event),
}
impl Event {
pub async fn run(&self)
{
match self {
Self::Mdns(x) => {
match x {
mdns::Event::Discovered(e) => {
for peer in e.iter() {
event!(Level::TRACE, "Peer discovered via mdns: {}, {}", &peer.0, &peer.1);
match try_get_or_insert_cached_peer(&peer.0, &peer.1).await {
Ok(_) => {},
Err(e) => {
event!(Level::WARN, "{:?}", e);
}
};
}
},
_ => {},
}
},
_ => {}
}
}
}
impl From<mdns::Event> for Event {
fn from(event: mdns::Event) -> Self {
Self::Mdns(event)
}
}
impl From<ping::Event> for Event {
fn from(event: ping::Event) -> Self {
Self::Ping(event)
}
}
async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(CachedPeerModel, CachedAddressModel), Error> {
match (
CachedPeerEntity::find().filter(CachedPeerColumn::PeerId.eq(PeerIdValue::from(peer_id.clone()))).one(DATABASE_CONNECTIONS.get_cache_unchecked()).await?,
CachedAddressEntity::find().filter(CachedAddressColumn::Multiaddress.eq(MultiaddrValue::from(peer_addr.clone()))).one(DATABASE_CONNECTIONS.get_cache_unchecked()).await?,
) {
(Some(x), Some(y) ) => {
if x.id == y.cached_peer_id {
event!(Level::TRACE, "Known peer: {}, {}", peer_id, peer_addr);
let mut addr: CachedAddressActiveModel = y.into();
addr.updated_at = Set(Local::now().to_utc());
let updated = addr.update(DATABASE_CONNECTIONS.get_cache_unchecked()).await?;
Ok((x, updated))
} else {
y.delete(DATABASE_CONNECTIONS.get_cache().expect("Cache database should initialized beforehand!")).await?;
Ok((x.clone(), CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?))
}
}
(Some(x), None) => {
event!(Level::INFO, "New address {} for {}", peer_addr, peer_id);
Ok((x.clone(),CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?))
},
(None, x) => {
event!(Level::INFO, "Add new peer: {}", peer_id);
let inserted = CachedPeerActiveModel::new(peer_id.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?;
if let Some(y) = x {
event!(Level::INFO, "Remove {} from {}", peer_addr, peer_id);
y.delete(DATABASE_CONNECTIONS.get_cache_unchecked()).await?;
};
event!(Level::INFO, "Add address {} to {}", peer_addr, peer_id);
Ok((inserted.clone(), CachedAddressActiveModel::new(inserted.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?))
},
}
}

View file

@ -1,9 +1,8 @@
use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{DATABASE_CONNECTIONS}, proto::CachedAddressMessage}; use crate::{global::{DATABASE_CONNECTION}, proto::CachedAddressMessage};
use futures::future::join_all; use futures::future::join_all;
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
use crate::proto::{cached_peer_service_server::{CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage}; use crate::proto::{cached_peer_service_server::{CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage};
use sea_orm::prelude::*;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct CachedPeerService {} pub struct CachedPeerService {}

View file

@ -1,13 +1,13 @@
use crate::{config::{Config, P2pConfig, RpcConfig}, error::Error}; use crate::{config::{Config, IrohConfig, RpcConfig}, error::Error};
pub trait ServerTrait { pub trait ServerTrait {
async fn serve_p2p<T>(config: &T) -> Result<(), Error> async fn serve_p2p<T>(config: &T) -> Result<(), Error>
where T: AsRef<P2pConfig>; where T: AsRef<IrohConfig>;
async fn serve_rpc<T>(config: &T) -> Result<(), Error> async fn serve_rpc<T>(config: &T) -> Result<(), Error>
where T: AsRef<RpcConfig>; where T: AsRef<RpcConfig>;
async fn serve_all<T>(config: &T) -> Result<(), Error> async fn serve_all<T>(config: &T) -> Result<(), Error>
where where
T: AsRef<P2pConfig> + AsRef<RpcConfig> { T: AsRef<IrohConfig> + AsRef<RpcConfig> {
tokio::try_join!( tokio::try_join!(
Self::serve_p2p(config), Self::serve_p2p(config),
Self::serve_rpc(config) Self::serve_rpc(config)

View file

@ -2,7 +2,7 @@ use std::{path::PathBuf, sync::LazyLock};
use tempfile::TempDir; use tempfile::TempDir;
use url::Url; use url::Url;
use crate::{ config::{Config, PartialConfig, PartialP2pConfig, PartialRpcConfig, RpcConfig, StorageConfig}}; use crate::{ config::{Config, PartialConfig, PartialIrohConfig, PartialRpcConfig, RpcConfig, StorageConfig}};
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
@ -13,7 +13,7 @@ pub static TEST_CONFIG: LazyLock<Config> = LazyLock::new(|| {
Config { Config {
p2p: PartialP2pConfig::default().with_new_private_key().try_into().unwrap(), iroh: PartialIrohConfig::default().with_new_secret_key().try_into().unwrap(),
storage: StorageConfig { storage: StorageConfig {
data_directory: data_dir, data_directory: data_dir,
cache_directory: cache_dir, cache_directory: cache_dir,

View file

@ -9,7 +9,6 @@ repository.workspace = true
[dependencies] [dependencies]
bevy.workspace = true bevy.workspace = true
caretta-sync = { path = "../..", features = ["bevy"] } caretta-sync = { path = "../..", features = ["bevy"] }
libp2p.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-stream = { version = "0.1.17", features = ["net"] } tokio-stream = { version = "0.1.17", features = ["net"] }
tonic.workspace = true tonic.workspace = true

View file

@ -10,5 +10,4 @@ repository.workspace = true
clap.workspace = true clap.workspace = true
caretta-sync = { path = "../..", features = ["cli", "bevy", "test"] } caretta-sync = { path = "../..", features = ["cli", "bevy", "test"] }
caretta-sync-example-core.path = "../core" caretta-sync-example-core.path = "../core"
libp2p.workspace = true
tokio.workspace = true tokio.workspace = true