From 87d78e7605cadc98712581c1e3fe7be1a13830c1 Mon Sep 17 00:00:00 2001 From: fluo10 Date: Wed, 1 Oct 2025 07:19:35 +0900 Subject: [PATCH] Update protobuf --- core/proto/caretta_sync.remote_node.proto | 45 ++++++++-- core/src/data/local/mod.rs | 17 ++-- core/src/data/local/remote_node.rs | 17 ++-- core/src/error.rs | 23 +++++ core/src/proto/common.rs | 2 +- core/src/proto/convert/node_id_message.rs | 17 ---- .../proto/convert/remote_info_iter_request.rs | 7 -- core/src/proto/convert/remote_info_request.rs | 11 --- .../src/proto/convert/remote_info_response.rs | 16 ---- core/src/proto/mod.rs | 5 ++ core/src/proto/remote_node.rs | 84 ++++++++++++++++++- 11 files changed, 163 insertions(+), 81 deletions(-) delete mode 100644 core/src/proto/convert/node_id_message.rs delete mode 100644 core/src/proto/convert/remote_info_iter_request.rs delete mode 100644 core/src/proto/convert/remote_info_request.rs delete mode 100644 core/src/proto/convert/remote_info_response.rs diff --git a/core/proto/caretta_sync.remote_node.proto b/core/proto/caretta_sync.remote_node.proto index 5d179dd..3e8c5fc 100644 --- a/core/proto/caretta_sync.remote_node.proto +++ b/core/proto/caretta_sync.remote_node.proto @@ -9,11 +9,11 @@ import "google/protobuf/empty.proto"; service RemoteNode { rpc Info(RemoteNodeInfoRequest) returns (RemoteNodeInfoResponse); - rpc InfoStream(stream RemoteNodeInfoStreamRequest) returns (stream RemoteNodeInfoStreamResponse); + rpc List(stream RemoteNodeListRequest) returns (stream RemoteNodeListResponse); } message RemoteNodeIdentifier { - oneof remote_node_identifier { + oneof identifier { tripod_id.Double id = 1; caretta_sync.common.PublicKey public_key = 2; } @@ -23,13 +23,13 @@ message RemoteNodeInfoRequest { RemoteNodeIdentifier remote_node = 1; } -message RemoteNodeInfoStreamRequest {} +message RemoteNodeListRequest {} message RemoteNodeInfoResponse { RemoteNodeInfo remote_node_info = 1; } -message RemoteNodeInfoStreamResponse { +message RemoteNodeListResponse { RemoteNodeInfo remote_node_info = 1; } @@ -39,11 +39,25 @@ message RemoteNodeInfo { caretta_sync.common.PublicKey public_key = 3; string relay_url = 4; repeated RemoteNodeDirectAddrInfo addrs = 5; - string conn_type = 6; + RemoteNodeConnectionType conn_type = 6; google.protobuf.Duration latency = 7; google.protobuf.Duration last_used = 8; } +message RemoteNodeConnectionType { + oneof conn_type { + caretta_sync.common.SocketAddr direct = 1; + caretta_sync.common.Url relay = 2; + RemoteNodeConnectionTypeMixed mixed = 3; + google.protobuf.Empty none = 4; + } +} + +message RemoteNodeConnectionTypeMixed { + caretta_sync.common.SocketAddr socket_addr = 1; + caretta_sync.common.Url relay_url = 2; +} + message RemoteNodeRelayUrlInfo { caretta_sync.common.Url relay_url = 1; google.protobuf.Duration last_alive = 2; @@ -51,7 +65,7 @@ message RemoteNodeRelayUrlInfo { } message RemoteNodeDirectAddrInfo { - string addr = 1; + caretta_sync.common.SocketAddr addr = 1; google.protobuf.Duration latency = 2; RemoteNodeLastControl last_control = 3; google.protobuf.Duration last_payload = 4; @@ -61,10 +75,23 @@ message RemoteNodeDirectAddrInfo { message RemoteNodeLastControl { google.protobuf.Duration duration = 1; - string control_msg = 2; + RemoteNodeControlMsg control_msg = 2; +} + +enum RemoteNodeControlMsg { + PING = 0; + PONG = 1; + CALL_ME_MAYBE = 2; } message RemoteNodeSource { - string source = 1; - google.protobuf.Duration duration = 2; + oneof source = { + google.protobuf.Empty saved = 1; + google.protobuf.Empty udp = 2; + google.protobuf.Empty Relay = 3; + google.protobuf.Empty App = 4; + string discovery = 5; + string named_app = 6 + }; + google.protobuf.Duration duration = 7; } diff --git a/core/src/data/local/mod.rs b/core/src/data/local/mod.rs index 00f907f..57d1fdf 100644 --- a/core/src/data/local/mod.rs +++ b/core/src/data/local/mod.rs @@ -1,5 +1,6 @@ // mod authorization_request; mod remote_node; +pub use remote_node::RemoteNodeRecord; pub mod migration; use std::{cell::OnceCell, convert::Infallible, iter::Map, path::Path, sync::{LazyLock, OnceLock}}; @@ -10,7 +11,7 @@ use rusqlite::{ffi::Error, params, types::FromSql, Connection, MappedRows, Optio use crate::{config::StorageConfig, global::{CONFIG, LOCAL_DATABASE_CONNECTION}}; // pub use authorization_request::*; -type LocalRecordError = rusqlite::Error; +pub type LocalRecordError = rusqlite::Error; /// a struct of id for local database record. @@ -48,7 +49,7 @@ pub trait SelectableLocalRecord: LocalRecord> { - fn get_one_where

(where_statement: &str, params: P) -> Result, rusqlite::Error> + fn get_one_where

(where_statement: &str, params: P) -> Result where P: Params { let connection = LOCAL_DATABASE_CONNECTION.get_unchecked(); @@ -56,21 +57,21 @@ pub trait SelectableLocalRecord: LocalRecord> { &(String::new() + &Self::SELECT_STATEMENT + " " + where_statement), params, Self::from_row - ).optional()?) + )?) } - fn get_one_by_field(field_name: &str, field_value: T) -> Result, rusqlite::Error> + fn get_one_by_field(field_name: &str, field_value: T) -> Result where T: ToSql { let connection = LOCAL_DATABASE_CONNECTION.get_unchecked(); - Ok(Some(connection.query_row( - &("SELECT ".to_string() + &Self::COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME + " WHERE " + field_name + "= ?1"), + Ok(connection.query_row( + &([&Self::SELECT_STATEMENT, "FROM", Self::TABLE_NAME, "WHERE", field_name,"= ?1"].join(" ")), params![field_value], Self::from_row - )?)) + )?) } - fn get_one_by_id(id: u32) -> Result, rusqlite::Error> { + fn get_one_by_id(id: u32) -> Result { Self::get_one_by_field("id", id ) } fn from_row(row: &Row<'_>) -> Result; diff --git a/core/src/data/local/remote_node.rs b/core/src/data/local/remote_node.rs index 06f494b..73d8951 100644 --- a/core/src/data/local/remote_node.rs +++ b/core/src/data/local/remote_node.rs @@ -33,23 +33,24 @@ pub struct RemoteNodeRecord { impl RemoteNodeRecord { pub fn get_or_insert_by_public_key(public_key: &PublicKey) -> Result { - match Self::get_by_public_key(public_key)? { - Some(x) => Ok(x), - None => { + match Self::get_by_public_key(public_key) { + Ok(x) => Ok(x), + Err(rusqlite::Error::QueryReturnedNoRows) => { let new = RemoteNodeRecord{ id: NoLocalRecordId{}, public_id: rand::random(), public_key: public_key.clone() }; Ok(new.insert()?) - } + }, + Err(e) => Err(e) } } - pub fn get_by_public_id(public_id: &Double) -> Result, rusqlite::Error> { + pub fn get_by_public_id(public_id: &Double) -> Result { Self::get_one_where("WHERE public_id = ?1", (public_id,)) } - pub fn get_by_public_key(public_key: &PublicKey) -> Result, rusqlite::Error> { + pub fn get_by_public_key(public_key: &PublicKey) -> Result { Self::get_one_where("WHERE public_Key = ?1", (public_key.as_bytes(),)) } } @@ -113,7 +114,7 @@ mod tests { let key = SecretKey::generate(&mut rand::rngs::OsRng); let pubkey = key.public(); let record = RemoteNodeRecord::get_or_insert_by_public_key(&pubkey).unwrap(); - assert_eq!(record, RemoteNodeRecord::get_by_public_id(&record.public_id).unwrap().unwrap()); - assert_eq!(record, RemoteNodeRecord::get_by_public_key(&record.public_key).unwrap().unwrap()); + assert_eq!(record, RemoteNodeRecord::get_by_public_id(&record.public_id).unwrap()); + assert_eq!(record, RemoteNodeRecord::get_by_public_key(&record.public_key).unwrap()); } } \ No newline at end of file diff --git a/core/src/error.rs b/core/src/error.rs index 27eaffe..1ff17e4 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -1,4 +1,7 @@ use std::{array::TryFromSliceError, ffi::OsString}; +use tonic::Status; + +use crate::proto::ProtoDeserializeError; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -31,10 +34,30 @@ pub enum Error { TomlDe(#[from] toml::de::Error), #[error("toml serialization error: {0}")] TomlSer(#[from] toml::ser::Error), + #[error("protobuf serialization error: {0}")] + ProtoSerialize(#[from] crate::proto::ProtoSerializeError), + #[error("protobuf deserialization error: {0}")] + ProtoDeserialize(#[from] crate::proto::ProtoDeserializeError), + #[error("Local record error: {0}")] + LocalRecord(#[from] crate::data::local::LocalRecordError), + #[error("Tripod id error: {0}")] + TripodId(#[from] tripod_id::Error), } impl From for Error { fn from(s: OsString) -> Error { Self::OsStringConvert(s) } +} + +impl From for Status { + fn from(value: Error) -> Self { + match value { + Error::ProtoDeserialize(x) => { match x { + ProtoDeserializeError::MissingField(x) => Self::invalid_argument(format!("{} is required", x)), + _ => Status::unimplemented("Unimplemented protobuf deserialize error status") + }}, + _ => Status::unimplemented("Unimplemented error status") + } + } } \ No newline at end of file diff --git a/core/src/proto/common.rs b/core/src/proto/common.rs index 2c8ef1d..66e4c3d 100644 --- a/core/src/proto/common.rs +++ b/core/src/proto/common.rs @@ -1,4 +1,4 @@ -pub use crate::proto::generated::common; +pub use crate::proto::generated::common::{self, PublicKey as PublicKeyMessage, Uuid as UuidMessage, Url as UrlMessage}; use crate::proto::{error::{ProtoDeserializeError, ProtoSerializeError}}; diff --git a/core/src/proto/convert/node_id_message.rs b/core/src/proto/convert/node_id_message.rs deleted file mode 100644 index e6d7aea..0000000 --- a/core/src/proto/convert/node_id_message.rs +++ /dev/null @@ -1,17 +0,0 @@ -use iroh::NodeId; - -use crate::proto::{error::{ProtoDeserializeError, ProtoSerializeError}, NodeIdMessage}; - -impl From for NodeIdMessage { - fn from(value: NodeId) -> Self { - NodeIdMessage { node_id: Vec::from(value.as_bytes()) } - } -} - -impl TryFrom for NodeId { - type Error = ProtoDeserializeError; - fn try_from(value: NodeIdMessage) -> Result { - let slice: [u8; 32] = value.node_id[0..32].try_into()?; - Ok(NodeId::from_bytes(&slice)?) - } -} \ No newline at end of file diff --git a/core/src/proto/convert/remote_info_iter_request.rs b/core/src/proto/convert/remote_info_iter_request.rs deleted file mode 100644 index 6ba7abb..0000000 --- a/core/src/proto/convert/remote_info_iter_request.rs +++ /dev/null @@ -1,7 +0,0 @@ -use crate::proto::RemoteInfoIterRequest; - -impl RemoteInfoIterRequest { - pub fn new() -> Self { - Self{} - } -} \ No newline at end of file diff --git a/core/src/proto/convert/remote_info_request.rs b/core/src/proto/convert/remote_info_request.rs deleted file mode 100644 index d485b14..0000000 --- a/core/src/proto/convert/remote_info_request.rs +++ /dev/null @@ -1,11 +0,0 @@ -use iroh::NodeId; - -use crate::proto::{error::ProtoDeserializeError, NodeIdMessage, RemoteInfoRequest}; - -impl From for RemoteInfoRequest { - fn from(value: NodeIdMessage) -> Self { - Self { - node_id : Some(value) - } - } -} \ No newline at end of file diff --git a/core/src/proto/convert/remote_info_response.rs b/core/src/proto/convert/remote_info_response.rs deleted file mode 100644 index 5e3a17f..0000000 --- a/core/src/proto/convert/remote_info_response.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::{ proto::{RemoteInfoMessage, RemoteInfoResponse}}; - -impl From for RemoteInfoResponse { - fn from(value: RemoteInfoMessage) -> Self { - Self { - remote_info: Some(value) - } - } -} -impl From> for RemoteInfoResponse { - fn from(value: Option) -> Self { - Self{ - remote_info: value, - } - } -} \ No newline at end of file diff --git a/core/src/proto/mod.rs b/core/src/proto/mod.rs index 98f851e..3b45615 100644 --- a/core/src/proto/mod.rs +++ b/core/src/proto/mod.rs @@ -3,5 +3,10 @@ mod authorized_node; mod remote_node; mod common; mod error; + //mod server; mod generated; + +pub use common::*; +pub use error::*; +pub use remote_node::*; \ No newline at end of file diff --git a/core/src/proto/remote_node.rs b/core/src/proto/remote_node.rs index ff6f117..1edbd2b 100644 --- a/core/src/proto/remote_node.rs +++ b/core/src/proto/remote_node.rs @@ -1,10 +1,15 @@ -use std::time::Duration; +use std::{pin::Pin, time::Duration}; -use crate::{error::Error, proto::{error::ProtoSerializeError, generated::remote_node}}; +use futures::{future::Remote, Stream}; +use iroh::{endpoint::{DirectAddrInfo, RemoteInfo}, PublicKey}; +use tonic::{Request, Response, Status, Streaming}; +use tripod_id::Double; + +use crate::{data::local::{LocalRecordId, RemoteNodeRecord}, global::IROH_ENDPOINT, error::Error, proto::{error::{ProtoDeserializeError, ProtoSerializeError}, generated::remote_node::*}}; -impl TryFrom<(iroh::endpoint::Source, Duration)> for remote_node::RemoteNodeSource { +impl TryFrom<(iroh::endpoint::Source, Duration)> for RemoteNodeSource { type Error = ProtoSerializeError; fn try_from(src: (iroh::endpoint::Source, Duration)) -> Result { let (source, duration )= src; @@ -13,4 +18,75 @@ impl TryFrom<(iroh::endpoint::Source, Duration)> for remote_node::RemoteNodeSour duration: Some(duration.try_into()?), }) } -} \ No newline at end of file +} + +impl TryFrom for RemoteNodeRecord { + type Error = Error; + fn try_from(value: RemoteNodeIdentifier) -> Result { + Ok(match value.identifier.ok_or(ProtoDeserializeError::MissingField("RemoteNodeIdentifier.identifier"))? { + remote_node_identifier::Identifier::PublicKey(x) => Self::get_or_insert_by_public_key(&x.try_into()?)?, + remote_node_identifier::Identifier::Id(x) => Self::get_by_public_id(&x.try_into()?)? + }) + } + +} + + +impl TryFrom<(tripod_id::Double, RemoteInfo)> for RemoteNodeInfo { + type Error = ProtoSerializeError; + fn try_from(value: (tripod_id::Double, RemoteInfo)) -> Result { + Ok(Self { + id: Some(value.0.into()), + public_key: Some(value.1.node_id.into()), + relay_url: value.1.relay_url.map_or(String::from(""), |x| x.relay_url.to_string()), + addrs: value.1.addrs.into_iter() + .map(|x| DirectAddrInfoMessage::try_from(x)) + .collect::,Self::Error>>()?, + conn_type: value.1.conn_type.to_string(), + latency: value.1.latency.map(|x| x.try_into()).transpose()?, + last_used: value.1.last_used.map(|x| x.try_into()).transpose()?, + }) + } +} + +impl TryFrom for RemoteNodeDirectAddrInfo { + type Error = ProtoSerializeError; + fn try_from(value: DirectAddrInfo) -> Result { + Ok(RemoteNodeDirectAddrInfo { + addr: value.addr.to_string(), + latency: value.latency.map(|x| x.try_into()).transpose()?, + last_control: value.last_control.map(|x| LastControlMessage::try_from(x)).transpose()?, + last_payload: value.last_payload.map(|x| x.try_into()).transpose()?, + last_alive: value.last_alive.map(|x| x.try_into()).transpose()?, + sources: value.sources.into_iter().map(|x| SourceMessage::try_from(x)).collect::, Self::Error>>()? + }) + } +} + +pub struct RemoteNodeServer{} + +#[tonic::async_trait] +impl remote_node_server::RemoteNode for RemoteNodeServer { + type ListStream = Pin> + Send>>; + async fn info(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + let remote_node: RemoteNodeRecord = request.remote_node.ok_or(Status::invalid_argument("remote_node is required."))?.try_into()?; + let remote_info: RemoteNodeInfo = IROH_ENDPOINT.get_unchecked().remote_info(remote_node.public_key).ok_or(Status::not_found(format!("node {:?} is not found", remote_node)))?.into(); + Ok(Response::new(RemoteNodeInfoResponse{ + remote_node_info: Some(remote_info) + })) + } + async fn list(&self, request: Request>) + -> Result, Status> { + let iter = IROH_ENDPOINT.get_unchecked().remote_info_iter() + .map(|x| { + RemoteNodeInfo::try_from(x).map(|x| RemoteNodeListResponse { + remote_node: x.into() + }).or_else(|e| { + Err(Status::from_error(Box::new(e))) + }) + }); + let stream = futures::stream::iter(iter); + Ok(Response::new(Box::pin(stream))) + } +} \ No newline at end of file