Add examples for iroh-proto
This commit is contained in:
parent
d30188e7d9
commit
c6e678188f
5 changed files with 109 additions and 18 deletions
|
|
@ -1,6 +1,4 @@
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
tonic_prost_build::compile_protos("proto/caretta_sync.proto")?;
|
tonic_prost_build::compile_protos("proto/caretta_sync.proto")?;
|
||||||
tonic_prost_build::compile_protos("proto/iroh.proto")?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -17,5 +17,10 @@ tonic.workspace = true
|
||||||
tonic-prost.workspace = true
|
tonic-prost.workspace = true
|
||||||
thiserror.workspace = true
|
thiserror.workspace = true
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
clap.workspace = true
|
||||||
|
url.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-prost-build.workspace = true
|
tonic-prost-build.workspace = true
|
||||||
|
|
|
||||||
21
iroh-proto/examples/client.rs
Normal file
21
iroh-proto/examples/client.rs
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
use clap::Parser;
|
||||||
|
use iroh_proto::proto::{iroh_client::IrohClient, RemoteInfoIterRequest};
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
struct Cli {
|
||||||
|
tonic_endpoint: Url
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let cli = Cli::parse();
|
||||||
|
let mut client = IrohClient::connect(cli.tonic_endpoint.to_string()).await.unwrap();
|
||||||
|
|
||||||
|
println!("Streaming remote info");
|
||||||
|
let mut stream = client.remote_info_iter(RemoteInfoIterRequest{}).await.unwrap().into_inner();
|
||||||
|
while let Some(item) = stream.next().await {
|
||||||
|
println!("Received: {:?}", item.unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
63
iroh-proto/examples/server.rs
Normal file
63
iroh-proto/examples/server.rs
Normal file
|
|
@ -0,0 +1,63 @@
|
||||||
|
#[cfg(unix)]
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::{net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
use tokio::time::error;
|
||||||
|
use tonic::transport::Server;
|
||||||
|
use url::{form_urlencoded::Parse, Host, Url};
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
struct Cli {
|
||||||
|
tonic_endpoint: Url
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ParsedUrl {
|
||||||
|
#[cfg(unix)]
|
||||||
|
Tcp(SocketAddr),
|
||||||
|
Unix(PathBuf)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Url> for ParsedUrl {
|
||||||
|
fn from(url: Url) -> Self {
|
||||||
|
match url.scheme() {
|
||||||
|
#[cfg(unix)]
|
||||||
|
"unix" | "file" => {
|
||||||
|
ParsedUrl::Unix(url.to_file_path().expect("Invalid path url"))
|
||||||
|
},
|
||||||
|
"http" | "tcp" => {
|
||||||
|
ParsedUrl::Tcp(url.socket_addrs(|| None).expect("Invalid address and port").pop().expect("Target domain is not found"))
|
||||||
|
},
|
||||||
|
_ => panic!("Invalid url scheme")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let args = Cli::parse();
|
||||||
|
let endpoint = iroh::Endpoint::builder().discovery_n0().discovery_dht().discovery_local_network().bind().await.unwrap();
|
||||||
|
let server = Server::builder()
|
||||||
|
.add_service(iroh_proto::proto::iroh_server::IrohServer::new(iroh_proto::server::IrohServer::from(endpoint)));
|
||||||
|
let _ = match ParsedUrl::from(args.tonic_endpoint) {
|
||||||
|
#[cfg(unix)]
|
||||||
|
ParsedUrl::Unix(x) => {
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
use tokio_stream::wrappers::UnixListenerStream;
|
||||||
|
if x.exists() {
|
||||||
|
if x.is_file() {
|
||||||
|
println!("Socket file already exists. Removing.");
|
||||||
|
std::fs::remove_file(&x).expect("Failed to remove target file already exists");
|
||||||
|
} else if x.is_dir() {
|
||||||
|
panic!("Directory already exists");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let uds = UnixListener::bind(x).expect("Failed to bind the path");
|
||||||
|
let uds_stream = UnixListenerStream::new(uds);
|
||||||
|
server.serve_with_incoming(uds_stream).await
|
||||||
|
},
|
||||||
|
ParsedUrl::Tcp(x) => {
|
||||||
|
server.serve(x).await
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
@ -1,40 +1,44 @@
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
use iroh::{Endpoint, NodeId};
|
use iroh::{endpoint, Endpoint, NodeId};
|
||||||
use tonic::{Response, Request, Status};
|
use tonic::{Response, Request, Status};
|
||||||
use tokio_stream::Stream;
|
use tokio_stream::Stream;
|
||||||
|
|
||||||
use crate::proto::{RemoteInfoIterRequest, RemoteInfoMessage, RemoteInfoRequest, RemoteInfoResponse};
|
use crate::proto::{RemoteInfoIterRequest, RemoteInfoMessage, RemoteInfoRequest, RemoteInfoResponse};
|
||||||
|
|
||||||
#[tonic::async_trait]
|
pub struct IrohServer
|
||||||
pub trait AsEndpoint: Send + Sync + 'static {
|
|
||||||
async fn as_endpoint(&self) -> &Endpoint;
|
|
||||||
}
|
|
||||||
pub struct IrohServer<T>
|
|
||||||
where
|
where
|
||||||
T: AsEndpoint
|
|
||||||
{
|
{
|
||||||
endpoint: T
|
endpoint: Endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Endpoint> for IrohServer {
|
||||||
|
fn from(endpoint: Endpoint) -> Self {
|
||||||
|
Self{endpoint: endpoint}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl<T> crate::proto::iroh_server::Iroh for IrohServer<T>
|
impl crate::proto::iroh_server::Iroh for IrohServer {
|
||||||
where
|
|
||||||
T: AsEndpoint
|
|
||||||
{
|
|
||||||
type RemoteInfoIterStream = Pin<Box<dyn Stream<Item = Result<RemoteInfoResponse, Status>> + Send>>;
|
type RemoteInfoIterStream = Pin<Box<dyn Stream<Item = Result<RemoteInfoResponse, Status>> + Send>>;
|
||||||
async fn remote_info(&self, request: Request<RemoteInfoRequest>) -> Result<Response<RemoteInfoResponse>, Status> {
|
async fn remote_info(&self, request: Request<RemoteInfoRequest>) -> Result<Response<RemoteInfoResponse>, Status> {
|
||||||
let node_id = NodeId::try_from(request.into_inner()).or_else(|e| {
|
let node_id = NodeId::try_from(request.into_inner()).or_else(|e| {
|
||||||
Err(Status::from_error(Box::new(e)))
|
Err(Status::from_error(Box::new(e)))
|
||||||
})?;
|
})?;
|
||||||
let remote_info: Option<RemoteInfoMessage> = self.endpoint.as_endpoint().await.remote_info(node_id).map(|x| x.try_into()).transpose().or_else(|e| {
|
let remote_info: Option<RemoteInfoMessage> = self.endpoint.remote_info(node_id).map(|x| x.try_into()).transpose().or_else(|e| {
|
||||||
Err(Status::from_error(Box::new(e)))
|
Err(Status::from_error(Box::new(e)))
|
||||||
})?;
|
})?;
|
||||||
Ok(Response::new(RemoteInfoResponse::from(remote_info)))
|
Ok(Response::new(RemoteInfoResponse::from(remote_info)))
|
||||||
}
|
}
|
||||||
async fn remote_info_iter(&self, _: Request<RemoteInfoIterRequest>) -> Result<Response<Self::RemoteInfoIterStream>, Status> {
|
async fn remote_info_iter(&self, _: Request<RemoteInfoIterRequest>)
|
||||||
let iter = self.endpoint.as_endpoint().await.remote_info_iter();
|
-> Result<Response<Self::RemoteInfoIterStream>, Status> {
|
||||||
|
let iter = self.endpoint.remote_info_iter()
|
||||||
|
.map(|x| {
|
||||||
|
RemoteInfoMessage::try_from(x).map(|x| RemoteInfoResponse::from(x)).or_else(|e| {
|
||||||
|
Err(Status::from_error(Box::new(e)))
|
||||||
|
})
|
||||||
|
});
|
||||||
let stream = futures::stream::iter(iter);
|
let stream = futures::stream::iter(iter);
|
||||||
todo!()
|
Ok(Response::new(Box::pin(stream)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Add table
Reference in a new issue