From f7e67759de192ac1923403bd6c92cf42ae786f28 Mon Sep 17 00:00:00 2001 From: fluo10 Date: Tue, 12 Aug 2025 07:29:23 +0900 Subject: [PATCH] Move rpc module from desktop to core --- Cargo.toml | 3 +- core/Cargo.toml | 9 +++++- {desktop => core}/build.rs | 0 {macros => core/macros}/Cargo.toml | 0 {macros => core/macros}/src/derive.rs | 0 {macros => core/macros}/src/lib.rs | 0 .../macros}/tests/derive_emptiable.rs | 0 .../macros}/tests/derive_mergeable.rs | 0 .../macros}/tests/derive_runnable.rs | 0 .../macros}/tests/derive_syncable.rs | 0 {desktop => core}/proto/caretta.proto | 14 +++++++-- core/src/cache/entity/cached_address.rs | 22 ++++++++++--- core/src/cache/entity/cached_peer.rs | 9 ++++++ core/src/cache/entity/mod.rs | 5 +-- .../m20220101_000001_create_cache_tables.rs | 31 ++++++++++++------- core/src/client.rs | 0 core/src/ipc.rs | 4 --- core/src/lib.rs | 2 +- core/src/p2p/mod.rs | 2 +- core/src/rpc/mod.rs | 2 ++ core/src/rpc/proto/cached_peer.rs | 0 core/src/rpc/proto/mod.rs | 9 ++++++ core/src/rpc/server/cached_peer.rs | 30 ++++++++++++++++++ core/src/rpc/server/mod.rs | 1 + core/src/utils/mod.rs | 9 ++++++ desktop/Cargo.toml | 5 ++- desktop/src/rpc/mod.rs | 13 -------- desktop/src/rpc/server/cached_peer.rs | 20 ------------ desktop/src/rpc/server/mod.rs | 1 - 29 files changed, 125 insertions(+), 66 deletions(-) rename {desktop => core}/build.rs (100%) rename {macros => core/macros}/Cargo.toml (100%) rename {macros => core/macros}/src/derive.rs (100%) rename {macros => core/macros}/src/lib.rs (100%) rename {macros => core/macros}/tests/derive_emptiable.rs (100%) rename {macros => core/macros}/tests/derive_mergeable.rs (100%) rename {macros => core/macros}/tests/derive_runnable.rs (100%) rename {macros => core/macros}/tests/derive_syncable.rs (100%) rename {desktop => core}/proto/caretta.proto (55%) create mode 100644 core/src/client.rs delete mode 100644 core/src/ipc.rs create mode 100644 core/src/rpc/mod.rs create mode 100644 core/src/rpc/proto/cached_peer.rs create mode 100644 core/src/rpc/proto/mod.rs create mode 100644 core/src/rpc/server/cached_peer.rs create mode 100644 core/src/rpc/server/mod.rs delete mode 100644 desktop/src/rpc/mod.rs delete mode 100644 desktop/src/rpc/server/cached_peer.rs delete mode 100644 desktop/src/rpc/server/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 36cb629..8e28cf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [ "core", "desktop", "macros", "mobile", "examples/*" ] +members = [ "core", "core/macros", "desktop", "mobile", "examples/*" ] resolver = "3" [workspace.package] @@ -21,4 +21,5 @@ sea-orm-migration = { version = "1.1.0", features = ["runtime-tokio-rustls", "sq serde = { version = "1.0.219", features = ["derive"] } thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt", "rt-multi-thread"] } +tonic = "0.14.0" uuid = { version = "1.17.0", features = ["v7"] } diff --git a/core/Cargo.toml b/core/Cargo.toml index 9c8d4af..3150480 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,10 +20,11 @@ chrono-tz = "0.10.3" ciborium.workspace = true clap = {workspace = true, optional = true} futures = "0.3.31" -caretta-macros = { path = "../macros", optional = true } +caretta-macros = { path = "macros", optional = true } libp2p.workspace = true libp2p-core = { version = "0.43.0", features = ["serde"] } libp2p-identity = { version = "0.2.11", features = ["ed25519", "peerid", "rand", "serde"] } +prost = "0.14.1" sea-orm.workspace = true sea-orm-migration.workspace = true serde.workspace = true @@ -31,9 +32,15 @@ tempfile = { version = "3.20.0", optional = true } thiserror.workspace = true tokio.workspace = true toml = "0.8.22" +tonic.workspace = true +tonic-prost = "0.14.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } uuid.workspace = true +prost-types = "0.14.1" [dev-dependencies] tempfile = "3.20.0" + +[build-dependencies] +tonic-prost-build = "0.14.0" diff --git a/desktop/build.rs b/core/build.rs similarity index 100% rename from desktop/build.rs rename to core/build.rs diff --git a/macros/Cargo.toml b/core/macros/Cargo.toml similarity index 100% rename from macros/Cargo.toml rename to core/macros/Cargo.toml diff --git a/macros/src/derive.rs b/core/macros/src/derive.rs similarity index 100% rename from macros/src/derive.rs rename to core/macros/src/derive.rs diff --git a/macros/src/lib.rs b/core/macros/src/lib.rs similarity index 100% rename from macros/src/lib.rs rename to core/macros/src/lib.rs diff --git a/macros/tests/derive_emptiable.rs b/core/macros/tests/derive_emptiable.rs similarity index 100% rename from macros/tests/derive_emptiable.rs rename to core/macros/tests/derive_emptiable.rs diff --git a/macros/tests/derive_mergeable.rs b/core/macros/tests/derive_mergeable.rs similarity index 100% rename from macros/tests/derive_mergeable.rs rename to core/macros/tests/derive_mergeable.rs diff --git a/macros/tests/derive_runnable.rs b/core/macros/tests/derive_runnable.rs similarity index 100% rename from macros/tests/derive_runnable.rs rename to core/macros/tests/derive_runnable.rs diff --git a/macros/tests/derive_syncable.rs b/core/macros/tests/derive_syncable.rs similarity index 100% rename from macros/tests/derive_syncable.rs rename to core/macros/tests/derive_syncable.rs diff --git a/desktop/proto/caretta.proto b/core/proto/caretta.proto similarity index 55% rename from desktop/proto/caretta.proto rename to core/proto/caretta.proto index 16d46e9..4f83053 100644 --- a/desktop/proto/caretta.proto +++ b/core/proto/caretta.proto @@ -1,5 +1,6 @@ syntax = "proto3"; package caretta; +import "google/protobuf/timestamp.proto"; enum PeerListOrderBy { CREATED_AT = 0; @@ -19,10 +20,17 @@ message CachedPeerListRequest { } message CachedPeerMessage { - string peer_id = 1; - - repeated string multi_addresss = 2; + uint32 number = 1; + string peer_id = 2; + google.protobuf.Timestamp created_at = 3; + repeated CachedAddressMessage addresses = 4; +} +message CachedAddressMessage { + uint32 number = 1; + google.protobuf.Timestamp created_at = 2; + google.protobuf.Timestamp updated_at = 3; + string multiaddress = 4; } message CachedPeerListResponse { diff --git a/core/src/cache/entity/cached_address.rs b/core/src/cache/entity/cached_address.rs index db62585..64f0a10 100644 --- a/core/src/cache/entity/cached_address.rs +++ b/core/src/cache/entity/cached_address.rs @@ -2,12 +2,13 @@ use std::str::FromStr; use chrono::{Days, Local}; use libp2p::{multiaddr, Multiaddr, PeerId}; +use prost_types::Timestamp; use sea_orm::{entity::{ prelude::*, * }, sea_query}; use serde::{Deserialize, Serialize}; -use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}}; +use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}, rpc::proto::CachedAddressMessage, utils::utc_to_timestamp}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)] @@ -18,11 +19,11 @@ pub struct Model { #[sea_orm(indexed)] pub created_at: DateTimeUtc, #[sea_orm(indexed)] - pub last_used_at: DateTimeUtc, + pub updated_at: DateTimeUtc, #[sea_orm(indexed)] pub cached_peer_id: u32, #[sea_orm(indexed)] - pub address: MultiaddrValue, + pub multiaddress: MultiaddrValue, } @@ -48,10 +49,21 @@ impl ActiveModel { let timestamp: DateTimeUtc = Local::now().to_utc(); Self{ cached_peer_id: Set(cached_peer_id), - address: Set(MultiaddrValue::from(multiaddr)), + multiaddress: Set(MultiaddrValue::from(multiaddr)), created_at: Set(timestamp), - last_used_at: Set(timestamp), + updated_at: Set(timestamp), ..Default::default() } } } + +impl From for CachedAddressMessage { + fn from(a: Model) -> Self { + Self { + number: a.id, + created_at: Some(utc_to_timestamp(a.created_at)), + updated_at: Some(utc_to_timestamp(a.updated_at)), + multiaddress: Multiaddr::from(a.multiaddress).to_string(), + } + } +} \ No newline at end of file diff --git a/core/src/cache/entity/cached_peer.rs b/core/src/cache/entity/cached_peer.rs index 78b24d7..0920e45 100644 --- a/core/src/cache/entity/cached_peer.rs +++ b/core/src/cache/entity/cached_peer.rs @@ -18,6 +18,8 @@ pub struct Model { #[sea_orm(indexed)] pub created_at: DateTimeUtc, #[sea_orm(indexed)] + pub updated_at: DateTimeUtc, + #[sea_orm(indexed)] pub peer_id: PeerIdValue, } @@ -42,7 +44,14 @@ impl ActiveModel { Self{ peer_id: Set(PeerIdValue::from(peer_id)), created_at: Set(timestamp), + updated_at: Set(timestamp), ..Default::default() } } +} + +impl Entity { + pub fn find_by_peer_id(peer_id: PeerId) -> Select { + Self::find().filter(Column::PeerId.eq(PeerIdValue::from(peer_id))) + } } \ No newline at end of file diff --git a/core/src/cache/entity/mod.rs b/core/src/cache/entity/mod.rs index ab96a24..27bef7c 100644 --- a/core/src/cache/entity/mod.rs +++ b/core/src/cache/entity/mod.rs @@ -8,7 +8,6 @@ pub use cached_peer::{ Entity as CachedPeerEntity, }; - pub use cached_address::{ ActiveModel as CachedAddressActiveModel, Column as CachedAddressColumn, @@ -28,6 +27,8 @@ mod tests { use libp2p::{identity::{self, Keypair}, multiaddr, swarm::handler::multi, Multiaddr, PeerId}; use sea_orm::ActiveModelTrait; + + #[tokio::test] async fn insert() { let db = get_or_init_test_cache_database().await; @@ -40,7 +41,7 @@ mod tests { let inserted_cached_address: CachedAddressModel = CachedAddressActiveModel::new(inserted_cached_peer.id, multiaddr.clone()) .insert(db).await.unwrap(); assert_eq!(PeerId::from(inserted_cached_peer.peer_id), peer_id); - assert_eq!(Multiaddr::from(inserted_cached_address.address), multiaddr); + assert_eq!(Multiaddr::from(inserted_cached_address.multiaddress), multiaddr); } } \ No newline at end of file diff --git a/core/src/cache/migration/m20220101_000001_create_cache_tables.rs b/core/src/cache/migration/m20220101_000001_create_cache_tables.rs index 70670e1..09bdfd1 100644 --- a/core/src/cache/migration/m20220101_000001_create_cache_tables.rs +++ b/core/src/cache/migration/m20220101_000001_create_cache_tables.rs @@ -26,11 +26,12 @@ enum CachedPeer { Id, PeerId, CreatedAt, + UpdatedAt, } -static IDX_CACHED_ADDRESS: &str = "idx_CACHED_ADDRESS"; static IDX_CACHED_PEER_PEER_ID: &str = "idx_cached_peer_peer_id"; static IDX_CACHED_PEER_CREATED_AT: &str = "idx_cached_peer_created_at"; +static IDX_CACHED_PEER_UPDATED_AT: &str = "idx_cached_peer_updated_at"; #[async_trait::async_trait] impl TableMigration for CachedPeer { @@ -42,6 +43,7 @@ impl TableMigration for CachedPeer { .col(pk_auto(Self::Id)) .col(string_len(Self::PeerId, 255)) .col(timestamp(Self::CreatedAt)) + .col(timestamp(Self::UpdatedAt)) .to_owned() ).await?; manager.create_index( @@ -58,6 +60,13 @@ impl TableMigration for CachedPeer { .col(Self::CreatedAt) .to_owned() ).await?; + manager.create_index( + Index::create() + .name(IDX_CACHED_PEER_UPDATED_AT) + .table(Self::Table) + .col(Self::UpdatedAt) + .to_owned() + ).await?; Ok(()) } async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr>{ @@ -71,14 +80,14 @@ enum CachedAddress { Id, CachedPeerId, CreatedAt, - LastUsedAt, - Address, + UpdatedAt, + Multiaddress, } -static IDX_CACHED_ADDRESS_ADDRESS: &str = "idx_cached_address_address"; +static IDX_CACHED_ADDRESS_MULTIADDRESS: &str = "idx_cached_address_multiaddress"; static IDX_CACHED_ADDRESS_CACHED_PEER_ID: &str = "idx_cached_address_cached_peer_id"; static IDX_CACHED_ADDRESS_CREATED_AT: &str = "idx_cached_address_created_at"; -static IDX_CACHED_ADDRESS_LAST_USED_AT: &str = "idx_cached_address_last_used_at"; +static IDX_CACHED_ADDRESS_UPDATED_AT: &str = "idx_cached_address_updated_at"; static FK_CACHED_ADDRESS_CACHED_PEER: &str = "fk_cached_address_cached_peer"; #[async_trait::async_trait] @@ -98,8 +107,8 @@ impl TableMigration for CachedAddress { .on_update(ForeignKeyAction::Cascade) ) .col(timestamp(Self::CreatedAt)) - .col(timestamp(Self::LastUsedAt)) - .col(text_uniq(Self::Address)) + .col(timestamp(Self::UpdatedAt)) + .col(text_uniq(Self::Multiaddress)) .to_owned() ).await?; manager.create_index( @@ -111,9 +120,9 @@ impl TableMigration for CachedAddress { ).await?; manager.create_index( Index::create() - .name(IDX_CACHED_ADDRESS_ADDRESS) + .name(IDX_CACHED_ADDRESS_MULTIADDRESS) .table(Self::Table) - .col(Self::Address) + .col(Self::Multiaddress) .to_owned() ).await?; manager.create_index( @@ -125,9 +134,9 @@ impl TableMigration for CachedAddress { ).await?; manager.create_index( Index::create() - .name(IDX_CACHED_ADDRESS_LAST_USED_AT) + .name(IDX_CACHED_ADDRESS_UPDATED_AT) .table(Self::Table) - .col(Self::LastUsedAt) + .col(Self::UpdatedAt) .to_owned() ).await?; diff --git a/core/src/client.rs b/core/src/client.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/ipc.rs b/core/src/ipc.rs deleted file mode 100644 index 63a8d5f..0000000 --- a/core/src/ipc.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub trait Message { - fn into_vec_u8(self) -> Vec; - fn from_vec_u8() -> Self; -} \ No newline at end of file diff --git a/core/src/lib.rs b/core/src/lib.rs index 2baa9c4..cfbe82e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -3,10 +3,10 @@ pub mod config; pub mod data; pub mod error; pub mod global; -pub mod ipc; pub mod message; pub mod migration; pub mod p2p; +pub mod rpc; #[cfg(any(test, feature="test"))] pub mod tests; pub mod utils; diff --git a/core/src/p2p/mod.rs b/core/src/p2p/mod.rs index 6130a42..80e743b 100644 --- a/core/src/p2p/mod.rs +++ b/core/src/p2p/mod.rs @@ -76,7 +76,7 @@ async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr) if x.id == y.cached_peer_id { event!(Level::TRACE, "Known peer: {}, {}", peer_id, peer_addr); let mut addr: CachedAddressActiveModel = y.into(); - addr.last_used_at = Set(Local::now().to_utc()); + addr.updated_at = Set(Local::now().to_utc()); let updated = addr.update(CACHE_DATABASE_CONNECTION.get()).await?; Ok((x, updated)) } else { diff --git a/core/src/rpc/mod.rs b/core/src/rpc/mod.rs new file mode 100644 index 0000000..c6eebbb --- /dev/null +++ b/core/src/rpc/mod.rs @@ -0,0 +1,2 @@ +pub mod proto; +pub mod server; diff --git a/core/src/rpc/proto/cached_peer.rs b/core/src/rpc/proto/cached_peer.rs new file mode 100644 index 0000000..e69de29 diff --git a/core/src/rpc/proto/mod.rs b/core/src/rpc/proto/mod.rs new file mode 100644 index 0000000..b2e88a0 --- /dev/null +++ b/core/src/rpc/proto/mod.rs @@ -0,0 +1,9 @@ + +use crate::{cache::entity::CachedPeerModel, error::Error}; + +tonic::include_proto!("caretta"); + +pub trait CachedPeerInterface { + async fn list( req: CachedPeerListRequest) -> Result; +} + diff --git a/core/src/rpc/server/cached_peer.rs b/core/src/rpc/server/cached_peer.rs new file mode 100644 index 0000000..682cd6d --- /dev/null +++ b/core/src/rpc/server/cached_peer.rs @@ -0,0 +1,30 @@ +use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{CACHE_DATABASE_CONNECTION, DATA_DATABASE_CONNECTION}, rpc::proto::CachedAddressMessage}; +use tonic::{Request, Response, Status}; + +use crate::rpc::proto::{cached_peer_service_server::{CachedPeerService, CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage}; +use sea_orm::prelude::*; + +#[derive(Debug, Default)] +pub struct CachedPeerServer {} + + + +#[tonic::async_trait] +impl CachedPeerService for CachedPeerServer { + async fn list(&self, request: Request) -> Result, Status> { + println!("Got a request: {:?}", request); + todo!(); + + let reply = CachedPeerListResponse { + peers: CachedPeerEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?.into_iter().map(|x| { + let addresses = CachedAddressEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))? + .map(|x| { + CachedAddressMessage::from(x) + }); + CachedPeerMessage::from(x) + }).collect(), + }; + + Ok(Response::new(reply)) + } +} diff --git a/core/src/rpc/server/mod.rs b/core/src/rpc/server/mod.rs new file mode 100644 index 0000000..ed2c36c --- /dev/null +++ b/core/src/rpc/server/mod.rs @@ -0,0 +1 @@ +pub mod cached_peer; \ No newline at end of file diff --git a/core/src/utils/mod.rs b/core/src/utils/mod.rs index 1233bee..a9263ef 100644 --- a/core/src/utils/mod.rs +++ b/core/src/utils/mod.rs @@ -1,4 +1,13 @@ +use prost_types::Timestamp; +use chrono::{DateTime, Timelike, Utc}; pub mod async_convert; pub mod emptiable; pub mod mergeable; pub mod runnable; + +pub fn utc_to_timestamp(utc: DateTime) -> Timestamp { + Timestamp{ + seconds: utc.timestamp(), + nanos: i32::try_from(utc.nanosecond()).unwrap(), + } +} \ No newline at end of file diff --git a/desktop/Cargo.toml b/desktop/Cargo.toml index 393ee71..4fa7c98 100644 --- a/desktop/Cargo.toml +++ b/desktop/Cargo.toml @@ -16,13 +16,12 @@ clap.workspace = true dirs = "6.0.0" caretta-core = { workspace = true, features = ["desktop"] } libp2p.workspace = true -prost = "0.14.1" sea-orm.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true -tonic = "0.14.0" -tonic-prost = "0.14.0" +tonic.workspace = true + uuid.workspace = true [dev-dependencies] diff --git a/desktop/src/rpc/mod.rs b/desktop/src/rpc/mod.rs deleted file mode 100644 index 31a454d..0000000 --- a/desktop/src/rpc/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -pub mod server; - -pub mod proto { - use caretta_core::cache::entity::CachedPeerModel; - - tonic::include_proto!("caretta"); - - impl From for CachedPeerMessage { - fn from(s: CachedPeerModel) -> Self { - todo!() - } - } -} \ No newline at end of file diff --git a/desktop/src/rpc/server/cached_peer.rs b/desktop/src/rpc/server/cached_peer.rs deleted file mode 100644 index a30a57c..0000000 --- a/desktop/src/rpc/server/cached_peer.rs +++ /dev/null @@ -1,20 +0,0 @@ -use caretta_core::{cache::entity::CachedPeerEntity, global::DATA_DATABASE_CONNECTION}; -use tonic::{Request, Response, Status}; - -use crate::rpc::proto::{cached_peer_service_server::{CachedPeerService, CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse}; -use sea_orm::prelude::*; - -#[derive(Debug, Default)] -pub struct CachedPeerServer {} - -#[tonic::async_trait] -impl CachedPeerService for CachedPeerServer { - async fn list(&self, request: Request) -> Result, Status> { - println!("Got a request: {:?}", request); - - let reply = CachedPeerListResponse { - peers: CachedPeerEntity::find().all(DATA_DATABASE_CONNECTION.get()).await? - }; - } - -} \ No newline at end of file diff --git a/desktop/src/rpc/server/mod.rs b/desktop/src/rpc/server/mod.rs deleted file mode 100644 index 8d64ea1..0000000 --- a/desktop/src/rpc/server/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod cached_peer;