diff --git a/core/src/cache/entity/cached_address.rs b/core/src/cache/entity/cached_address.rs index 64f0a10..4826631 100644 --- a/core/src/cache/entity/cached_address.rs +++ b/core/src/cache/entity/cached_address.rs @@ -8,7 +8,7 @@ use sea_orm::{entity::{ }, sea_query}; use serde::{Deserialize, Serialize}; -use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}, rpc::proto::CachedAddressMessage, utils::utc_to_timestamp}; +use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}, utils::utc_to_timestamp}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)] @@ -57,13 +57,3 @@ impl ActiveModel { } } -impl From for CachedAddressMessage { - fn from(a: Model) -> Self { - Self { - number: a.id, - created_at: Some(utc_to_timestamp(a.created_at)), - updated_at: Some(utc_to_timestamp(a.updated_at)), - multiaddress: Multiaddr::from(a.multiaddress).to_string(), - } - } -} \ No newline at end of file diff --git a/core/src/lib.rs b/core/src/lib.rs index cfbe82e..eec9477 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -6,6 +6,7 @@ pub mod global; pub mod message; pub mod migration; pub mod p2p; +pub mod proto; pub mod rpc; #[cfg(any(test, feature="test"))] pub mod tests; diff --git a/core/src/p2p/mod.rs b/core/src/p2p/mod.rs index 80e743b..eaa5001 100644 --- a/core/src/p2p/mod.rs +++ b/core/src/p2p/mod.rs @@ -70,7 +70,7 @@ impl From for Event { async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(CachedPeerModel, CachedAddressModel), Error> { match ( CachedPeerEntity::find().filter(CachedPeerColumn::PeerId.eq(PeerIdValue::from(peer_id.clone()))).one(CACHE_DATABASE_CONNECTION.get()).await?, - CachedAddressEntity::find().filter(CachedAddressColumn::Address.eq(MultiaddrValue::from(peer_addr.clone()))).one(CACHE_DATABASE_CONNECTION.get()).await?, + CachedAddressEntity::find().filter(CachedAddressColumn::Multiaddress.eq(MultiaddrValue::from(peer_addr.clone()))).one(CACHE_DATABASE_CONNECTION.get()).await?, ) { (Some(x), Some(y) ) => { if x.id == y.cached_peer_id { diff --git a/core/src/proto/cached_address.rs b/core/src/proto/cached_address.rs new file mode 100644 index 0000000..b538253 --- /dev/null +++ b/core/src/proto/cached_address.rs @@ -0,0 +1,16 @@ +use libp2p::Multiaddr; + +use crate::cache::entity::CachedAddressModel; +use crate::utils::utc_to_timestamp; +use crate::proto::CachedAddressMessage; + +impl From<&CachedAddressModel> for CachedAddressMessage { + fn from(a: &CachedAddressModel) -> Self { + Self { + number: a.id, + created_at: Some(utc_to_timestamp(a.created_at)), + updated_at: Some(utc_to_timestamp(a.updated_at)), + multiaddress: Multiaddr::from(a.multiaddress.clone()).to_string(), + } + } +} \ No newline at end of file diff --git a/core/src/proto/cached_peer.rs b/core/src/proto/cached_peer.rs new file mode 100644 index 0000000..6e83697 --- /dev/null +++ b/core/src/proto/cached_peer.rs @@ -0,0 +1,14 @@ +use crate::{cache::entity::{CachedAddressModel, CachedPeerModel}, proto::{CachedAddressMessage, CachedPeerMessage}, utils::utc_to_timestamp}; + +impl From<(&CachedPeerModel, &Vec)> for CachedPeerMessage { + fn from(source: (&CachedPeerModel, &Vec)) -> Self { + let (peer, addresses) = source; + + Self { + number: peer.id, + peer_id: peer.peer_id.to_string(), + created_at: Some(utc_to_timestamp(peer.created_at)), + addresses: addresses.iter().map(|x| CachedAddressMessage::from(x)).collect(), + } + } +} \ No newline at end of file diff --git a/core/src/proto/mod.rs b/core/src/proto/mod.rs new file mode 100644 index 0000000..db05f66 --- /dev/null +++ b/core/src/proto/mod.rs @@ -0,0 +1,5 @@ +mod cached_address; +mod cached_peer; + +tonic::include_proto!("caretta"); + diff --git a/core/src/rpc/mod.rs b/core/src/rpc/mod.rs index c6eebbb..74f47ad 100644 --- a/core/src/rpc/mod.rs +++ b/core/src/rpc/mod.rs @@ -1,2 +1 @@ -pub mod proto; pub mod server; diff --git a/core/src/rpc/proto/cached_peer.rs b/core/src/rpc/proto/cached_peer.rs deleted file mode 100644 index e69de29..0000000 diff --git a/core/src/rpc/proto/mod.rs b/core/src/rpc/proto/mod.rs deleted file mode 100644 index b2e88a0..0000000 --- a/core/src/rpc/proto/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ - -use crate::{cache::entity::CachedPeerModel, error::Error}; - -tonic::include_proto!("caretta"); - -pub trait CachedPeerInterface { - async fn list( req: CachedPeerListRequest) -> Result; -} - diff --git a/core/src/rpc/server/cached_peer.rs b/core/src/rpc/server/cached_peer.rs index 682cd6d..d51b0a6 100644 --- a/core/src/rpc/server/cached_peer.rs +++ b/core/src/rpc/server/cached_peer.rs @@ -1,7 +1,8 @@ -use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{CACHE_DATABASE_CONNECTION, DATA_DATABASE_CONNECTION}, rpc::proto::CachedAddressMessage}; +use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{CACHE_DATABASE_CONNECTION, DATA_DATABASE_CONNECTION}, proto::CachedAddressMessage}; +use futures::future::join_all; use tonic::{Request, Response, Status}; -use crate::rpc::proto::{cached_peer_service_server::{CachedPeerService, CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage}; +use crate::proto::{cached_peer_service_server::{CachedPeerService, CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage}; use sea_orm::prelude::*; #[derive(Debug, Default)] @@ -13,16 +14,15 @@ pub struct CachedPeerServer {} impl CachedPeerService for CachedPeerServer { async fn list(&self, request: Request) -> Result, Status> { println!("Got a request: {:?}", request); - todo!(); let reply = CachedPeerListResponse { - peers: CachedPeerEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?.into_iter().map(|x| { - let addresses = CachedAddressEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))? - .map(|x| { - CachedAddressMessage::from(x) - }); - CachedPeerMessage::from(x) - }).collect(), + peers: join_all( CachedPeerEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?.iter().map(|x| async move { + let addresses = CachedAddressEntity::find() + .all(CACHE_DATABASE_CONNECTION.get()) + .await + .or_else(|e| Err(Status::from_error(Box::new(e))))?; + Ok::(CachedPeerMessage::from((x, &addresses))) + })).await.into_iter().collect::,_>>()?, }; Ok(Response::new(reply)) diff --git a/desktop/src/lib.rs b/desktop/src/lib.rs index 30a67cf..371fa2c 100644 --- a/desktop/src/lib.rs +++ b/desktop/src/lib.rs @@ -1,7 +1,6 @@ pub mod cli; pub mod config; pub mod global; -pub mod rpc; pub mod utils; pub use caretta_core::{ cache,