Update protobuf

This commit is contained in:
fluo10 2025-10-01 07:19:35 +09:00
parent de211d2b71
commit 87d78e7605
11 changed files with 163 additions and 81 deletions

View file

@ -9,11 +9,11 @@ import "google/protobuf/empty.proto";
service RemoteNode { service RemoteNode {
rpc Info(RemoteNodeInfoRequest) returns (RemoteNodeInfoResponse); rpc Info(RemoteNodeInfoRequest) returns (RemoteNodeInfoResponse);
rpc InfoStream(stream RemoteNodeInfoStreamRequest) returns (stream RemoteNodeInfoStreamResponse); rpc List(stream RemoteNodeListRequest) returns (stream RemoteNodeListResponse);
} }
message RemoteNodeIdentifier { message RemoteNodeIdentifier {
oneof remote_node_identifier { oneof identifier {
tripod_id.Double id = 1; tripod_id.Double id = 1;
caretta_sync.common.PublicKey public_key = 2; caretta_sync.common.PublicKey public_key = 2;
} }
@ -23,13 +23,13 @@ message RemoteNodeInfoRequest {
RemoteNodeIdentifier remote_node = 1; RemoteNodeIdentifier remote_node = 1;
} }
message RemoteNodeInfoStreamRequest {} message RemoteNodeListRequest {}
message RemoteNodeInfoResponse { message RemoteNodeInfoResponse {
RemoteNodeInfo remote_node_info = 1; RemoteNodeInfo remote_node_info = 1;
} }
message RemoteNodeInfoStreamResponse { message RemoteNodeListResponse {
RemoteNodeInfo remote_node_info = 1; RemoteNodeInfo remote_node_info = 1;
} }
@ -39,11 +39,25 @@ message RemoteNodeInfo {
caretta_sync.common.PublicKey public_key = 3; caretta_sync.common.PublicKey public_key = 3;
string relay_url = 4; string relay_url = 4;
repeated RemoteNodeDirectAddrInfo addrs = 5; repeated RemoteNodeDirectAddrInfo addrs = 5;
string conn_type = 6; RemoteNodeConnectionType conn_type = 6;
google.protobuf.Duration latency = 7; google.protobuf.Duration latency = 7;
google.protobuf.Duration last_used = 8; google.protobuf.Duration last_used = 8;
} }
message RemoteNodeConnectionType {
oneof conn_type {
caretta_sync.common.SocketAddr direct = 1;
caretta_sync.common.Url relay = 2;
RemoteNodeConnectionTypeMixed mixed = 3;
google.protobuf.Empty none = 4;
}
}
message RemoteNodeConnectionTypeMixed {
caretta_sync.common.SocketAddr socket_addr = 1;
caretta_sync.common.Url relay_url = 2;
}
message RemoteNodeRelayUrlInfo { message RemoteNodeRelayUrlInfo {
caretta_sync.common.Url relay_url = 1; caretta_sync.common.Url relay_url = 1;
google.protobuf.Duration last_alive = 2; google.protobuf.Duration last_alive = 2;
@ -51,7 +65,7 @@ message RemoteNodeRelayUrlInfo {
} }
message RemoteNodeDirectAddrInfo { message RemoteNodeDirectAddrInfo {
string addr = 1; caretta_sync.common.SocketAddr addr = 1;
google.protobuf.Duration latency = 2; google.protobuf.Duration latency = 2;
RemoteNodeLastControl last_control = 3; RemoteNodeLastControl last_control = 3;
google.protobuf.Duration last_payload = 4; google.protobuf.Duration last_payload = 4;
@ -61,10 +75,23 @@ message RemoteNodeDirectAddrInfo {
message RemoteNodeLastControl { message RemoteNodeLastControl {
google.protobuf.Duration duration = 1; google.protobuf.Duration duration = 1;
string control_msg = 2; RemoteNodeControlMsg control_msg = 2;
}
enum RemoteNodeControlMsg {
PING = 0;
PONG = 1;
CALL_ME_MAYBE = 2;
} }
message RemoteNodeSource { message RemoteNodeSource {
string source = 1; oneof source = {
google.protobuf.Duration duration = 2; google.protobuf.Empty saved = 1;
google.protobuf.Empty udp = 2;
google.protobuf.Empty Relay = 3;
google.protobuf.Empty App = 4;
string discovery = 5;
string named_app = 6
};
google.protobuf.Duration duration = 7;
} }

View file

@ -1,5 +1,6 @@
// mod authorization_request; // mod authorization_request;
mod remote_node; mod remote_node;
pub use remote_node::RemoteNodeRecord;
pub mod migration; pub mod migration;
use std::{cell::OnceCell, convert::Infallible, iter::Map, path::Path, sync::{LazyLock, OnceLock}}; use std::{cell::OnceCell, convert::Infallible, iter::Map, path::Path, sync::{LazyLock, OnceLock}};
@ -10,7 +11,7 @@ use rusqlite::{ffi::Error, params, types::FromSql, Connection, MappedRows, Optio
use crate::{config::StorageConfig, global::{CONFIG, LOCAL_DATABASE_CONNECTION}}; use crate::{config::StorageConfig, global::{CONFIG, LOCAL_DATABASE_CONNECTION}};
// pub use authorization_request::*; // pub use authorization_request::*;
type LocalRecordError = rusqlite::Error; pub type LocalRecordError = rusqlite::Error;
/// a struct of id for local database record. /// a struct of id for local database record.
@ -48,7 +49,7 @@ pub trait SelectableLocalRecord: LocalRecord<RowValues: TryInto<Self>> {
fn get_one_where<P>(where_statement: &str, params: P) -> Result<Option<Self>, rusqlite::Error> fn get_one_where<P>(where_statement: &str, params: P) -> Result<Self, rusqlite::Error>
where P: Params where P: Params
{ {
let connection = LOCAL_DATABASE_CONNECTION.get_unchecked(); let connection = LOCAL_DATABASE_CONNECTION.get_unchecked();
@ -56,21 +57,21 @@ pub trait SelectableLocalRecord: LocalRecord<RowValues: TryInto<Self>> {
&(String::new() + &Self::SELECT_STATEMENT + " " + where_statement), &(String::new() + &Self::SELECT_STATEMENT + " " + where_statement),
params, params,
Self::from_row Self::from_row
).optional()?) )?)
} }
fn get_one_by_field<T>(field_name: &str, field_value: T) -> Result<Option<Self>, rusqlite::Error> fn get_one_by_field<T>(field_name: &str, field_value: T) -> Result<Self, rusqlite::Error>
where where
T: ToSql T: ToSql
{ {
let connection = LOCAL_DATABASE_CONNECTION.get_unchecked(); let connection = LOCAL_DATABASE_CONNECTION.get_unchecked();
Ok(Some(connection.query_row( Ok(connection.query_row(
&("SELECT ".to_string() + &Self::COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME + " WHERE " + field_name + "= ?1"), &([&Self::SELECT_STATEMENT, "FROM", Self::TABLE_NAME, "WHERE", field_name,"= ?1"].join(" ")),
params![field_value], params![field_value],
Self::from_row Self::from_row
)?)) )?)
} }
fn get_one_by_id(id: u32) -> Result<Option<Self>, rusqlite::Error> { fn get_one_by_id(id: u32) -> Result<Self, rusqlite::Error> {
Self::get_one_by_field("id", id ) Self::get_one_by_field("id", id )
} }
fn from_row(row: &Row<'_>) -> Result<Self, rusqlite::Error>; fn from_row(row: &Row<'_>) -> Result<Self, rusqlite::Error>;

View file

@ -33,23 +33,24 @@ pub struct RemoteNodeRecord<T> {
impl RemoteNodeRecord<LocalRecordId> { impl RemoteNodeRecord<LocalRecordId> {
pub fn get_or_insert_by_public_key(public_key: &PublicKey) -> Result<Self, rusqlite::Error> { pub fn get_or_insert_by_public_key(public_key: &PublicKey) -> Result<Self, rusqlite::Error> {
match Self::get_by_public_key(public_key)? { match Self::get_by_public_key(public_key) {
Some(x) => Ok(x), Ok(x) => Ok(x),
None => { Err(rusqlite::Error::QueryReturnedNoRows) => {
let new = RemoteNodeRecord{ let new = RemoteNodeRecord{
id: NoLocalRecordId{}, id: NoLocalRecordId{},
public_id: rand::random(), public_id: rand::random(),
public_key: public_key.clone() public_key: public_key.clone()
}; };
Ok(new.insert()?) Ok(new.insert()?)
} },
Err(e) => Err(e)
} }
} }
pub fn get_by_public_id(public_id: &Double) -> Result<Option<Self>, rusqlite::Error> { pub fn get_by_public_id(public_id: &Double) -> Result<Self, rusqlite::Error> {
Self::get_one_where("WHERE public_id = ?1", (public_id,)) Self::get_one_where("WHERE public_id = ?1", (public_id,))
} }
pub fn get_by_public_key(public_key: &PublicKey) -> Result<Option<Self>, rusqlite::Error> { pub fn get_by_public_key(public_key: &PublicKey) -> Result<Self, rusqlite::Error> {
Self::get_one_where("WHERE public_Key = ?1", (public_key.as_bytes(),)) Self::get_one_where("WHERE public_Key = ?1", (public_key.as_bytes(),))
} }
} }
@ -113,7 +114,7 @@ mod tests {
let key = SecretKey::generate(&mut rand::rngs::OsRng); let key = SecretKey::generate(&mut rand::rngs::OsRng);
let pubkey = key.public(); let pubkey = key.public();
let record = RemoteNodeRecord::get_or_insert_by_public_key(&pubkey).unwrap(); let record = RemoteNodeRecord::get_or_insert_by_public_key(&pubkey).unwrap();
assert_eq!(record, RemoteNodeRecord::get_by_public_id(&record.public_id).unwrap().unwrap()); assert_eq!(record, RemoteNodeRecord::get_by_public_id(&record.public_id).unwrap());
assert_eq!(record, RemoteNodeRecord::get_by_public_key(&record.public_key).unwrap().unwrap()); assert_eq!(record, RemoteNodeRecord::get_by_public_key(&record.public_key).unwrap());
} }
} }

View file

@ -1,4 +1,7 @@
use std::{array::TryFromSliceError, ffi::OsString}; use std::{array::TryFromSliceError, ffi::OsString};
use tonic::Status;
use crate::proto::ProtoDeserializeError;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum Error { pub enum Error {
@ -31,10 +34,30 @@ pub enum Error {
TomlDe(#[from] toml::de::Error), TomlDe(#[from] toml::de::Error),
#[error("toml serialization error: {0}")] #[error("toml serialization error: {0}")]
TomlSer(#[from] toml::ser::Error), TomlSer(#[from] toml::ser::Error),
#[error("protobuf serialization error: {0}")]
ProtoSerialize(#[from] crate::proto::ProtoSerializeError),
#[error("protobuf deserialization error: {0}")]
ProtoDeserialize(#[from] crate::proto::ProtoDeserializeError),
#[error("Local record error: {0}")]
LocalRecord(#[from] crate::data::local::LocalRecordError),
#[error("Tripod id error: {0}")]
TripodId(#[from] tripod_id::Error),
} }
impl From<std::ffi::OsString> for Error { impl From<std::ffi::OsString> for Error {
fn from(s: OsString) -> Error { fn from(s: OsString) -> Error {
Self::OsStringConvert(s) Self::OsStringConvert(s)
} }
}
impl From<Error> for Status {
fn from(value: Error) -> Self {
match value {
Error::ProtoDeserialize(x) => { match x {
ProtoDeserializeError::MissingField(x) => Self::invalid_argument(format!("{} is required", x)),
_ => Status::unimplemented("Unimplemented protobuf deserialize error status")
}},
_ => Status::unimplemented("Unimplemented error status")
}
}
} }

View file

@ -1,4 +1,4 @@
pub use crate::proto::generated::common; pub use crate::proto::generated::common::{self, PublicKey as PublicKeyMessage, Uuid as UuidMessage, Url as UrlMessage};
use crate::proto::{error::{ProtoDeserializeError, ProtoSerializeError}}; use crate::proto::{error::{ProtoDeserializeError, ProtoSerializeError}};

View file

@ -1,17 +0,0 @@
use iroh::NodeId;
use crate::proto::{error::{ProtoDeserializeError, ProtoSerializeError}, NodeIdMessage};
impl From<NodeId> for NodeIdMessage {
fn from(value: NodeId) -> Self {
NodeIdMessage { node_id: Vec::from(value.as_bytes()) }
}
}
impl TryFrom<NodeIdMessage> for NodeId {
type Error = ProtoDeserializeError;
fn try_from(value: NodeIdMessage) -> Result<Self, Self::Error> {
let slice: [u8; 32] = value.node_id[0..32].try_into()?;
Ok(NodeId::from_bytes(&slice)?)
}
}

View file

@ -1,7 +0,0 @@
use crate::proto::RemoteInfoIterRequest;
impl RemoteInfoIterRequest {
pub fn new() -> Self {
Self{}
}
}

View file

@ -1,11 +0,0 @@
use iroh::NodeId;
use crate::proto::{error::ProtoDeserializeError, NodeIdMessage, RemoteInfoRequest};
impl From<NodeIdMessage> for RemoteInfoRequest {
fn from(value: NodeIdMessage) -> Self {
Self {
node_id : Some(value)
}
}
}

View file

@ -1,16 +0,0 @@
use crate::{ proto::{RemoteInfoMessage, RemoteInfoResponse}};
impl From<RemoteInfoMessage> for RemoteInfoResponse {
fn from(value: RemoteInfoMessage) -> Self {
Self {
remote_info: Some(value)
}
}
}
impl From<Option<RemoteInfoMessage>> for RemoteInfoResponse {
fn from(value: Option<RemoteInfoMessage>) -> Self {
Self{
remote_info: value,
}
}
}

View file

@ -3,5 +3,10 @@ mod authorized_node;
mod remote_node; mod remote_node;
mod common; mod common;
mod error; mod error;
//mod server; //mod server;
mod generated; mod generated;
pub use common::*;
pub use error::*;
pub use remote_node::*;

View file

@ -1,10 +1,15 @@
use std::time::Duration; use std::{pin::Pin, time::Duration};
use crate::{error::Error, proto::{error::ProtoSerializeError, generated::remote_node}}; use futures::{future::Remote, Stream};
use iroh::{endpoint::{DirectAddrInfo, RemoteInfo}, PublicKey};
use tonic::{Request, Response, Status, Streaming};
use tripod_id::Double;
use crate::{data::local::{LocalRecordId, RemoteNodeRecord}, global::IROH_ENDPOINT, error::Error, proto::{error::{ProtoDeserializeError, ProtoSerializeError}, generated::remote_node::*}};
impl TryFrom<(iroh::endpoint::Source, Duration)> for remote_node::RemoteNodeSource { impl TryFrom<(iroh::endpoint::Source, Duration)> for RemoteNodeSource {
type Error = ProtoSerializeError; type Error = ProtoSerializeError;
fn try_from(src: (iroh::endpoint::Source, Duration)) -> Result<Self, Self::Error> { fn try_from(src: (iroh::endpoint::Source, Duration)) -> Result<Self, Self::Error> {
let (source, duration )= src; let (source, duration )= src;
@ -13,4 +18,75 @@ impl TryFrom<(iroh::endpoint::Source, Duration)> for remote_node::RemoteNodeSour
duration: Some(duration.try_into()?), duration: Some(duration.try_into()?),
}) })
} }
} }
impl TryFrom<RemoteNodeIdentifier> for RemoteNodeRecord<LocalRecordId> {
type Error = Error;
fn try_from(value: RemoteNodeIdentifier) -> Result<Self, Self::Error> {
Ok(match value.identifier.ok_or(ProtoDeserializeError::MissingField("RemoteNodeIdentifier.identifier"))? {
remote_node_identifier::Identifier::PublicKey(x) => Self::get_or_insert_by_public_key(&x.try_into()?)?,
remote_node_identifier::Identifier::Id(x) => Self::get_by_public_id(&x.try_into()?)?
})
}
}
impl TryFrom<(tripod_id::Double, RemoteInfo)> for RemoteNodeInfo {
type Error = ProtoSerializeError;
fn try_from(value: (tripod_id::Double, RemoteInfo)) -> Result<Self, Self::Error> {
Ok(Self {
id: Some(value.0.into()),
public_key: Some(value.1.node_id.into()),
relay_url: value.1.relay_url.map_or(String::from(""), |x| x.relay_url.to_string()),
addrs: value.1.addrs.into_iter()
.map(|x| DirectAddrInfoMessage::try_from(x))
.collect::<Result<Vec<DirectAddrInfoMessage>,Self::Error>>()?,
conn_type: value.1.conn_type.to_string(),
latency: value.1.latency.map(|x| x.try_into()).transpose()?,
last_used: value.1.last_used.map(|x| x.try_into()).transpose()?,
})
}
}
impl TryFrom<DirectAddrInfo> for RemoteNodeDirectAddrInfo {
type Error = ProtoSerializeError;
fn try_from(value: DirectAddrInfo) -> Result<Self, Self::Error> {
Ok(RemoteNodeDirectAddrInfo {
addr: value.addr.to_string(),
latency: value.latency.map(|x| x.try_into()).transpose()?,
last_control: value.last_control.map(|x| LastControlMessage::try_from(x)).transpose()?,
last_payload: value.last_payload.map(|x| x.try_into()).transpose()?,
last_alive: value.last_alive.map(|x| x.try_into()).transpose()?,
sources: value.sources.into_iter().map(|x| SourceMessage::try_from(x)).collect::<Result<Vec<SourceMessage>, Self::Error>>()?
})
}
}
pub struct RemoteNodeServer{}
#[tonic::async_trait]
impl remote_node_server::RemoteNode for RemoteNodeServer {
type ListStream = Pin<Box<dyn Stream<Item = Result<RemoteNodeListResponse, Status>> + Send>>;
async fn info(&self, request: Request<RemoteNodeInfoRequest>) -> Result<Response<RemoteNodeInfoResponse>, Status> {
let request = request.into_inner();
let remote_node: RemoteNodeRecord<LocalRecordId> = request.remote_node.ok_or(Status::invalid_argument("remote_node is required."))?.try_into()?;
let remote_info: RemoteNodeInfo = IROH_ENDPOINT.get_unchecked().remote_info(remote_node.public_key).ok_or(Status::not_found(format!("node {:?} is not found", remote_node)))?.into();
Ok(Response::new(RemoteNodeInfoResponse{
remote_node_info: Some(remote_info)
}))
}
async fn list(&self, request: Request<Streaming<RemoteNodeListRequest>>)
-> Result<Response<Self::ListStream>, Status> {
let iter = IROH_ENDPOINT.get_unchecked().remote_info_iter()
.map(|x| {
RemoteNodeInfo::try_from(x).map(|x| RemoteNodeListResponse {
remote_node: x.into()
}).or_else(|e| {
Err(Status::from_error(Box::new(e)))
})
});
let stream = futures::stream::iter(iter);
Ok(Response::new(Box::pin(stream)))
}
}