Update local data and protobuf

This commit is contained in:
fluo10 2025-09-27 19:30:17 +09:00
parent 2a29f1fc82
commit 22fbe53710
14 changed files with 397 additions and 78 deletions

View file

@ -23,7 +23,7 @@ futures.workspace = true
iroh.workspace = true
prost.workspace = true
prost-types.workspace = true
rusqlite = { workspace = true, features = ["bundled", "chrono"] }
rusqlite = { workspace = true, features = ["bundled", "chrono", "uuid"] }
serde.workspace = true
sysinfo = "0.37.0"
tempfile = { version = "3.20.0", optional = true }

View file

@ -1,4 +1,9 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_prost_build::compile_protos("proto/caretta_sync.proto")?;
tonic_prost_build::configure()
.extern_path(".tripod_id", "::tripod_id::prost::generated")
.compile_protos(
&["proto/caretta_sync.proto", "proto/caretta_sync.common.proto"],
&["proto"]
)?;
Ok(())
}

View file

@ -0,0 +1,28 @@
syntax = "proto3";
package caretta_sync.common;
import "tripod_id.proto";
message PublicKey {
bytes key = 1;
}
message UuidMessage {
uint64 first_half = 1;
uint64 second_half = 2;
}
message PeerIdentifierMessage {
oneof peer_identifier {
tripod_id.Double id = 1;
PublicKey public_key = 2;
}
}
message PeerIdentifierErrorMessage {
oneof error {
string peer_id_not_found = 1;
string public_key_not_found = 2;
}
}

View file

@ -0,0 +1,97 @@
syntax = "proto3";
package caretta_sync;
import "common.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
service PeerInfo {
rpc PeerInfo(PeerInfoRequest) returns (PeerInfoResponse);
rpc PeerInfoIter(PeerInfoIterRequest) returns (stream PeerInfoResponse);
}
message PeerInfoRequest {
caretta_sync.common.PeerIdentifierMessage peer = 1;
}
message PeerInfoIterRequest {}
message PeerInfoResponse {
oneof result {
PeerInfoMessage remote_info = 1;
Error error = 2;
}
}
message PeerInfoMessage {
DoubleIdMessage id = 1;
PublicKeyMessage public_key = 2;
string relay_url = 3;
repeated DirectAddrInfoMessage addrs = 4;
string conn_type = 5;
google.protobuf.Duration latency = 6;
google.protobuf.Duration last_used = 7;
}
message DirectAddrInfoMessage {
string addr = 1;
google.protobuf.Duration latency = 2;
LastControlMessage last_control = 3;
google.protobuf.Duration last_payload = 4;
google.protobuf.Duration last_alive = 5;
repeated SourceMessage sources = 6;
}
message LastControlMessage {
google.protobuf.Duration duration = 1;
string control_msg = 2;
}
message SourceMessage {
string source = 1;
google.protobuf.Duration duration = 2;
}
message SendAuthorizationRequestRequest {
PeerMessage peer = 1;
}
message SendAuthorizationRequestResponse {
oneof result {
string passcode = 1;
Error error = 2;
}
}
message AuthorizationRequestMessage {
oneof authorization_request {
UuidMessage uuid = 1;
DoubleIdMessage id = 2;
}
}
message AcceptAuthorizationRequestRequest {
AuthorizationRequestMessage authorization_request = 1;
string passcode = 2;
}
message AcceptAuthorizationRequestResponse {
oneof result {
google.protobuf.Empty ok = 1;
Error error = 2;
}
}
message RejectAuthorizationRequestRequest {
AuthorizationRequestMessage authorization_request = 1;
}
message RejectAuthorizationRequestResponse {
oneof result {
google.protobuf.Empty ok = 1;
Error error = 2;
}
}

View file

@ -0,0 +1,99 @@
syntax = "proto3";
package caretta_sync;
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
service CarettaSync {
rpc SendAuthorizationRequest(SendAuthorizationRequestRequest) returns (SendAuthorizationRequestResponse);
rpc AcceptAuthorizationRequest(AcceptAuthorizationRequestRequest) returns (AcceptAuthorizationRequestResponse);
rpc RejectAuthorizationRequest(RejectAuthorizationRequestRequest) returns (RejectAuthorizationRequestResponse);
rpc RemoteInfo(RemoteInfoRequest) returns (RemoteInfoResponse);
rpc RemoteInfoIter(RemoteInfoIterRequest) returns (stream RemoteInfoResponse);
}
message RemoteInfoRequest {
PeerMessage peer = 1;
}
message RemoteInfoIterRequest {}
message RemoteInfoResponse {
oneof result {
RemoteInfoMessage remote_info = 1;
Error error = 2;
}
}
message RemoteInfoMessage {
DoubleIdMessage id = 1;
PublicKeyMessage public_key = 2;
string relay_url = 3;
repeated DirectAddrInfoMessage addrs = 4;
string conn_type = 5;
google.protobuf.Duration latency = 6;
google.protobuf.Duration last_used = 7;
}
message DirectAddrInfoMessage {
string addr = 1;
google.protobuf.Duration latency = 2;
LastControlMessage last_control = 3;
google.protobuf.Duration last_payload = 4;
google.protobuf.Duration last_alive = 5;
repeated SourceMessage sources = 6;
}
message LastControlMessage {
google.protobuf.Duration duration = 1;
string control_msg = 2;
}
message SourceMessage {
string source = 1;
google.protobuf.Duration duration = 2;
}
message SendAuthorizationRequestRequest {
PeerMessage peer = 1;
}
message SendAuthorizationRequestResponse {
oneof result {
string passcode = 1;
Error error = 2;
}
}
message AuthorizationRequestMessage {
oneof authorization_request {
UuidMessage uuid = 1;
DoubleIdMessage id = 2;
}
}
message AcceptAuthorizationRequestRequest {
AuthorizationRequestMessage authorization_request = 1;
string passcode = 2;
}
message AcceptAuthorizationRequestResponse {
oneof result {
google.protobuf.Empty ok = 1;
Error error = 2;
}
}
message RejectAuthorizationRequestRequest {
AuthorizationRequestMessage authorization_request = 1;
}
message RejectAuthorizationRequestResponse {
oneof result {
google.protobuf.Empty ok = 1;
Error error = 2;
}
}

View file

@ -0,0 +1,17 @@
syntax = "proto3";
package tripod_id;
// Single size tripod id message
message Single {
uint32 id = 1;
}
// Double size tripod id message
message Double {
uint32 id = 1;
}
// Triple size tripod id message
message Triple {
uint64 id = 1;
}

View file

@ -5,7 +5,7 @@ mod received;
use std::os::unix::raw::time_t;
use caretta_id::DoubleId;
use tripod_id::Double;
use chrono::{DateTime, Local, NaiveDateTime};
use iroh::{NodeId, PublicKey};
pub use sent::*;
@ -20,7 +20,8 @@ use crate::data::local::LocalRecord;
#[derive(Debug, Clone)]
pub struct AuthorizationRequestRecord {
id: u32,
uid: DoubleId,
uuid: Uuid,
public_id: Double,
peer_id: u32,
created_at: DateTime<Local>,
closed_at: Option<DateTime<Local>>,
@ -31,28 +32,49 @@ impl LocalRecord for AuthorizationRequestRecord {
const TABLE_NAME: &str = "authorization_request";
const SELECT_COLUMNS: &[&str] = &[
"id",
"uid",
"uuid",
"public_id",
"peer_id",
"created_at",
"closed_at"
];
const INSERT_COLUMNS: &[&str] = &[
"uid",
"uuid",
"public_id",
"peer_id",
"created_at"
];
type InsertParams<'a> = (&'a DoubleId, &'a [u8;32], &'a NaiveDateTime);
type InsertParams<'a> = (&'a Double, &'a [u8;32], &'a NaiveDateTime);
type SelectValues = (u32, Uuid, Double, PublicKey, NaiveDateTime, Option<NaiveDateTime>);
fn from_row(row: &rusqlite::Row<'_>) -> Result<Self, rusqlite::Error> {
let created_at: NaiveDateTime = row.get(3)?;
let closed_at: Option<NaiveDateTime> = row.get(4)?;
let created_at: NaiveDateTime = row.get(4)?;
let closed_at: Option<NaiveDateTime> = row.get(5)?;
Ok(Self {
id: row.get(0)?,
uid: row.get(1)?,
peer_id: row.get(2)?,
uuid: row.get(1)?,
public_id: row.get(2)?,
peer_id: row.get(3)?,
created_at: created_at.and_utc().into(),
closed_at: closed_at.map(|x| x.and_utc().into())
})
}
}
impl From<(u32, Uuid, Double, u32, NaiveDateTime, Option<NaiveDateTime>)> for AuthorizationRequestRecord {
fn from(value: (u32, Uuid, Double, u32, NaiveDateTime, Option<NaiveDateTime>)) -> Self {
Self {
id: value.0,
uuid: value.1,
public_id: value.2,
peer_id: value.3,
created_at: value.4.and_utc().into(),
closed_at: value.5.map(|x| x.and_utc().into())
}
}
}
impl<'a> From<&'a rusqlite::Row<'_>> for AuthorizationRequestRecord {
fn from(value: &'a rusqlite::Row<'_>) -> Self {
todo!()
}
}

View file

@ -4,20 +4,21 @@ pub fn migrate(tx: &Transaction) -> Result<(), Error>{
tx.execute_batch(
"CREATE TABLE peer (
id INTEGER PRIMARY KEY,
uid INTEGER NOT NULL UNIQUE,
public_id INTEGER NOT NULL UNIQUE,
public_key BLOB UNIQUE NOT NULL
);
CREATE TABLE authorization_request (
id INTEGER PRIMARY KEY,
uid INTEGER NOT NULL UNIQUE,
uuid BLOB NOT NULL UNIQUE,
public_id INTEGER NOT NULL UNIQUE,
peer_id INTEGER NOT NULL UNIQUE,
created_at TEXT NOT NULL,
closed_at TEXT,
FOREIGN KEY(peer_id) REFERENCES peer(id)
);
CREATE TABLE received_authorization_request (
id INTEGER PRIMARY KEY
authorization_request_id INTEGER NOT NULL UNIQUE
id INTEGER PRIMARY KEY,
authorization_request_id INTEGER NOT NULL UNIQUE,
peer_note TEXT,
FOREIGN KEY(authorization_request_id) REFERENCES authorization_request(id)
);

View file

@ -1,60 +1,52 @@
mod authorization_request;
// mod authorization_request;
mod peer;
pub mod migration;
use std::{cell::OnceCell, iter::Map, path::Path, sync::{LazyLock, OnceLock}};
use std::{cell::OnceCell, convert::Infallible, iter::Map, path::Path, sync::{LazyLock, OnceLock}};
use migration::migrate;
use rusqlite::{ffi::Error, params, Connection, MappedRows, OptionalExtension, Params, Row, ToSql};
use rusqlite::{ffi::Error, params, types::FromSql, Connection, MappedRows, OptionalExtension, Params, Row, ToSql};
use crate::{config::StorageConfig, global::{CONFIG, LOCAL_DATABASE_CONNECTION}};
pub use authorization_request::*;
// pub use authorization_request::*;
type LocalRecordError = rusqlite::Error;
/// a struct of id for local database record.
pub type LocalRecordId = u32;
/// a struct for the record without id before inserted
pub type NoLocalRecordId = rusqlite::types::Null;
/// A id struct
/// Model trait for local database data.
/// use LOCAL_DATABASE_CONNECTION for database connection.
pub trait LocalRecord: Sized {
const TABLE_NAME: &str;
const SELECT_COLUMNS: &[&str];
const INSERT_COLUMNS: &[&str];
const COLUMNS: &[&str];
/// Tuple form of the record.
/// the order of field must be same as COLUMNS.
type RowValues;
}
pub trait SelectableLocalRecord: LocalRecord<RowValues: TryInto<Self>> {
const SELECT_STATEMENT: LazyLock<String> = LazyLock::new(|| {
String::from("SELECT ") + &Self::SELECT_COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME
String::from("SELECT ") + &Self::COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME
});
const SELECT_PLACEHOLDER: LazyLock<String> = LazyLock::new(|| {
let mut result : Vec<String> = Vec::new();
for i in 0..Self::SELECT_COLUMNS.len() {
result.push(String::from("?") + &(i+1).to_string());
}
result.join(", ")
});
const INSERT_PLACEHOLDER: LazyLock<String> = LazyLock::new(|| {
let mut result : Vec<String> = Vec::new();
for i in 0..Self::INSERT_COLUMNS.len() {
for i in 0..Self::COLUMNS.len() {
result.push(String::from("?") + &(i+1).to_string());
}
result.join(", ")
});
type InsertParams<'a>: Params
where
Self: 'a;
fn insert(params: Self::InsertParams<'_>) -> Result<Self, rusqlite::Error>
{
let connection = LOCAL_DATABASE_CONNECTION.get_unchecked();
Ok(connection.query_row(
&[
"INSERT INTO ", Self::TABLE_NAME, "(" , &Self::INSERT_COLUMNS.join(", "), ")",
"VALUES (" , &*Self::INSERT_PLACEHOLDER , ")",
"RETURNING", &Self::SELECT_COLUMNS.join(", ")
].join(" "),
params,
Self::from_row
)?)
}
fn get_one_where<P>(where_statement: &str, params: P) -> Result<Option<Self>, rusqlite::Error>
where P: Params
@ -73,7 +65,7 @@ pub trait LocalRecord: Sized {
{
let connection = LOCAL_DATABASE_CONNECTION.get_unchecked();
Ok(Some(connection.query_row(
&("SELECT ".to_string() + &Self::SELECT_COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME + " WHERE " + field_name + "= ?1"),
&("SELECT ".to_string() + &Self::COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME + " WHERE " + field_name + "= ?1"),
params![field_value],
Self::from_row
)?))
@ -84,7 +76,7 @@ pub trait LocalRecord: Sized {
fn from_row(row: &Row<'_>) -> Result<Self, rusqlite::Error>;
fn get_all() -> Result<Vec<Self>, rusqlite::Error> {
let connection = LOCAL_DATABASE_CONNECTION.get_unchecked();
let mut stmt = connection.prepare(&("SELECT ".to_string() + &Self::SELECT_COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME))?;
let mut stmt = connection.prepare(&("SELECT ".to_string() + &Self::COLUMNS.join(", ") + " FROM " + Self::TABLE_NAME))?;
let rows = stmt.query_map(
[],
Self::from_row
@ -97,3 +89,32 @@ pub trait LocalRecord: Sized {
}
}
pub trait InsertableLocalRecord: LocalRecord<RowValues: From<Self> + Params> {
type LocalRecord: Sized + SelectableLocalRecord;
/// Place holder for insertion.
/// Generated from Columns
const INSERT_PLACEHOLDER: LazyLock<String> = LazyLock::new(|| {
let mut result : Vec<String> = Vec::new();
for i in 0..Self::COLUMNS.len() {
result.push(String::from("?") + &(i+1).to_string());
}
result.join(", ")
});
/// Insert and get the inserted record.
fn insert(self) -> Result<Self::LocalRecord, rusqlite::Error>{
let params= Self::RowValues::from(self);
let connection = LOCAL_DATABASE_CONNECTION.get_unchecked();
Ok(connection.query_row(
&[
"INSERT INTO ", Self::TABLE_NAME, "(" , &Self::COLUMNS.join(", "), ")",
"VALUES (" , &*Self::INSERT_PLACEHOLDER , ")",
"RETURNING", &Self::COLUMNS.join(", ")
].join(" "),
params,
Self::LocalRecord::from_row
)?)
}
}

View file

@ -2,13 +2,13 @@
use std::os::unix::raw::time_t;
use caretta_id::DoubleId;
use tripod_id::Double;
use chrono::{DateTime, Local, NaiveDateTime};
use iroh::{NodeId, PublicKey};
use rusqlite::{params, types::{FromSqlError, Null}, Connection};
use uuid::Uuid;
use crate::{data::local::{self, LocalRecord}, global::LOCAL_DATABASE_CONNECTION};
use crate::{data::local::{self, InsertableLocalRecord, LocalRecord, LocalRecordId, NoLocalRecordId, SelectableLocalRecord}, global::LOCAL_DATABASE_CONNECTION};
/// Peer information cached in local database.
///
@ -18,56 +18,84 @@ use crate::{data::local::{self, LocalRecord}, global::LOCAL_DATABASE_CONNECTION}
/// - Once a peer is authorized, it is assigned a global (=synced) ID as authorized_peer so essentially this local id targets unauthorized peers.
///
#[derive(Clone, Debug, PartialEq)]
pub struct PeerRecord {
pub struct PeerRecord<T> {
/// primary key.
pub id: u32,
/// serial primary key.
pub id: T,
/// uid of peer.
/// public tripod id of peer.
/// this id is use only the node itself and not synced so another node has different local_peer_id even if its public_key is same.
pub uid: DoubleId,
pub public_id: Double,
/// Iroh public key
pub public_key: PublicKey,
}
impl PeerRecord {
impl PeerRecord<LocalRecordId> {
pub fn get_or_insert_by_public_key(public_key: &PublicKey) -> Result<Self, rusqlite::Error> {
match Self::get_by_public_key(public_key)? {
Some(x) => Ok(x),
None => {
let new_uid: DoubleId = rand::random();
Ok(Self::insert((&new_uid, public_key.as_bytes()))?)
let new = PeerRecord{
id: NoLocalRecordId{},
public_id: rand::random(),
public_key: public_key.clone()
};
Ok(new.insert()?)
}
}
}
pub fn get_by_uid(local_id: &DoubleId) -> Result<Option<Self>, rusqlite::Error> {
Self::get_one_where("WHERE local_peer_id = ?1", (local_id,))
pub fn get_by_public_id(public_id: &Double) -> Result<Option<Self>, rusqlite::Error> {
Self::get_one_where("WHERE public_id = ?1", (public_id,))
}
pub fn get_by_public_key(public_key: &PublicKey) -> Result<Option<Self>, rusqlite::Error> {
Self::get_one_where("WHERE public_Key = ?1", (public_key.as_bytes(),))
}
}
impl LocalRecord for PeerRecord {
impl<T> LocalRecord for PeerRecord<T> {
const TABLE_NAME: &str = "peer";
const SELECT_COLUMNS: &[&str] = &[
const COLUMNS: &[&str] = &[
"id",
"uid",
"public_id",
"public_key"
];
const INSERT_COLUMNS: &[&str] = &[
"uid",
"public_key"
];
type InsertParams<'a> = (&'a DoubleId, &'a [u8;32]);
type RowValues = (T, Double, [u8;32]);
}
impl SelectableLocalRecord for PeerRecord<LocalRecordId> {
fn from_row(row: &rusqlite::Row<'_>) -> Result<Self, rusqlite::Error> {
Ok(Self {
id: row.get(0)?,
uid: row.get(1)?,
public_id: row.get(1)?,
public_key: PublicKey::from_bytes(&row.get(2)?).map_err(|e| FromSqlError::Other(Box::new(e)))?
})
}
}
impl TryFrom<(LocalRecordId, Double, [u8;32])> for PeerRecord<LocalRecordId> {
type Error = rusqlite::Error;
fn try_from(value: (LocalRecordId, Double, [u8;32])) -> Result<Self, Self::Error> {
Ok(Self {
id: value.0,
public_id: value.1,
public_key: PublicKey::from_bytes(&value.2).map_err(|x| FromSqlError::Other(Box::new(x)))?
})
}
}
impl InsertableLocalRecord for PeerRecord<NoLocalRecordId> {
type LocalRecord = PeerRecord<LocalRecordId>;
}
impl From<PeerRecord<NoLocalRecordId>> for (NoLocalRecordId, Double, [u8;32]){
fn from(value: PeerRecord<NoLocalRecordId>) -> Self {
(value.id, value.public_id, value.public_key.as_bytes().to_owned())
}
}
@ -85,7 +113,7 @@ mod tests {
let key = SecretKey::generate(&mut rand::rngs::OsRng);
let pubkey = key.public();
let record = PeerRecord::get_or_insert_by_public_key(&pubkey).unwrap();
assert_eq!(record, PeerRecord::get_by_uid(&record.uid).unwrap().unwrap());
assert_eq!(record, PeerRecord::get_by_public_id(&record.public_id).unwrap().unwrap());
assert_eq!(record, PeerRecord::get_by_public_key(&record.public_key).unwrap().unwrap());
}
}

View file

@ -3,3 +3,4 @@ mod error;
mod server;
tonic::include_proto!("caretta_sync");
tonic::include_proto!("caretta_sync.common");

View file

@ -1,5 +1,5 @@
syntax = "proto3";
package fireturtle.tripod_id;
package tripod_id;
// Single size tripod id message
message Single {

View file

@ -1,4 +1,4 @@
mod generated {
pub mod generated {
include!(concat!(env!("OUT_DIR"), "/fireturtle.tripod_id.rs"));
}

View file

@ -15,7 +15,7 @@ use crate::TripodId;
/// # use tripod_id::{TripodId, Triple};
/// # use std::str::FromStr;
///
/// let _ = tripod_id::from_str("123-abc");
/// let _ = Triple::from_str("012-abc-def");
/// ```
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct Triple(u64);