Merge iroh.proto to caretta-sync.proto

This commit is contained in:
fluo10 2025-09-06 09:42:14 +09:00
parent c6e678188f
commit b57734db33
28 changed files with 131 additions and 243 deletions

View file

@ -26,7 +26,7 @@ caretta-sync-macros = { path="macros", optional = true}
caretta-sync-core = {workspace = true, features = ["test"]}
[workspace]
members = [ ".", "core", "macros", "cli", "mobile", "examples/*" , "bevy", "iroh-proto"]
members = [ ".", "core", "macros", "cli", "mobile", "examples/*" , "bevy"]
resolver = "3"
[workspace.package]

View file

@ -37,6 +37,8 @@ uuid.workspace = true
url.workspace = true
whoami = "1.6.1"
rand = "0.8.5"
ed25519-dalek = { version = "2.2.0", features = ["signature"] }
tokio-stream.workspace = true
[target.'cfg(target_os="android")'.dependencies]
jni = "0.21.1"

View file

@ -1,33 +1,51 @@
syntax = "proto3";
package caretta_sync;
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
enum PeerListOrderBy {
CREATED_AT = 0;
UPDATED_AT = 1;
PEER_ID = 2;
service CarettaSync {
rpc RemoteInfo(RemoteInfoRequest) returns (RemoteInfoResponse);
rpc RemoteInfoIter(RemoteInfoIterRequest) returns (stream RemoteInfoResponse);
}
service CachedPeerService {
rpc List(CachedPeerListRequest) returns (CachedPeerListResponse);
message NodeIdMessage {
bytes node_id = 1;
}
message CachedPeerListRequest {}
message CachedPeerMessage {
uint32 number = 1;
string peer_id = 2;
google.protobuf.Timestamp created_at = 3;
repeated CachedAddressMessage addresses = 4;
message RemoteInfoRequest {
NodeIdMessage node_id = 1;
}
message CachedAddressMessage {
uint32 number = 1;
google.protobuf.Timestamp created_at = 2;
google.protobuf.Timestamp updated_at = 3;
string multiaddress = 4;
message RemoteInfoIterRequest {}
message RemoteInfoResponse {
RemoteInfoMessage remote_info = 1;
}
message CachedPeerListResponse {
repeated CachedPeerMessage peers = 1;
message RemoteInfoMessage {
NodeIdMessage node_id = 1;
string relay_url = 2;
repeated DirectAddrInfoMessage addrs = 3;
string conn_type = 4;
google.protobuf.Duration latency = 5;
google.protobuf.Duration last_used = 6;
}
message DirectAddrInfoMessage {
string addr = 1;
google.protobuf.Duration latency = 2;
LastControlMessage last_control = 3;
google.protobuf.Duration last_payload = 4;
google.protobuf.Duration last_alive = 5;
repeated SourceMessage sources = 6;
}
message LastControlMessage {
google.protobuf.Duration duration = 1;
string control_msg = 2;
}
message SourceMessage {
string source = 1;
google.protobuf.Duration duration = 2;
}

View file

@ -12,7 +12,7 @@ use tracing_subscriber::EnvFilter;
use crate::{
config::PartialConfig,
error::Error, p2p, utils::{emptiable::Emptiable, mergeable::Mergeable}
error::Error, utils::{emptiable::Emptiable, mergeable::Mergeable}
};
#[derive(Clone, Debug)]

View file

@ -0,0 +1,27 @@
use std::sync::OnceLock;
use iroh::Endpoint;
pub static IROH_ENDPOINT: GlobalIrohEndpoint = GlobalIrohEndpoint::const_new();
pub struct GlobalIrohEndpoint {
inner: OnceLock<Endpoint>
}
impl GlobalIrohEndpoint {
const fn const_new() -> Self {
Self {
inner: OnceLock::new()
}
}
pub fn get_or_init(&self, endpoint: &Endpoint) -> Endpoint {
self.inner.get_or_init(|| endpoint.clone()).clone()
}
pub fn get(&self) -> Option<Endpoint> {
self.inner.get().map(|x| x.clone())
}
pub fn get_unchecked(&self) -> Endpoint {
self.get().expect("Global Iroh Endpoint must be initialized before use")
}
}

View file

@ -4,7 +4,9 @@ use crate::{config::{StorageConfig}, error::Error };
use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard};
mod config;
mod iroh_endpoint;
pub use config::*;
pub use iroh_endpoint::*;
use uuid::{ContextV7, Timestamp, Uuid};
pub fn generate_uuid() -> Uuid {

View file

@ -2,9 +2,7 @@ pub mod config;
pub mod data;
pub mod error;
pub mod global;
pub mod p2p;
pub mod proto;
pub mod rpc;
#[cfg(any(test, feature="test"))]
pub mod tests;
pub mod utils;

View file

@ -1,9 +1,9 @@
use iroh::endpoint::DirectAddrInfo;
use crate::{error::Error, proto::{DirectAddrInfoMessage, LastControlMessage, SourceMessage}};
use crate::proto::{error::ProtoSerializeError, DirectAddrInfoMessage, LastControlMessage, SourceMessage};
impl TryFrom<DirectAddrInfo> for DirectAddrInfoMessage {
type Error = Error;
type Error = ProtoSerializeError;
fn try_from(value: DirectAddrInfo) -> Result<Self, Self::Error> {
Ok(DirectAddrInfoMessage {
addr: value.addr.to_string(),
@ -11,7 +11,7 @@ impl TryFrom<DirectAddrInfo> for DirectAddrInfoMessage {
last_control: value.last_control.map(|x| LastControlMessage::try_from(x)).transpose()?,
last_payload: value.last_payload.map(|x| x.try_into()).transpose()?,
last_alive: value.last_alive.map(|x| x.try_into()).transpose()?,
sources: value.sources.into_iter().map(|x| SourceMessage::try_from(x)).collect::<Result<Vec<SourceMessage>, Error>>()?
sources: value.sources.into_iter().map(|x| SourceMessage::try_from(x)).collect::<Result<Vec<SourceMessage>, Self::Error>>()?
})
}
}

View file

@ -3,10 +3,10 @@ use std::time::Duration;
use iroh::endpoint::ControlMsg;
use prost_types::DurationError;
use crate::proto::LastControlMessage;
use crate::proto::{error::ProtoSerializeError, LastControlMessage};
impl TryFrom<(Duration, ControlMsg)> for LastControlMessage {
type Error = DurationError;
type Error = ProtoSerializeError;
fn try_from(value: (Duration, ControlMsg)) -> Result<Self, Self::Error> {
Ok(LastControlMessage {
duration: Some(value.0.try_into()?),

View file

@ -1,9 +1,8 @@
mod direct_addr_info_message;
mod last_control_message;
mod node_id_message;
mod remote_info_iter_request;
mod remote_info_message;
mod remote_info_request;
mod remote_info_response;
mod source_message;
tonic::include_proto!("iroh");
mod source_message;

View file

@ -0,0 +1,17 @@
use iroh::NodeId;
use crate::proto::{error::{ProtoDeserializeError, ProtoSerializeError}, NodeIdMessage};
impl From<NodeId> for NodeIdMessage {
fn from(value: NodeId) -> Self {
NodeIdMessage { node_id: Vec::from(value.as_bytes()) }
}
}
impl TryFrom<NodeIdMessage> for NodeId {
type Error = ProtoDeserializeError;
fn try_from(value: NodeIdMessage) -> Result<Self, Self::Error> {
let slice: [u8; 32] = value.node_id[0..32].try_into()?;
Ok(NodeId::from_bytes(&slice)?)
}
}

View file

@ -1,16 +1,16 @@
use iroh::endpoint::RemoteInfo;
use crate::{error::Error, proto::{DirectAddrInfoMessage, RemoteInfoMessage}};
use crate::{error::Error, proto::{error::ProtoSerializeError, DirectAddrInfoMessage, RemoteInfoMessage}};
impl TryFrom<RemoteInfo> for RemoteInfoMessage {
type Error = Error;
type Error = ProtoSerializeError;
fn try_from(value: RemoteInfo) -> Result<Self, Self::Error> {
Ok(Self {
node_id: Vec::from(value.node_id.as_bytes()),
node_id: Some(value.node_id.into()),
relay_url: value.relay_url.map_or(String::from(""), |x| x.relay_url.to_string()),
addrs: value.addrs.into_iter()
.map(|x| DirectAddrInfoMessage::try_from(x))
.collect::<Result<Vec<DirectAddrInfoMessage>,Error>>()?,
.collect::<Result<Vec<DirectAddrInfoMessage>,Self::Error>>()?,
conn_type: value.conn_type.to_string(),
latency: value.latency.map(|x| x.try_into()).transpose()?,
last_used: value.last_used.map(|x| x.try_into()).transpose()?,

View file

@ -0,0 +1,11 @@
use iroh::NodeId;
use crate::proto::{error::ProtoDeserializeError, NodeIdMessage, RemoteInfoRequest};
impl From<NodeIdMessage> for RemoteInfoRequest {
fn from(value: NodeIdMessage) -> Self {
Self {
node_id : Some(value)
}
}
}

View file

@ -2,10 +2,10 @@ use std::time::Duration;
use iroh::endpoint::Source;
use crate::{error::Error, proto::SourceMessage};
use crate::{error::Error, proto::{error::ProtoSerializeError, SourceMessage}};
impl TryFrom<(Source, Duration)> for SourceMessage {
type Error = Error;
type Error = ProtoSerializeError;
fn try_from(src: (Source, Duration)) -> Result<Self, Self::Error> {
let (source, duration )= src;
Ok(Self {

View file

@ -1,9 +1,15 @@
#[derive(thiserror::Error, Debug)]
pub enum Error {
pub enum ProtoSerializeError {
#[error("Duration parse error: {0}")]
Duration(#[from] prost_types::DurationError),
}
#[derive(thiserror::Error, Debug)]
pub enum ProtoDeserializeError {
#[error("Missing field: {0}")]
MissingField(&'static str),
#[error("Signature error: {0}")]
Signature(#[from] ed25519_dalek::SignatureError),
#[error("slice parse error: {0}")]
SliceTryFrom(#[from] std::array::TryFromSliceError)
}
SliceTryFrom(#[from] std::array::TryFromSliceError),
}

View file

@ -1 +1,5 @@
mod convert;
mod error;
mod server;
tonic::include_proto!("caretta_sync");

View file

@ -4,35 +4,25 @@ use iroh::{endpoint, Endpoint, NodeId};
use tonic::{Response, Request, Status};
use tokio_stream::Stream;
use crate::proto::{RemoteInfoIterRequest, RemoteInfoMessage, RemoteInfoRequest, RemoteInfoResponse};
use crate::{global::IROH_ENDPOINT, proto::{error::ProtoDeserializeError, RemoteInfoIterRequest, RemoteInfoMessage, RemoteInfoRequest, RemoteInfoResponse}};
pub struct IrohServer
where
{
endpoint: Endpoint
}
impl From<Endpoint> for IrohServer {
fn from(endpoint: Endpoint) -> Self {
Self{endpoint: endpoint}
}
}
pub struct CarettaSyncServer{}
#[tonic::async_trait]
impl crate::proto::iroh_server::Iroh for IrohServer {
impl crate::proto::caretta_sync_server::CarettaSync for CarettaSyncServer {
type RemoteInfoIterStream = Pin<Box<dyn Stream<Item = Result<RemoteInfoResponse, Status>> + Send>>;
async fn remote_info(&self, request: Request<RemoteInfoRequest>) -> Result<Response<RemoteInfoResponse>, Status> {
let node_id = NodeId::try_from(request.into_inner()).or_else(|e| {
let node_id = NodeId::try_from(request.into_inner().node_id.ok_or(Status::from_error(Box::new(ProtoDeserializeError::MissingField("node_id"))))?).or_else(|e| {
Err(Status::from_error(Box::new(e)))
})?;
let remote_info: Option<RemoteInfoMessage> = self.endpoint.remote_info(node_id).map(|x| x.try_into()).transpose().or_else(|e| {
let remote_info: Option<RemoteInfoMessage> = IROH_ENDPOINT.get_unchecked().remote_info(node_id).map(|x| x.try_into()).transpose().or_else(|e| {
Err(Status::from_error(Box::new(e)))
})?;
Ok(Response::new(RemoteInfoResponse::from(remote_info)))
}
async fn remote_info_iter(&self, _: Request<RemoteInfoIterRequest>)
-> Result<Response<Self::RemoteInfoIterStream>, Status> {
let iter = self.endpoint.remote_info_iter()
let iter = IROH_ENDPOINT.get_unchecked().remote_info_iter()
.map(|x| {
RemoteInfoMessage::try_from(x).map(|x| RemoteInfoResponse::from(x)).or_else(|e| {
Err(Status::from_error(Box::new(e)))

View file

@ -1,2 +0,0 @@
pub mod service;

View file

@ -1 +0,0 @@
pub mod iroh;

View file

@ -1,26 +0,0 @@
[package]
name = "iroh-proto"
edition.workspace = true
version = "0.1.0-alpha"
description = "protobuf definition and tonic server implemention for fetch iroh server infomation"
license.workspace = true
repository.workspace = true
[dependencies]
ed25519-dalek = { version = "2.2.0", features = ["signature"] }
futures.workspace = true
iroh.workspace = true
prost.workspace = true
prost-types.workspace = true
tokio-stream.workspace = true
tonic.workspace = true
tonic-prost.workspace = true
thiserror.workspace = true
[dev-dependencies]
clap.workspace = true
url.workspace = true
tokio.workspace = true
[build-dependencies]
tonic-prost-build.workspace = true

View file

@ -1,4 +0,0 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_prost_build::compile_protos("proto/iroh.proto")?;
Ok(())
}

View file

@ -1,21 +0,0 @@
use clap::Parser;
use iroh_proto::proto::{iroh_client::IrohClient, RemoteInfoIterRequest};
use tokio_stream::StreamExt;
use url::Url;
#[derive(Parser)]
struct Cli {
tonic_endpoint: Url
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
let mut client = IrohClient::connect(cli.tonic_endpoint.to_string()).await.unwrap();
println!("Streaming remote info");
let mut stream = client.remote_info_iter(RemoteInfoIterRequest{}).await.unwrap().into_inner();
while let Some(item) = stream.next().await {
println!("Received: {:?}", item.unwrap())
}
}

View file

@ -1,63 +0,0 @@
#[cfg(unix)]
use std::net::SocketAddr;
use std::{net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::Arc};
use clap::Parser;
use tokio::time::error;
use tonic::transport::Server;
use url::{form_urlencoded::Parse, Host, Url};
#[derive(Parser)]
struct Cli {
tonic_endpoint: Url
}
enum ParsedUrl {
#[cfg(unix)]
Tcp(SocketAddr),
Unix(PathBuf)
}
impl From<Url> for ParsedUrl {
fn from(url: Url) -> Self {
match url.scheme() {
#[cfg(unix)]
"unix" | "file" => {
ParsedUrl::Unix(url.to_file_path().expect("Invalid path url"))
},
"http" | "tcp" => {
ParsedUrl::Tcp(url.socket_addrs(|| None).expect("Invalid address and port").pop().expect("Target domain is not found"))
},
_ => panic!("Invalid url scheme")
}
}
}
#[tokio::main]
async fn main() {
let args = Cli::parse();
let endpoint = iroh::Endpoint::builder().discovery_n0().discovery_dht().discovery_local_network().bind().await.unwrap();
let server = Server::builder()
.add_service(iroh_proto::proto::iroh_server::IrohServer::new(iroh_proto::server::IrohServer::from(endpoint)));
let _ = match ParsedUrl::from(args.tonic_endpoint) {
#[cfg(unix)]
ParsedUrl::Unix(x) => {
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
if x.exists() {
if x.is_file() {
println!("Socket file already exists. Removing.");
std::fs::remove_file(&x).expect("Failed to remove target file already exists");
} else if x.is_dir() {
panic!("Directory already exists");
}
}
let uds = UnixListener::bind(x).expect("Failed to bind the path");
let uds_stream = UnixListenerStream::new(uds);
server.serve_with_incoming(uds_stream).await
},
ParsedUrl::Tcp(x) => {
server.serve(x).await
}
};
}

View file

@ -1,47 +0,0 @@
syntax = "proto3";
package iroh;
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
service Iroh {
rpc RemoteInfo(RemoteInfoRequest) returns (RemoteInfoResponse);
rpc RemoteInfoIter(RemoteInfoIterRequest) returns (stream RemoteInfoResponse);
}
message RemoteInfoRequest {
bytes node_id = 1;
}
message RemoteInfoIterRequest {}
message RemoteInfoResponse {
RemoteInfoMessage remote_info = 1;
}
message RemoteInfoMessage {
bytes node_id = 1;
string relay_url = 2;
repeated DirectAddrInfoMessage addrs = 3;
string conn_type = 4;
google.protobuf.Duration latency = 5;
google.protobuf.Duration last_used = 6;
}
message DirectAddrInfoMessage {
string addr = 1;
google.protobuf.Duration latency = 2;
LastControlMessage last_control = 3;
google.protobuf.Duration last_payload = 4;
google.protobuf.Duration last_alive = 5;
repeated SourceMessage sources = 6;
}
message LastControlMessage {
google.protobuf.Duration duration = 1;
string control_msg = 2;
}
message SourceMessage {
string source = 1;
google.protobuf.Duration duration = 2;
}

View file

@ -1,3 +0,0 @@
pub mod error;
pub mod proto;
pub mod server;

View file

@ -1,19 +0,0 @@
use iroh::NodeId;
use crate::proto::RemoteInfoRequest;
impl From<NodeId> for RemoteInfoRequest {
fn from(value: NodeId) -> Self {
Self {
node_id : Vec::from(value.as_bytes())
}
}
}
impl TryFrom<RemoteInfoRequest> for NodeId {
type Error = crate::error::Error;
fn try_from(value: RemoteInfoRequest) -> Result<Self, Self::Error> {
let slice: [u8; 32] = value.node_id[0..32].try_into()?;
Ok(NodeId::from_bytes(&slice)?)
}
}