diff --git a/core/build.rs b/core/build.rs index e2bad9d..c8a4f87 100644 --- a/core/build.rs +++ b/core/build.rs @@ -1,6 +1,4 @@ fn main() -> Result<(), Box> { tonic_prost_build::compile_protos("proto/caretta_sync.proto")?; - tonic_prost_build::compile_protos("proto/iroh.proto")?; - Ok(()) } \ No newline at end of file diff --git a/iroh-proto/Cargo.toml b/iroh-proto/Cargo.toml index 7a7d6a9..f263745 100644 --- a/iroh-proto/Cargo.toml +++ b/iroh-proto/Cargo.toml @@ -17,5 +17,10 @@ tonic.workspace = true tonic-prost.workspace = true thiserror.workspace = true +[dev-dependencies] +clap.workspace = true +url.workspace = true +tokio.workspace = true + [build-dependencies] tonic-prost-build.workspace = true diff --git a/iroh-proto/examples/client.rs b/iroh-proto/examples/client.rs new file mode 100644 index 0000000..99cf225 --- /dev/null +++ b/iroh-proto/examples/client.rs @@ -0,0 +1,21 @@ +use clap::Parser; +use iroh_proto::proto::{iroh_client::IrohClient, RemoteInfoIterRequest}; +use tokio_stream::StreamExt; +use url::Url; + +#[derive(Parser)] +struct Cli { + tonic_endpoint: Url +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let mut client = IrohClient::connect(cli.tonic_endpoint.to_string()).await.unwrap(); + + println!("Streaming remote info"); + let mut stream = client.remote_info_iter(RemoteInfoIterRequest{}).await.unwrap().into_inner(); + while let Some(item) = stream.next().await { + println!("Received: {:?}", item.unwrap()) + } +} \ No newline at end of file diff --git a/iroh-proto/examples/server.rs b/iroh-proto/examples/server.rs new file mode 100644 index 0000000..51ae527 --- /dev/null +++ b/iroh-proto/examples/server.rs @@ -0,0 +1,63 @@ +#[cfg(unix)] +use std::net::SocketAddr; +use std::{net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::Arc}; + +use clap::Parser; +use tokio::time::error; +use tonic::transport::Server; +use url::{form_urlencoded::Parse, Host, Url}; + +#[derive(Parser)] +struct Cli { + tonic_endpoint: Url +} + +enum ParsedUrl { + #[cfg(unix)] + Tcp(SocketAddr), + Unix(PathBuf) +} + +impl From for ParsedUrl { + fn from(url: Url) -> Self { + match url.scheme() { + #[cfg(unix)] + "unix" | "file" => { + ParsedUrl::Unix(url.to_file_path().expect("Invalid path url")) + }, + "http" | "tcp" => { + ParsedUrl::Tcp(url.socket_addrs(|| None).expect("Invalid address and port").pop().expect("Target domain is not found")) + }, + _ => panic!("Invalid url scheme") + } + } +} + +#[tokio::main] +async fn main() { + let args = Cli::parse(); + let endpoint = iroh::Endpoint::builder().discovery_n0().discovery_dht().discovery_local_network().bind().await.unwrap(); + let server = Server::builder() + .add_service(iroh_proto::proto::iroh_server::IrohServer::new(iroh_proto::server::IrohServer::from(endpoint))); + let _ = match ParsedUrl::from(args.tonic_endpoint) { + #[cfg(unix)] + ParsedUrl::Unix(x) => { + use tokio::net::UnixListener; + use tokio_stream::wrappers::UnixListenerStream; + if x.exists() { + if x.is_file() { + println!("Socket file already exists. Removing."); + std::fs::remove_file(&x).expect("Failed to remove target file already exists"); + } else if x.is_dir() { + panic!("Directory already exists"); + } + } + let uds = UnixListener::bind(x).expect("Failed to bind the path"); + let uds_stream = UnixListenerStream::new(uds); + server.serve_with_incoming(uds_stream).await + }, + ParsedUrl::Tcp(x) => { + server.serve(x).await + } + }; +} \ No newline at end of file diff --git a/iroh-proto/src/server.rs b/iroh-proto/src/server.rs index 305ba50..cc81a26 100644 --- a/iroh-proto/src/server.rs +++ b/iroh-proto/src/server.rs @@ -1,40 +1,44 @@ use std::pin::Pin; -use iroh::{Endpoint, NodeId}; +use iroh::{endpoint, Endpoint, NodeId}; use tonic::{Response, Request, Status}; use tokio_stream::Stream; use crate::proto::{RemoteInfoIterRequest, RemoteInfoMessage, RemoteInfoRequest, RemoteInfoResponse}; -#[tonic::async_trait] -pub trait AsEndpoint: Send + Sync + 'static { - async fn as_endpoint(&self) -> &Endpoint; -} -pub struct IrohServer +pub struct IrohServer where - T: AsEndpoint { - endpoint: T + endpoint: Endpoint +} + +impl From for IrohServer { + fn from(endpoint: Endpoint) -> Self { + Self{endpoint: endpoint} + } } #[tonic::async_trait] -impl crate::proto::iroh_server::Iroh for IrohServer -where - T: AsEndpoint -{ +impl crate::proto::iroh_server::Iroh for IrohServer { type RemoteInfoIterStream = Pin> + Send>>; async fn remote_info(&self, request: Request) -> Result, Status> { let node_id = NodeId::try_from(request.into_inner()).or_else(|e| { Err(Status::from_error(Box::new(e))) })?; - let remote_info: Option = self.endpoint.as_endpoint().await.remote_info(node_id).map(|x| x.try_into()).transpose().or_else(|e| { + let remote_info: Option = self.endpoint.remote_info(node_id).map(|x| x.try_into()).transpose().or_else(|e| { Err(Status::from_error(Box::new(e))) })?; Ok(Response::new(RemoteInfoResponse::from(remote_info))) } - async fn remote_info_iter(&self, _: Request) -> Result, Status> { - let iter = self.endpoint.as_endpoint().await.remote_info_iter(); + async fn remote_info_iter(&self, _: Request) + -> Result, Status> { + let iter = self.endpoint.remote_info_iter() + .map(|x| { + RemoteInfoMessage::try_from(x).map(|x| RemoteInfoResponse::from(x)).or_else(|e| { + Err(Status::from_error(Box::new(e))) + }) + }); let stream = futures::stream::iter(iter); - todo!() + Ok(Response::new(Box::pin(stream))) } } \ No newline at end of file