pub mod error; use chrono::Local; use libp2p::{ identity::Keypair, mdns, ping, swarm, Multiaddr, PeerId}; use sea_orm::{prelude::DateTimeUtc, ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, ModelTrait, QueryFilter}; use tracing::{event, Level}; use crate::{cache::entity::{CachedPeerActiveModel, CachedAddressActiveModel, CachedAddressColumn, CachedAddressEntity, CachedAddressModel, CachedPeerColumn, CachedPeerEntity, CachedPeerModel}, data::value::{MultiaddrValue, PeerIdValue}, error::Error, global::CACHE_DATABASE_CONNECTION}; #[derive(swarm::NetworkBehaviour)] #[behaviour(to_swarm = "Event")] pub struct Behaviour { pub mdns: mdns::tokio::Behaviour, pub ping: ping::Behaviour, } impl TryFrom<&Keypair> for Behaviour { type Error = Error; fn try_from(keypair: &Keypair) -> Result { Ok(Self { mdns: mdns::tokio::Behaviour::new( mdns::Config::default(), keypair.public().into(), )?, ping: libp2p::ping::Behaviour::new(ping::Config::new()), }) } } #[derive(Debug)] pub enum Event { Mdns(mdns::Event), Ping(ping::Event), } impl Event { pub async fn run(&self) { match self { Self::Mdns(x) => { match x { mdns::Event::Discovered(e) => { for peer in e.iter() { event!(Level::TRACE, "Peer discovered via mdns: {}, {}", &peer.0, &peer.1); match try_get_or_insert_cached_peer(&peer.0, &peer.1).await { Ok(_) => {}, Err(e) => { event!(Level::WARN, "{:?}", e); } }; } }, _ => {}, } }, _ => {} } } } impl From for Event { fn from(event: mdns::Event) -> Self { Self::Mdns(event) } } impl From for Event { fn from(event: ping::Event) -> Self { Self::Ping(event) } } async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(CachedPeerModel, CachedAddressModel), Error> { match ( CachedPeerEntity::find().filter(CachedPeerColumn::PeerId.eq(PeerIdValue::from(peer_id.clone()))).one(CACHE_DATABASE_CONNECTION.get()).await?, CachedAddressEntity::find().filter(CachedAddressColumn::Address.eq(MultiaddrValue::from(peer_addr.clone()))).one(CACHE_DATABASE_CONNECTION.get()).await?, ) { (Some(x), Some(y) ) => { if x.id == y.cached_peer_id { event!(Level::TRACE, "Known peer: {}, {}", peer_id, peer_addr); let mut addr: CachedAddressActiveModel = y.into(); addr.last_used_at = Set(Local::now().to_utc()); let updated = addr.update(CACHE_DATABASE_CONNECTION.get()).await?; Ok((x, updated)) } else { y.delete(CACHE_DATABASE_CONNECTION.get()).await?; Ok((x.clone(), CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(CACHE_DATABASE_CONNECTION.get()).await?)) } } (Some(x), None) => { event!(Level::INFO, "New address {} for {}", peer_addr, peer_id); Ok((x.clone(),CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(CACHE_DATABASE_CONNECTION.get()).await?)) }, (None, x) => { event!(Level::INFO, "Add new peer: {}", peer_id); let inserted = CachedPeerActiveModel::new(peer_id.clone()).insert(CACHE_DATABASE_CONNECTION.get()).await?; if let Some(y) = x { event!(Level::INFO, "Remove {} from {}", peer_addr, peer_id); y.delete(CACHE_DATABASE_CONNECTION.get()).await?; }; event!(Level::INFO, "Add address {} to {}", peer_addr, peer_id); Ok((inserted.clone(), CachedAddressActiveModel::new(inserted.id, peer_addr.clone()).insert(CACHE_DATABASE_CONNECTION.get()).await?)) }, } }