Move rpc module from desktop to core

This commit is contained in:
fluo10 2025-08-12 07:29:23 +09:00
parent c39a28388d
commit f7e67759de
29 changed files with 125 additions and 66 deletions

View file

@ -1,5 +1,5 @@
[workspace]
members = [ "core", "desktop", "macros", "mobile", "examples/*" ]
members = [ "core", "core/macros", "desktop", "mobile", "examples/*" ]
resolver = "3"
[workspace.package]
@ -21,4 +21,5 @@ sea-orm-migration = { version = "1.1.0", features = ["runtime-tokio-rustls", "sq
serde = { version = "1.0.219", features = ["derive"] }
thiserror = "2.0.12"
tokio = { version = "1.45.0", features = ["macros", "rt", "rt-multi-thread"] }
tonic = "0.14.0"
uuid = { version = "1.17.0", features = ["v7"] }

View file

@ -20,10 +20,11 @@ chrono-tz = "0.10.3"
ciborium.workspace = true
clap = {workspace = true, optional = true}
futures = "0.3.31"
caretta-macros = { path = "../macros", optional = true }
caretta-macros = { path = "macros", optional = true }
libp2p.workspace = true
libp2p-core = { version = "0.43.0", features = ["serde"] }
libp2p-identity = { version = "0.2.11", features = ["ed25519", "peerid", "rand", "serde"] }
prost = "0.14.1"
sea-orm.workspace = true
sea-orm-migration.workspace = true
serde.workspace = true
@ -31,9 +32,15 @@ tempfile = { version = "3.20.0", optional = true }
thiserror.workspace = true
tokio.workspace = true
toml = "0.8.22"
tonic.workspace = true
tonic-prost = "0.14.0"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
uuid.workspace = true
prost-types = "0.14.1"
[dev-dependencies]
tempfile = "3.20.0"
[build-dependencies]
tonic-prost-build = "0.14.0"

View file

@ -1,5 +1,6 @@
syntax = "proto3";
package caretta;
import "google/protobuf/timestamp.proto";
enum PeerListOrderBy {
CREATED_AT = 0;
@ -19,10 +20,17 @@ message CachedPeerListRequest {
}
message CachedPeerMessage {
string peer_id = 1;
repeated string multi_addresss = 2;
uint32 number = 1;
string peer_id = 2;
google.protobuf.Timestamp created_at = 3;
repeated CachedAddressMessage addresses = 4;
}
message CachedAddressMessage {
uint32 number = 1;
google.protobuf.Timestamp created_at = 2;
google.protobuf.Timestamp updated_at = 3;
string multiaddress = 4;
}
message CachedPeerListResponse {

View file

@ -2,12 +2,13 @@ use std::str::FromStr;
use chrono::{Days, Local};
use libp2p::{multiaddr, Multiaddr, PeerId};
use prost_types::Timestamp;
use sea_orm::{entity::{
prelude::*, *
}, sea_query};
use serde::{Deserialize, Serialize};
use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}};
use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}, rpc::proto::CachedAddressMessage, utils::utc_to_timestamp};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)]
@ -18,11 +19,11 @@ pub struct Model {
#[sea_orm(indexed)]
pub created_at: DateTimeUtc,
#[sea_orm(indexed)]
pub last_used_at: DateTimeUtc,
pub updated_at: DateTimeUtc,
#[sea_orm(indexed)]
pub cached_peer_id: u32,
#[sea_orm(indexed)]
pub address: MultiaddrValue,
pub multiaddress: MultiaddrValue,
}
@ -48,10 +49,21 @@ impl ActiveModel {
let timestamp: DateTimeUtc = Local::now().to_utc();
Self{
cached_peer_id: Set(cached_peer_id),
address: Set(MultiaddrValue::from(multiaddr)),
multiaddress: Set(MultiaddrValue::from(multiaddr)),
created_at: Set(timestamp),
last_used_at: Set(timestamp),
updated_at: Set(timestamp),
..Default::default()
}
}
}
impl From<Model> for CachedAddressMessage {
fn from(a: Model) -> Self {
Self {
number: a.id,
created_at: Some(utc_to_timestamp(a.created_at)),
updated_at: Some(utc_to_timestamp(a.updated_at)),
multiaddress: Multiaddr::from(a.multiaddress).to_string(),
}
}
}

View file

@ -18,6 +18,8 @@ pub struct Model {
#[sea_orm(indexed)]
pub created_at: DateTimeUtc,
#[sea_orm(indexed)]
pub updated_at: DateTimeUtc,
#[sea_orm(indexed)]
pub peer_id: PeerIdValue,
}
@ -42,7 +44,14 @@ impl ActiveModel {
Self{
peer_id: Set(PeerIdValue::from(peer_id)),
created_at: Set(timestamp),
updated_at: Set(timestamp),
..Default::default()
}
}
}
impl Entity {
pub fn find_by_peer_id(peer_id: PeerId) -> Select<Entity> {
Self::find().filter(Column::PeerId.eq(PeerIdValue::from(peer_id)))
}
}

View file

@ -8,7 +8,6 @@ pub use cached_peer::{
Entity as CachedPeerEntity,
};
pub use cached_address::{
ActiveModel as CachedAddressActiveModel,
Column as CachedAddressColumn,
@ -28,6 +27,8 @@ mod tests {
use libp2p::{identity::{self, Keypair}, multiaddr, swarm::handler::multi, Multiaddr, PeerId};
use sea_orm::ActiveModelTrait;
#[tokio::test]
async fn insert() {
let db = get_or_init_test_cache_database().await;
@ -40,7 +41,7 @@ mod tests {
let inserted_cached_address: CachedAddressModel = CachedAddressActiveModel::new(inserted_cached_peer.id, multiaddr.clone())
.insert(db).await.unwrap();
assert_eq!(PeerId::from(inserted_cached_peer.peer_id), peer_id);
assert_eq!(Multiaddr::from(inserted_cached_address.address), multiaddr);
assert_eq!(Multiaddr::from(inserted_cached_address.multiaddress), multiaddr);
}
}

View file

@ -26,11 +26,12 @@ enum CachedPeer {
Id,
PeerId,
CreatedAt,
UpdatedAt,
}
static IDX_CACHED_ADDRESS: &str = "idx_CACHED_ADDRESS";
static IDX_CACHED_PEER_PEER_ID: &str = "idx_cached_peer_peer_id";
static IDX_CACHED_PEER_CREATED_AT: &str = "idx_cached_peer_created_at";
static IDX_CACHED_PEER_UPDATED_AT: &str = "idx_cached_peer_updated_at";
#[async_trait::async_trait]
impl TableMigration for CachedPeer {
@ -42,6 +43,7 @@ impl TableMigration for CachedPeer {
.col(pk_auto(Self::Id))
.col(string_len(Self::PeerId, 255))
.col(timestamp(Self::CreatedAt))
.col(timestamp(Self::UpdatedAt))
.to_owned()
).await?;
manager.create_index(
@ -58,6 +60,13 @@ impl TableMigration for CachedPeer {
.col(Self::CreatedAt)
.to_owned()
).await?;
manager.create_index(
Index::create()
.name(IDX_CACHED_PEER_UPDATED_AT)
.table(Self::Table)
.col(Self::UpdatedAt)
.to_owned()
).await?;
Ok(())
}
async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr>{
@ -71,14 +80,14 @@ enum CachedAddress {
Id,
CachedPeerId,
CreatedAt,
LastUsedAt,
Address,
UpdatedAt,
Multiaddress,
}
static IDX_CACHED_ADDRESS_ADDRESS: &str = "idx_cached_address_address";
static IDX_CACHED_ADDRESS_MULTIADDRESS: &str = "idx_cached_address_multiaddress";
static IDX_CACHED_ADDRESS_CACHED_PEER_ID: &str = "idx_cached_address_cached_peer_id";
static IDX_CACHED_ADDRESS_CREATED_AT: &str = "idx_cached_address_created_at";
static IDX_CACHED_ADDRESS_LAST_USED_AT: &str = "idx_cached_address_last_used_at";
static IDX_CACHED_ADDRESS_UPDATED_AT: &str = "idx_cached_address_updated_at";
static FK_CACHED_ADDRESS_CACHED_PEER: &str = "fk_cached_address_cached_peer";
#[async_trait::async_trait]
@ -98,8 +107,8 @@ impl TableMigration for CachedAddress {
.on_update(ForeignKeyAction::Cascade)
)
.col(timestamp(Self::CreatedAt))
.col(timestamp(Self::LastUsedAt))
.col(text_uniq(Self::Address))
.col(timestamp(Self::UpdatedAt))
.col(text_uniq(Self::Multiaddress))
.to_owned()
).await?;
manager.create_index(
@ -111,9 +120,9 @@ impl TableMigration for CachedAddress {
).await?;
manager.create_index(
Index::create()
.name(IDX_CACHED_ADDRESS_ADDRESS)
.name(IDX_CACHED_ADDRESS_MULTIADDRESS)
.table(Self::Table)
.col(Self::Address)
.col(Self::Multiaddress)
.to_owned()
).await?;
manager.create_index(
@ -125,9 +134,9 @@ impl TableMigration for CachedAddress {
).await?;
manager.create_index(
Index::create()
.name(IDX_CACHED_ADDRESS_LAST_USED_AT)
.name(IDX_CACHED_ADDRESS_UPDATED_AT)
.table(Self::Table)
.col(Self::LastUsedAt)
.col(Self::UpdatedAt)
.to_owned()
).await?;

0
core/src/client.rs Normal file
View file

View file

@ -1,4 +0,0 @@
pub trait Message {
fn into_vec_u8(self) -> Vec<u8>;
fn from_vec_u8() -> Self;
}

View file

@ -3,10 +3,10 @@ pub mod config;
pub mod data;
pub mod error;
pub mod global;
pub mod ipc;
pub mod message;
pub mod migration;
pub mod p2p;
pub mod rpc;
#[cfg(any(test, feature="test"))]
pub mod tests;
pub mod utils;

View file

@ -76,7 +76,7 @@ async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr)
if x.id == y.cached_peer_id {
event!(Level::TRACE, "Known peer: {}, {}", peer_id, peer_addr);
let mut addr: CachedAddressActiveModel = y.into();
addr.last_used_at = Set(Local::now().to_utc());
addr.updated_at = Set(Local::now().to_utc());
let updated = addr.update(CACHE_DATABASE_CONNECTION.get()).await?;
Ok((x, updated))
} else {

2
core/src/rpc/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod proto;
pub mod server;

View file

View file

@ -0,0 +1,9 @@
use crate::{cache::entity::CachedPeerModel, error::Error};
tonic::include_proto!("caretta");
pub trait CachedPeerInterface {
async fn list( req: CachedPeerListRequest) -> Result<CachedPeerListResponse, Error>;
}

View file

@ -0,0 +1,30 @@
use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{CACHE_DATABASE_CONNECTION, DATA_DATABASE_CONNECTION}, rpc::proto::CachedAddressMessage};
use tonic::{Request, Response, Status};
use crate::rpc::proto::{cached_peer_service_server::{CachedPeerService, CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse, CachedPeerMessage};
use sea_orm::prelude::*;
#[derive(Debug, Default)]
pub struct CachedPeerServer {}
#[tonic::async_trait]
impl CachedPeerService for CachedPeerServer {
async fn list(&self, request: Request<CachedPeerListRequest>) -> Result<Response<CachedPeerListResponse>, Status> {
println!("Got a request: {:?}", request);
todo!();
let reply = CachedPeerListResponse {
peers: CachedPeerEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?.into_iter().map(|x| {
let addresses = CachedAddressEntity::find().all(CACHE_DATABASE_CONNECTION.get()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?
.map(|x| {
CachedAddressMessage::from(x)
});
CachedPeerMessage::from(x)
}).collect(),
};
Ok(Response::new(reply))
}
}

View file

@ -0,0 +1 @@
pub mod cached_peer;

View file

@ -1,4 +1,13 @@
use prost_types::Timestamp;
use chrono::{DateTime, Timelike, Utc};
pub mod async_convert;
pub mod emptiable;
pub mod mergeable;
pub mod runnable;
pub fn utc_to_timestamp(utc: DateTime<Utc>) -> Timestamp {
Timestamp{
seconds: utc.timestamp(),
nanos: i32::try_from(utc.nanosecond()).unwrap(),
}
}

View file

@ -16,13 +16,12 @@ clap.workspace = true
dirs = "6.0.0"
caretta-core = { workspace = true, features = ["desktop"] }
libp2p.workspace = true
prost = "0.14.1"
sea-orm.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tonic = "0.14.0"
tonic-prost = "0.14.0"
tonic.workspace = true
uuid.workspace = true
[dev-dependencies]

View file

@ -1,13 +0,0 @@
pub mod server;
pub mod proto {
use caretta_core::cache::entity::CachedPeerModel;
tonic::include_proto!("caretta");
impl From<CachedPeerModel> for CachedPeerMessage {
fn from(s: CachedPeerModel) -> Self {
todo!()
}
}
}

View file

@ -1,20 +0,0 @@
use caretta_core::{cache::entity::CachedPeerEntity, global::DATA_DATABASE_CONNECTION};
use tonic::{Request, Response, Status};
use crate::rpc::proto::{cached_peer_service_server::{CachedPeerService, CachedPeerServiceServer}, CachedPeerListRequest, CachedPeerListResponse};
use sea_orm::prelude::*;
#[derive(Debug, Default)]
pub struct CachedPeerServer {}
#[tonic::async_trait]
impl CachedPeerService for CachedPeerServer {
async fn list(&self, request: Request<CachedPeerListRequest>) -> Result<Response<CachedPeerListResponse>, Status> {
println!("Got a request: {:?}", request);
let reply = CachedPeerListResponse {
peers: CachedPeerEntity::find().all(DATA_DATABASE_CONNECTION.get()).await?
};
}
}

View file

@ -1 +0,0 @@
pub mod cached_peer;