Compare commits

...

11 commits

Author SHA1 Message Date
ebbe3d82d6 Add syncable traits 2025-06-25 07:31:49 +09:00
8fbf8a16ec Add wrapper value for Multiaddr and PeerId 2025-06-24 22:06:25 +09:00
3356684530 Add TestDefault trait 2025-06-22 11:29:44 +09:00
3a2f53dd46 Add reusable global structs 2025-06-20 08:42:02 +09:00
eb428eb537 flatten global structs 2025-06-20 07:28:51 +09:00
a80b9bcdf1 Add global traits 2025-06-19 07:24:44 +09:00
66be78dabf Add desktop config 2025-06-18 08:36:01 +09:00
1854e84949 Add ipc request 2025-06-17 08:47:06 +09:00
1a5ea87780 Merge branch 'main' into feature/unix_socket 2025-06-17 08:01:29 +09:00
70107257c2 Add unix socket listner 2025-06-17 07:20:24 +09:00
7569d296b1 Add .DS_Store to gitignore 2025-06-14 09:13:13 +09:00
52 changed files with 1310 additions and 377 deletions

4
.gitignore vendored
View file

@ -20,4 +20,6 @@ Cargo.lock
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
tmp/
tmp/
.DS_Store

View file

@ -10,6 +10,8 @@ license = "MIT OR Apache-2.0"
repository = "https://forgejo.fireturlte.net/lazy-supplements"
[workspace.dependencies]
ciborium = "0.2.2"
clap = { version = "4.5.38", features = ["derive"] }
dioxus = { version = "0.6.0", features = [] }
lazy-supplements-core.path = "lazy-supplements-core"
libp2p = { version = "0.55.0", features = ["macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux" ] }
@ -17,3 +19,4 @@ sea-orm-migration = { version = "1.1.0", features = ["runtime-tokio-rustls", "sq
serde = { version = "1.0.219", features = ["derive"] }
thiserror = "2.0.12"
tokio = { version = "1.45.0", features = ["macros", "rt", "rt-multi-thread"] }
uuid = { version = "1.17.0", features = ["v7"] }

View file

@ -8,12 +8,15 @@ repository.workspace = true
[features]
default = []
desktop = ["dep:clap"]
test = ["dep:tempfile"]
[dependencies]
base64 = "0.22.1"
chrono = "0.4.41"
chrono-tz = "0.10.3"
ciborium.workspace = true
clap = {workspace = true, optional = true}
futures = "0.3.31"
libp2p.workspace = true
sea-orm = { version = "1.1.11", features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros", "with-chrono", "with-uuid"] }
@ -25,7 +28,7 @@ tokio.workspace = true
toml = "0.8.22"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
uuid = { version = "1.17.0", features = ["v7"] }
uuid.workspace = true
[dev-dependencies]
tempfile = "3.20.0"

View file

@ -0,0 +1,8 @@
// Cache-side entity module: re-exports the `peer` entity's items under
// unambiguous `Peer*`-prefixed names for use outside this module.
mod peer;
pub use peer::{
ActiveModel as ActivePeerModel,
Column as PeerColumn,
Model as PeerModel,
Entity as PeerEntity,
};

View file

@ -0,0 +1,73 @@
use std::str::FromStr;
use chrono::{Days, Local};
use libp2p::{multiaddr, Multiaddr, PeerId};
use sea_orm::{entity::{
prelude::*, *
}, sea_query};
use serde::{Deserialize, Serialize};
use crate::data::value::{MultiaddrValue, PeerIdValue};
/// Cached observation of a remote peer: the `PeerId` seen at a given
/// `Multiaddr`, plus bookkeeping timestamps and an expiry.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "peer")]
pub struct Model {
// Auto-increment surrogate key.
#[sea_orm(primary_key)]
pub id: u32,
#[sea_orm(indexed)]
pub created_at: DateTimeUtc,
#[sea_orm(indexed)]
pub updated_at: DateTimeUtc,
// When this cache entry goes stale (set 30 days ahead by `ActiveModel::new`).
#[sea_orm(indexed)]
pub expires_at: DateTimeUtc,
// Stored as TEXT via the PeerIdValue newtype.
#[sea_orm(indexed)]
pub peer_id: PeerIdValue,
// Stored as TEXT via the MultiaddrValue newtype.
#[sea_orm(indexed)]
pub address: MultiaddrValue,
}
// No relations are defined for the cached peer table yet.
#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)]
pub enum Relation {}
// Default lifecycle hooks — no custom before/after-save behavior.
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
    /// Builds a fresh peer row stamped with the current UTC time and an
    /// expiry 30 days out. `id` is left unset for the database to assign.
    pub fn new(peer_id: PeerId, multiaddr: Multiaddr) -> Self {
        let now: DateTimeUtc = Local::now().to_utc();
        // 30 days from now cannot overflow a chrono timestamp in practice.
        let expiry = now.checked_add_days(Days::new(30)).unwrap();
        Self {
            created_at: Set(now),
            updated_at: Set(now),
            expires_at: Set(expiry),
            peer_id: Set(PeerIdValue::from(peer_id)),
            address: Set(MultiaddrValue::from(multiaddr)),
            ..Default::default()
        }
    }
}
#[cfg(test)]
mod tests {
    use std::net::Ipv4Addr;
    use crate::global::get_or_init_test_cache_database;
    use super::*;
    // The original also imported `crate::cache::entity::peer` (a self-import),
    // `identity::self`, and `libp2p::swarm::handler::multi` — an IDE
    // auto-import artifact of a non-public internal module. All unused.
    use libp2p::identity::Keypair;

    /// Inserting a freshly-built row must round-trip the peer id and
    /// multiaddr through their TEXT-backed column representations.
    #[tokio::test]
    async fn insert() {
        let db = get_or_init_test_cache_database().await;
        let peer_id = Keypair::generate_ed25519().public().to_peer_id();
        let multiaddr = Multiaddr::empty()
            .with(Ipv4Addr::new(127, 0, 0, 1).into())
            .with(multiaddr::Protocol::Tcp(0));
        let inserted: Model = ActiveModel::new(peer_id.clone(), multiaddr.clone())
            .insert(db).await.unwrap();
        assert_eq!(PeerId::from(inserted.peer_id), peer_id);
        assert_eq!(Multiaddr::from(inserted.address), multiaddr);
    }
}

View file

@ -9,18 +9,16 @@ pub struct Migration;
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
Peer::up(manager).await?;
Address::up(manager).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
Peer::down(manager).await?;
Address::down(manager).await?;
Ok(())
}
}
#[derive(DeriveIden)]
#[derive(DeriveIden, DeriveMigrationName)]
enum Peer {
Table,
Id,
@ -28,9 +26,14 @@ enum Peer {
CreatedAt,
UpdatedAt,
ExpiresAt,
Address,
}
static IDX_PEER_ADDRESS: &str = "idx_peer_address";
static IDX_PEER_PEER_ID: &str = "idx_peer_peer_id";
static IDX_PEER_CREATED_AT: &str = "idx_peer_created_at";
static IDX_PEER_UPDATED_AT: &str = "idx_peer_updated_at";
static IDX_PEER_EXPIRES_AT: &str = "idx_peer_expires_at";
#[async_trait::async_trait]
impl TableMigration for Peer {
@ -40,10 +43,11 @@ impl TableMigration for Peer {
.table(Self::Table)
.if_not_exists()
.col(pk_auto(Self::Id))
.col(string_len(Self::PeerId, 255))
.col(timestamp(Self::CreatedAt))
.col(timestamp(Self::UpdatedAt))
.col(timestamp(Self::ExpiresAt))
.col(string_len_uniq(Self::PeerId, 255))
.col(text_uniq(Self::Address))
.to_owned()
).await?;
manager.create_index(
@ -53,53 +57,32 @@ impl TableMigration for Peer {
.col(Self::PeerId)
.to_owned()
).await?;
Ok(())
}
async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr>{
manager.drop_table(Table::drop().table(Self::Table).to_owned()).await
}
}
#[derive(DeriveIden, DeriveMigrationName)]
enum Address {
Table,
Id,
PeerId,
CreatedAt,
UpdatedAt,
ExpiresAt,
MultiAddress,
}
static IDX_ADDRESS_MULTIADDRESS: &str = "idx_address_multiaddress";
static FK_ADDRESS_PEER: &str = "fk_address_peer";
#[async_trait::async_trait]
impl TableMigration for Address {
async fn up<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> {
manager.create_table(
Table::create()
manager.create_index(
Index::create()
.name(IDX_PEER_ADDRESS)
.table(Self::Table)
.if_not_exists()
.col(pk_auto(Self::Id))
.col(integer(Self::PeerId))
.col(timestamp(Self::CreatedAt))
.col(timestamp(Self::UpdatedAt))
.col(timestamp(Self::ExpiresAt))
.col(text_uniq(Self::MultiAddress))
.foreign_key(
ForeignKey::create()
.name(FK_ADDRESS_PEER)
.from(Self::Table, Self::PeerId)
.to(Peer::Table, Peer::Id)
)
.col(Self::Address)
.to_owned()
).await?;
manager.create_index(
Index::create()
.name(IDX_ADDRESS_MULTIADDRESS)
.name(IDX_PEER_CREATED_AT)
.table(Self::Table)
.col(Self::MultiAddress)
.col(Self::CreatedAt)
.to_owned()
).await?;
manager.create_index(
Index::create()
.name(IDX_PEER_UPDATED_AT)
.table(Self::Table)
.col(Self::UpdatedAt)
.to_owned()
).await?;
manager.create_index(
Index::create()
.name(IDX_PEER_EXPIRES_AT)
.table(Self::Table)
.col(Self::ExpiresAt)
.to_owned()
).await?;
Ok(())

View file

@ -0,0 +1,5 @@
/// Errors raised while assembling a complete configuration from partial
/// (layered) sources.
#[derive(thiserror::Error, Debug)]
pub enum ConfigError {
// A required key was absent from every merged partial config; the payload
// names the missing key.
#[error("missing config: {0}")]
MissingConfig(String),
}

View file

@ -1,14 +1,112 @@
mod node;
pub mod error;
mod storage;
mod p2p;
use std::path::Path;
use crate::error::Error;
pub use node::{ NodeConfig, RawNodeConfig };
use serde::{Deserialize, Serialize};
pub use error::ConfigError;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}};
#[derive(Debug, Deserialize, Serialize)]
pub struct PartialConfig {
node: Option<NodeConfig>,
pub use storage::{StorageConfig, PartialStorageConfig};
pub use p2p::{P2pConfig, PartialP2pConfig};
/// A configuration fragment in which every field is optional, suitable for
/// layering: fragments from several sources (file, CLI, defaults) are
/// `merge`d together, later sources winning.
pub trait PartialConfig: Serialize + Sized + DeserializeOwned
{
// Fragment pre-filled with the built-in default values.
// NOTE(review): this shadows `std::default::Default::default`; consider
// requiring the `Default` trait instead to avoid ambiguity at call sites.
fn default() -> Self;
// Fragment with every field unset.
fn empty() -> Self;
// Overlays `other` onto `self`; fields set in `other` replace `self`'s.
fn merge(&mut self, other: Self);
// Parses a fragment from TOML text.
fn from_toml(s: &str) -> Result<Self, toml::de::Error> {
toml::from_str(s)
}
// Renders the fragment as TOML text.
// NOTE(review): takes `&self`, so `into_` (which conventionally consumes
// `self`) is a misleading prefix; `to_toml` would match Rust conventions.
fn into_toml(&self) -> Result<String, toml::ser::Error> {
toml::to_string(self)
}
// True when no field is set.
fn is_empty(&self) -> bool;
}
pub trait ConfigRoot: DeserializeOwned + Serialize {
fn new() -> Self;
async fn read_or_create<T>(path: T) -> Result<Self, Error>
where
T: AsRef<Path>
{
if !path.as_ref().exists() {
Self::new().write_to(&path).await?;
}
Self::read_from(&path).await
}
async fn read_from<T>(path:T) -> Result<Self, Error>
where
T: AsRef<Path>
{
let mut file = File::open(path.as_ref()).await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
let config: Self = toml::from_str(&content)?;
Ok(config)
}
async fn write_to<T>(&self, path:T) -> Result<(), Error>
where
T: AsRef<Path>
{
if !path.as_ref().exists() {
if let Some(x) = path.as_ref().parent() {
std::fs::create_dir_all(x)?;
};
let _ = File::create(&path).await?;
}
let mut file = File::create(&path).await?;
file.write_all(toml::to_string(self)?.as_bytes()).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use crate::tests::test_toml_serialize_deserialize;
use super::{p2p::{P2pConfig, PartialP2pConfig}, PartialConfig};
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct TestConfig {
p2p: Option<PartialP2pConfig>
}
impl PartialConfig for TestConfig {
fn default() -> Self {
Self {
p2p: Some(PartialP2pConfig::default()),
}
}
fn empty() -> Self {
Self {
p2p: None,
}
}
fn is_empty(&self) -> bool {
self.p2p.is_none()
}
fn merge(&mut self, other: Self) {
if let Some(p2p) = other.p2p {
self.p2p = Some(p2p);
}
}
}
#[tokio::test]
async fn test_p2p_config_serialize_deserialize() {
test_toml_serialize_deserialize(TestConfig::empty());
test_toml_serialize_deserialize(TestConfig::default());
assert_eq!(TestConfig::empty(), toml::from_str("").unwrap());
assert_eq!("", &toml::to_string(&TestConfig::empty()).unwrap());
}
}

View file

@ -1,20 +1,28 @@
use std::{net::IpAddr, ops, path::{Path, PathBuf}};
use std::{net::{IpAddr, Ipv4Addr}, ops, path::{Path, PathBuf}};
use base64::{prelude::BASE64_STANDARD, Engine};
#[cfg(feature="desktop")]
use clap::Args;
use libp2p::{identity::{self, DecodingError, Keypair}, noise, ping, tcp, yamux, Swarm};
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}};
use tracing_subscriber::EnvFilter;
use crate::{error::Error, p2p};
use crate::{
config::PartialConfig,
error::Error, p2p
};
use super::{PartialConfig};
static DEFAULT_P2P_LISTEN_IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))];
static DEFAULT_P2P_PORT: u16 = 0;
fn keypair_to_base64(keypair: &Keypair) -> Result<String, Error> {
let vec = keypair.to_protobuf_encoding()?;
let base64 = BASE64_STANDARD.encode(vec);
Ok(base64)
fn keypair_to_base64(keypair: &Keypair) -> String {
let vec = match keypair.to_protobuf_encoding() {
Ok(x) => x,
Err(_) => unreachable!(),
};
BASE64_STANDARD.encode(vec)
}
fn base64_to_keypair(base64: &str) -> Result<Keypair, Error> {
@ -23,15 +31,14 @@ fn base64_to_keypair(base64: &str) -> Result<Keypair, Error> {
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NodeConfig {
pub struct P2pConfig {
#[serde(with = "keypair_parser")]
pub secret: Keypair,
pub database_path: PathBuf,
pub listen_ips: Vec<IpAddr>,
pub port: u16,
}
impl NodeConfig {
impl P2pConfig {
pub async fn try_into_swarm (self) -> Result<Swarm<p2p::Behaviour>, Error> {
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(self.secret)
.with_tokio()
@ -47,12 +54,11 @@ impl NodeConfig {
}
}
impl TryFrom<RawNodeConfig> for NodeConfig {
impl TryFrom<PartialP2pConfig> for P2pConfig {
type Error = Error;
fn try_from(raw: RawNodeConfig) -> Result<NodeConfig, Self::Error> {
Ok(NodeConfig {
fn try_from(raw: PartialP2pConfig) -> Result<P2pConfig, Self::Error> {
Ok(P2pConfig {
secret: base64_to_keypair(&raw.secret.ok_or(Error::MissingConfig("secret"))?)?,
database_path: raw.database_path.ok_or(Error::MissingConfig("database_path"))?,
listen_ips: raw.listen_ips.ok_or(Error::MissingConfig("listen_ips"))?,
port: raw.port.ok_or(Error::MissingConfig("port"))?
})
@ -66,10 +72,7 @@ mod keypair_parser {
pub fn serialize<S>(keypair: &Keypair, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer
{
match super::keypair_to_base64(keypair) {
Ok(x) => serializer.serialize_str(&x),
Err(_) => Err(serde::ser::Error::custom("Decoding keypair error"))
}
serializer.serialize_str(&super::keypair_to_base64(keypair))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Keypair, D::Error>
where D: Deserializer<'de>
@ -82,35 +85,28 @@ mod keypair_parser {
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RawNodeConfig {
#[cfg_attr(feature="desktop",derive(Args))]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct PartialP2pConfig {
#[cfg_attr(feature="desktop",arg(long))]
pub secret: Option<String>,
pub database_path: Option<PathBuf>,
#[cfg_attr(feature="desktop",arg(long))]
pub listen_ips: Option<Vec<IpAddr>>,
#[cfg_attr(feature="desktop",arg(long))]
pub port: Option<u16>,
}
impl RawNodeConfig {
impl PartialP2pConfig {
pub fn with_new_secret(mut self) -> Self {
self.secret = Some(keypair_to_base64(&Keypair::generate_ed25519()).unwrap());
self.secret = Some(keypair_to_base64(&Keypair::generate_ed25519()));
self
}
pub fn new() -> Self {
RawNodeConfig {
secret: None,
database_path: None,
listen_ips: None,
port: None,
}
}
pub async fn read_or_create<T>(path: T) -> Result<Self, Error>
where
T: AsRef<Path>
{
if !path.as_ref().exists() {
Self::new().write_to(&path).await?;
Self::empty().write_to(&path).await?;
}
Self::read_from(&path).await
}
@ -121,7 +117,7 @@ impl RawNodeConfig {
let mut file = File::open(path.as_ref()).await?;
let mut content = String::new();
file.read_to_string(&mut content).await?;
let config: RawNodeConfig = toml::from_str(&content)?;
let config: Self = toml::from_str(&content)?;
Ok(config)
}
pub async fn write_to<T>(&self, path:T) -> Result<(), Error>
@ -138,14 +134,33 @@ impl RawNodeConfig {
file.write_all(toml::to_string(self)?.as_bytes()).await?;
Ok(())
}
}
pub fn merge(&mut self, another: RawNodeConfig) {
// Converts a fully-resolved P2pConfig back into its partial (serializable)
// form, re-encoding the keypair as base64.
impl From<P2pConfig> for PartialP2pConfig {
fn from(config: P2pConfig) -> Self {
Self {
secret: Some(keypair_to_base64(&config.secret)),
listen_ips: Some(config.listen_ips),
port: Some(config.port)
}
}
}
impl PartialConfig for PartialP2pConfig {
fn empty() -> Self {
Self {
secret: None,
listen_ips: None,
port: None,
}
}
fn is_empty(&self) -> bool {
self.secret.is_none() && self.listen_ips.is_none() && self.port.is_none()
}
fn merge(&mut self, another: Self) {
if let Some(x) = another.secret {
self.secret = Some(x);
};
if let Some(x) = another.database_path {
self.database_path = Some(x);
};
if let Some(x) = another.listen_ips {
self.listen_ips = Some(x);
};
@ -153,29 +168,36 @@ impl RawNodeConfig {
self.port = Some(x);
};
}
}
impl ops::Add<RawNodeConfig> for RawNodeConfig {
type Output = RawNodeConfig;
fn add(mut self, another: RawNodeConfig) -> RawNodeConfig {
self.merge(another);
self
fn default() -> Self {
Self {
secret: None,
listen_ips: Some(Vec::from(DEFAULT_P2P_LISTEN_IPS)),
port: Some(DEFAULT_P2P_PORT),
}
}
}
#[cfg(test)]
mod tests {
use libp2p::identity;
use super::*;
use crate::{config::PartialConfig, tests::test_toml_serialize_deserialize};
#[tokio::test]
async fn parse_keypair() {
let keypair = identity::Keypair::generate_ed25519();
let keypair2 = base64_to_keypair(&keypair_to_base64(&keypair).unwrap()).unwrap();
let keypair2 = base64_to_keypair(&keypair_to_base64(&keypair)).unwrap();
assert_eq!(keypair.public(), keypair2.public());
}
#[tokio::test]
async fn test_p2p_config_serialize_deserialize() {
test_toml_serialize_deserialize(PartialP2pConfig::empty());
test_toml_serialize_deserialize(PartialP2pConfig::default());
}
}

View file

@ -0,0 +1,91 @@
use std::path::PathBuf;
#[cfg(feature="desktop")]
use clap::Args;
#[cfg(any(test, feature="test"))]
use tempfile::tempdir;
use crate::{config::{ConfigError, PartialConfig}};
use libp2p::mdns::Config;
use serde::{Deserialize, Serialize};
static DATA_DATABASE_NAME: &str = "data.sqlite";
static CACHE_DATABASE_NAME: &str = "cache.sqlite";
#[cfg(any(test, feature="test"))]
use crate::tests::{GlobalTestDefault, TestDefault};
/// Fully-resolved storage locations: where persistent data and disposable
/// cache files (including their sqlite databases) live.
#[derive(Debug)]
pub struct StorageConfig {
pub data_directory: PathBuf,
pub cache_directory: PathBuf,
}
impl StorageConfig {
// Path of the persistent ("data") sqlite database inside the data dir.
pub fn get_data_database_path(&self) -> PathBuf{
self.data_directory.join(DATA_DATABASE_NAME)
}
// Path of the disposable ("cache") sqlite database inside the cache dir.
pub fn get_cache_database_path(&self) -> PathBuf {
self.cache_directory.join(CACHE_DATABASE_NAME)
}
}
#[cfg(any(test, feature="test"))]
impl TestDefault for StorageConfig {
// Points both directories at one fresh temp dir. `keep()` detaches the
// TempDir guard so the directory is NOT deleted on drop — the returned
// config must outlive this function. The dir is never cleaned up.
fn test_default() -> Self {
let temp_dir = tempdir().unwrap().keep();
Self { data_directory: temp_dir.clone(), cache_directory: temp_dir }
}
}
impl TryFrom<PartialStorageConfig> for StorageConfig {
    type Error = ConfigError;

    /// Builds a complete [`StorageConfig`], failing with
    /// [`ConfigError::MissingConfig`] naming the first absent field.
    fn try_from(value: PartialStorageConfig) -> Result<Self, Self::Error> {
        // ok_or_else keeps the error (and its String allocation) lazy; the
        // original ok_or allocated even when the field was present.
        Ok(Self {
            data_directory: value
                .data_directory
                .ok_or_else(|| ConfigError::MissingConfig("data_directory".to_string()))?,
            cache_directory: value
                .cache_directory
                .ok_or_else(|| ConfigError::MissingConfig("cache_directory".to_string()))?,
        })
    }
}
/// Optional-field twin of [`StorageConfig`], usable both as clap CLI args
/// (desktop feature) and as a TOML-layerable `PartialConfig` fragment.
#[cfg_attr(feature="desktop", derive(Args))]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PartialStorageConfig {
#[cfg_attr(feature="desktop", arg(long))]
pub data_directory: Option<PathBuf>,
#[cfg_attr(feature="desktop", arg(long))]
pub cache_directory: Option<PathBuf>,
}
impl From<StorageConfig> for PartialStorageConfig {
fn from(config: StorageConfig) -> PartialStorageConfig {
Self {
data_directory: Some(config.data_directory),
cache_directory: Some(config.cache_directory),
}
}
}
impl PartialConfig for PartialStorageConfig {
// Fragment with both directories unset.
fn empty() -> Self {
Self{
data_directory: None,
cache_directory: None,
}
}
// True when neither directory is set.
fn is_empty(&self) -> bool {
self.data_directory.is_none() && self.cache_directory.is_none()
}
// NOTE(review): panics if ever called — there is no platform-independent
// default directory yet. Confirm whether `empty()` (or a platform dirs
// lookup) is the intended behavior before this ships.
fn default() -> Self {
todo!()
}
// Overlays `other`; fields set in `other` win.
fn merge(&mut self, other: Self) {
if let Some(x) = other.data_directory {
self.data_directory = Some(x);
}
if let Some(x) = other.cache_directory {
self.cache_directory = Some(x);
}
}
}

View file

@ -1,11 +1,11 @@
mod node;
mod trusted_peer;
mod record_deletion;
pub use node::{
ActiveModel as NodeActiveModel,
Column as NodeColumn,
Entity as NodeEntity,
Model as NodeModel,
pub use trusted_peer::{
ActiveModel as TrustedPeerActiveModel,
Column as TrustedPeerColumn,
Entity as TrustedPeerEntity,
Model as TrustedPeerModel,
};
pub use record_deletion::{
@ -14,8 +14,3 @@ pub use record_deletion::{
Entity as RecordDeletionEntity,
Model as RecordDeletionModel,
};
use uuid::{ContextV7, Timestamp, Uuid};
pub fn generate_uuid() -> Uuid {
Uuid::new_v7(Timestamp::now(ContextV7::new()))
}

View file

@ -26,7 +26,7 @@ impl ActiveModel {
pub fn new() -> Self {
let timestamp: DateTimeUtc = Local::now().to_utc();
Self{
id: Set(super::generate_uuid()),
id: Set(crate::global::generate_uuid()),
created_at: Set(timestamp),
..Default::default()
}
@ -35,18 +35,19 @@ impl ActiveModel {
#[cfg(test)]
mod tests {
use crate::global::get_or_init_test_data_database;
use super::*;
use uuid::{Timestamp, Uuid};
use crate::global::get_or_init_temporary_main_database;
#[tokio::test]
async fn check_insert_record_deletion() {
let db = get_or_init_temporary_main_database().await;
let db = get_or_init_test_data_database().await;
assert!(ActiveModel{
table_name: Set("test_table".to_string()),
record_id: Set(super::super::generate_uuid()),
record_id: Set(crate::global::generate_uuid()),
..ActiveModel::new()
}.insert(db).await.is_ok());
}

View file

@ -5,9 +5,11 @@ use sea_orm::entity::{
};
use serde::{Deserialize, Serialize};
use crate::data::value::PeerIdValue;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "node")]
#[sea_orm(table_name = "trusted_peer")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: Uuid,
@ -18,9 +20,10 @@ pub struct Model {
#[sea_orm(indexed)]
pub synced_at: Option<DateTimeUtc>,
#[sea_orm(indexed)]
pub peer_id: String,
pub peer_id: PeerIdValue,
#[sea_orm(column_type = "Text")]
pub note: String,
pub is_prefered: bool,
}
#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)]
@ -32,7 +35,7 @@ impl ActiveModel {
pub fn new() -> Self {
let timestamp: DateTimeUtc = Local::now().to_utc();
Self{
id: Set(super::generate_uuid()),
id: Set(crate::global::generate_uuid()),
created_at: Set(timestamp),
updated_at: Set(timestamp),
..Default::default()
@ -42,17 +45,18 @@ impl ActiveModel {
#[cfg(test)]
mod tests {
use crate::global::get_or_init_test_data_database;
use super::*;
use libp2p::identity;
use crate::global::GLOBAL;
use libp2p::{identity, PeerId};
#[tokio::test]
async fn check_insert_node() {
let db = crate::global::get_or_init_temporary_main_database().await;
let db = get_or_init_test_data_database().await;
ActiveModel{
peer_id: Set(identity::Keypair::generate_ed25519().public().to_peer_id().to_string()),
peer_id: Set(PeerIdValue::from(PeerId::random())),
note: Set("test note".to_owned()),
..ActiveModel::new()
}.insert(db).await.unwrap();

View file

@ -8,20 +8,20 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
Node::up(manager).await?;
TrustedPeer::up(manager).await?;
RecordDeletion::up(manager).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
Node::down(manager).await?;
TrustedPeer::down(manager).await?;
RecordDeletion::down(manager).await?;
Ok(())
}
}
#[derive(DeriveIden)]
enum Node {
enum TrustedPeer {
Table,
Id,
CreatedAt,
@ -29,10 +29,11 @@ enum Node {
SyncedAt,
PeerId,
Note,
IsPrefered,
}
#[async_trait::async_trait]
impl TableMigration for Node {
impl TableMigration for TrustedPeer {
async fn up<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> {
manager.create_table(
Table::create()
@ -44,6 +45,7 @@ impl TableMigration for Node {
.col(timestamp_null(Self::SyncedAt))
.col(string_len(Self::PeerId, 255))
.col(text(Self::Note))
.col(boolean(Self::IsPrefered))
.to_owned()
).await?;
Ok(())

View file

@ -3,11 +3,11 @@ use sea_orm_migration::prelude::*;
pub mod m20220101_000001_create_main_tables;
#[cfg(any(test, feature="test"))]
pub struct MainMigrator;
pub struct DataMigrator;
#[cfg(any(test, feature="test"))]
#[async_trait::async_trait]
impl MigratorTrait for MainMigrator {
impl MigratorTrait for DataMigrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20220101_000001_create_main_tables::Migration)]
}

View file

@ -1,2 +1,4 @@
pub mod entity;
pub mod migration;
pub mod migration;
pub mod syncable;
pub mod value;

View file

@ -0,0 +1,64 @@
use sea_orm::{*, prelude::*, query::*};
/// Read-side row that participates in peer-to-peer sync; exposes the fields
/// the sync machinery needs to identify and order rows.
pub trait SyncableModel: ModelTrait<Entity = Self::SyncableEntity> {
type SyncableEntity: SyncableEntity<SyncableModel = Self>;
// Last modification time, used for last-writer-wins ordering.
fn get_updated_at(&self) -> DateTimeUtc;
// Stable cross-device identity of the row.
fn get_uuid(&self) -> Uuid;
}
/// Entity whose rows can be synchronized between peers. Ties the
/// model/active-model/column associated types together so default sync
/// queries can be written once here.
pub trait SyncableEntity: EntityTrait<
Model = Self::SyncableModel,
ActiveModel = Self::SyncableActiveModel,
Column = Self::SyncableColumn,
>{
type SyncableModel: SyncableModel<SyncableEntity = Self> + FromQueryResult;
type SyncableActiveModel: SyncableActiveModel<SyncableEntity= Self>;
type SyncableColumn: SyncableColumn;
// Fetches rows whose `updated_at` is at or after `date`. Note `gte`: rows
// stamped exactly at `date` are included, despite the "after" in the name.
async fn get_updated_after(date: DateTimeUtc, db: &DatabaseConnection) -> Result<Vec<<Self as EntityTrait>::Model>, SyncableError> {
let result: Vec<Self::SyncableModel> = <Self as EntityTrait>::find()
.filter(Self::SyncableColumn::updated_at().gte(date))
.all(db)
// NOTE(review): a DB failure panics here; SyncableError has no variant
// that can carry a DbErr — consider adding one and propagating with `?`.
.await.unwrap();
Ok(result)
}
// Applies remotely-updated rows to the local database. Not implemented yet.
fn apply_updated(models: Vec<<Self as EntityTrait>::Model>, db: &DatabaseConnection) {
todo!()
}
}
/// Active model that can merge in a newer remote snapshot of the same row
/// (last-writer-wins by `updated_at`).
pub trait SyncableActiveModel: ActiveModelTrait<Entity = Self::SyncableEntity> {
type SyncableEntity: SyncableEntity<SyncableActiveModel = Self>;
// UUID of this row, if set.
fn get_uuid(&self) -> Option<Uuid>;
// Last-update timestamp of this row, if set.
fn get_updated_at(&self) -> Option<DateTimeUtc>;
// When `other` is the same row (matching UUID) and strictly newer, copy its
// column values into `self`. Errors if this model lacks uuid/updated_at or
// the UUIDs differ.
fn try_merge(&mut self, other: <Self::SyncableEntity as SyncableEntity>::SyncableModel) -> Result<(), SyncableError> {
if self.get_uuid().ok_or(SyncableError::MissingField("uuid"))? != other.get_uuid() {
return Err(SyncableError::MismatchUuid)
}
if self.get_updated_at().ok_or(SyncableError::MissingField("updated_at"))? < other.get_updated_at() {
// NOTE(review): `take(column)` appears to move the value OUT of self and
// return it; calling set_if_not_equals on that returned temporary may
// discard the update and leave the column NotSet. Verify against the
// sea-orm ActiveModelTrait docs that this actually writes back to self.
for column in <<<Self as ActiveModelTrait>::Entity as EntityTrait>::Column as Iterable>::iter() {
self.take(column).set_if_not_equals(other.get(column));
}
}
Ok(())
}
}
/// Column metadata needed by the sync machinery.
pub trait SyncableColumn: ColumnTrait {
// True if this column stores the row UUID.
fn is_uuid(&self) -> bool;
// True if this column stores the last-update timestamp.
fn is_updated_at(&self) -> bool;
// The `updated_at` column, used to filter for freshly-changed rows.
fn updated_at() -> Self;
// NOTE(review): returns `()`; the name suggests it should return `bool`
// (marking columns excluded from sync). Confirm the intended signature.
fn should_not_sync(&self);
}
/// Failures surfaced by the syncable traits.
#[derive(Debug, thiserror::Error)]
pub enum SyncableError {
// Tried to merge two rows with different UUIDs.
#[error("Invalid UUID")]
MismatchUuid,
// A field required for merging (uuid / updated_at) was unset.
#[error("mandatory field {0} is missing")]
MissingField(&'static str),
}

View file

@ -0,0 +1,5 @@
// Database value wrappers for libp2p types (persisted as TEXT columns).
mod multiaddr;
mod peer_id;
pub use multiaddr::MultiaddrValue;
pub use peer_id::PeerIdValue;

View file

@ -0,0 +1,68 @@
use std::str::FromStr;
use libp2p::Multiaddr;
use sea_orm::{sea_query::ValueTypeErr, DbErr};
use serde::{Deserialize, Serialize};
/// Newtype around `libp2p::Multiaddr` so it can be stored in a sea-orm TEXT
/// column (persisted via its string form).
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct MultiaddrValue(Multiaddr);
// Wrap a raw Multiaddr.
impl From<Multiaddr> for MultiaddrValue {
fn from(source: Multiaddr) -> Self {
Self(source)
}
}
// Unwrap back to the raw Multiaddr.
impl From<MultiaddrValue> for Multiaddr {
fn from(source: MultiaddrValue) -> Self {
source.0
}
}
// Stored in the database as its string form.
impl From<MultiaddrValue> for sea_orm::Value {
fn from(value: MultiaddrValue) -> Self {
Self::from(value.0.to_string())
}
}
impl sea_orm::TryGetable for MultiaddrValue {
    /// Reads the TEXT column at `idx` and parses it back into a `Multiaddr`;
    /// an unparsable value surfaces as a `DbErr::Type("Multiaddr")`.
    fn try_get_by<I: sea_orm::ColIdx>(res: &sea_orm::QueryResult, idx: I)
        -> std::result::Result<Self, sea_orm::TryGetError> {
        let raw = <String as sea_orm::TryGetable>::try_get_by(res, idx)?;
        Multiaddr::from_str(&raw)
            .map(Self)
            .map_err(|_| DbErr::Type("Multiaddr".to_string()).into())
    }
}
impl sea_orm::sea_query::ValueType for MultiaddrValue {
    /// Converts a query `Value` (expected to be a string) into a wrapped
    /// `Multiaddr`; parse failures map to `ValueTypeErr`.
    fn try_from(v: sea_orm::Value) -> std::result::Result<Self, sea_orm::sea_query::ValueTypeErr> {
        let raw = <String as sea_orm::sea_query::ValueType>::try_from(v)?;
        Multiaddr::from_str(&raw).map(Self).map_err(|_| ValueTypeErr {})
    }

    /// Type name reported to sea-query diagnostics.
    fn type_name() -> std::string::String {
        stringify!(MultiaddrValue).to_owned()
    }

    /// Arrays of this type are arrays of strings.
    fn array_type() -> sea_orm::sea_query::ArrayType {
        sea_orm::sea_query::ArrayType::String
    }

    /// Backed by a TEXT column.
    fn column_type() -> sea_orm::sea_query::ColumnType {
        sea_orm::sea_query::ColumnType::Text
    }
}
// NULL is represented exactly as a NULL string column value.
impl sea_orm::sea_query::Nullable for MultiaddrValue {
fn null() -> sea_orm::Value {
<String as sea_orm::sea_query::Nullable>::null()
}
}

View file

@ -0,0 +1,120 @@
use std::str::FromStr;
use libp2p::PeerId;
use sea_orm::{sea_query::ValueTypeErr, DbErr};
use serde::{Deserialize, Serialize};
use crate::error::Error;
/// Newtype around `libp2p::PeerId` so it can be stored in a sea-orm TEXT
/// column and (de)serialized as its string form.
#[derive(Clone, Debug, PartialEq)]
pub struct PeerIdValue(PeerId);
impl<'de> Deserialize<'de> for PeerIdValue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de> {
Self::from_str(&String::deserialize(deserializer)?).or(Err(<D::Error as serde::de::Error>::custom("fail to parse PeerId")))
}
}
impl Serialize for PeerIdValue {
    /// Serializes as the peer id's string representation.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer {
        let text = self.0.to_string();
        serializer.serialize_str(&text)
    }
}
impl FromStr for PeerIdValue{
    type Err = libp2p::identity::ParseError;

    /// Parses the canonical string form of a `PeerId`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        PeerId::from_str(s).map(Self)
    }
}
/// Formats as the underlying `PeerId`'s string form.
///
/// Implementing `Display` instead of a manual `ToString` follows the
/// standard-library contract: the blanket `impl<T: Display> ToString for T`
/// keeps `to_string()` available for every existing caller, and the value
/// additionally becomes usable in `format!`/`write!`.
impl std::fmt::Display for PeerIdValue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
// Wrap a raw PeerId.
impl From<PeerId> for PeerIdValue {
fn from(source: PeerId) -> Self {
Self(source)
}
}
// Unwrap back to the raw PeerId.
impl From<PeerIdValue> for PeerId {
fn from(source: PeerIdValue) -> Self {
source.0
}
}
// Stored in the database as its string form.
impl From<PeerIdValue> for sea_orm::Value {
fn from(value: PeerIdValue) -> Self {
Self::from(value.0.to_string())
}
}
impl sea_orm::TryGetable for PeerIdValue {
    /// Reads the TEXT column at `idx` and parses it back into a `PeerId`;
    /// an unparsable value surfaces as a `DbErr::Type("PeerId")`.
    fn try_get_by<I: sea_orm::ColIdx>(res: &sea_orm::QueryResult, idx: I)
        -> std::result::Result<Self, sea_orm::TryGetError> {
        let raw = <String as sea_orm::TryGetable>::try_get_by(res, idx)?;
        PeerId::from_str(&raw)
            .map(Self)
            .map_err(|_| DbErr::Type("PeerId".to_string()).into())
    }
}
impl sea_orm::sea_query::ValueType for PeerIdValue {
    /// Converts a query `Value` (expected to be a string) into a wrapped
    /// `PeerId`; parse failures map to `ValueTypeErr`.
    fn try_from(v: sea_orm::Value) -> std::result::Result<Self, sea_orm::sea_query::ValueTypeErr> {
        let raw = <String as sea_orm::sea_query::ValueType>::try_from(v)?;
        PeerId::from_str(&raw).map(Self).map_err(|_| ValueTypeErr {})
    }

    /// Type name reported to sea-query diagnostics.
    fn type_name() -> std::string::String {
        stringify!(PeerIdValue).to_owned()
    }

    /// Arrays of this type are arrays of strings.
    fn array_type() -> sea_orm::sea_query::ArrayType {
        sea_orm::sea_query::ArrayType::String
    }

    /// Backed by a TEXT column.
    fn column_type() -> sea_orm::sea_query::ColumnType {
        sea_orm::sea_query::ColumnType::Text
    }
}
// NULL is represented exactly as a NULL string column value.
impl sea_orm::sea_query::Nullable for PeerIdValue {
fn null() -> sea_orm::Value {
<String as sea_orm::sea_query::Nullable>::null()
}
}
#[cfg(test)]
mod tests {
// NOTE(review): both helpers are imported but neither is called below —
// either exercise them or drop the import.
use crate::tests::{test_cbor_serialize_deserialize, test_toml_serialize_deserialize};
use super::*;
// TOML cannot serialize a bare scalar at top level, so the value is wrapped
// in a one-field struct for the round trip.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
struct PeerIdValueWrapper {
content: PeerIdValue
}
// Round-trips a random PeerId through TOML and back.
#[test]
fn test_serialize_deserialize() {
let peer_id= PeerIdValueWrapper{content: PeerIdValue::from(PeerId::random())};
let x = toml::to_string(&peer_id).unwrap();
assert_eq!(peer_id.content, toml::from_str::<PeerIdValueWrapper>(&x).unwrap().content)
}
}

View file

@ -2,6 +2,10 @@
pub enum Error {
#[error("Base64 decode error: {0}")]
Base64Decode(#[from] base64::DecodeError),
#[error(transparent)]
CiborDeserialize(#[from] ciborium::de::Error<std::io::Error>),
#[error(transparent)]
CiborSerialize(#[from] ciborium::ser::Error<std::io::Error>),
#[error("DB Error: {0}")]
Db(#[from]sea_orm::DbErr),
#[error("Dial Error: {0}")]
@ -18,10 +22,13 @@ pub enum Error {
Multiaddr(#[from] libp2p::multiaddr::Error),
#[error("Noise error: {0}")]
Noise(#[from] libp2p::noise::Error),
#[cfg(feature="desktop")]
#[error("Parse args error: {0}")]
ParseCommand(#[from] clap::Error),
#[error("toml deserialization error: {0}")]
TomlDe(#[from] toml::de::Error),
#[error("toml serialization error: {0}")]
TomlSer(#[from] toml::ser::Error),
TomlSer(#[from] toml::ser::Error),
#[error("Transport error: {0}")]
Transport(#[from]libp2p::TransportError<std::io::Error>)
}

View file

@ -0,0 +1,15 @@
use crate::{config::{P2pConfig, StorageConfig}, error::Error, global::GlobalConstant};
// Process-wide, set-once configuration cells. stringify! captures the
// variable's own name so diagnostics can say which cell was uninitialized.
pub static STORAGE_CONFIG: GlobalConstant<StorageConfig> = GlobalConstant::const_new(stringify!(STORAGE_CONFIG));
pub static P2P_CONFIG: GlobalConstant<P2pConfig> = GlobalConstant::const_new(stringify!(P2P_CONFIG));
#[cfg(test)]
mod tests {
use crate::global::{config::P2P_CONFIG, STORAGE_CONFIG};
// Guards the name-capture convention: each cell's recorded name must match
// its identifier.
#[test]
fn test_global_constant_names() {
assert_eq!(STORAGE_CONFIG.name, stringify!(STORAGE_CONFIG));
assert_eq!(P2P_CONFIG.name, stringify!(P2P_CONFIG));
}
}

View file

@ -1,64 +0,0 @@
use std::path::Path;
use sea_orm::{ConnectOptions, Database, DbErr, DatabaseConnection};
use sea_orm_migration::MigratorTrait;
use crate::error::Error;
use tokio::sync::OnceCell;
use super::Global;
#[cfg(any(test, feature="test"))]
pub static TEST_MAIN_DATABASE_URL: std::sync::LazyLock<tempfile::TempPath> = std::sync::LazyLock::new(|| {
let mut temp_path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
temp_path.disable_cleanup(true);
println!("{}", temp_path.as_os_str().to_str().unwrap());
temp_path
});
#[cfg(any(test, feature="test"))]
pub static TEST_CACHE_DATABASE_URL: std::sync::LazyLock<tempfile::TempPath> = std::sync::LazyLock::new(|| {
let mut temp_path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
temp_path.disable_cleanup(true);
println!("{}", temp_path.as_os_str().to_str().unwrap());
temp_path
});
/// Accessors for process-global main and cache database connections.
pub trait GlobalDatabase {
    /// Returns the main connection if it has been initialized.
    fn get_main_database(&self) -> Option<&DatabaseConnection>;
    /// Connects to the main database at `path` and applies migrator `U`'s
    /// pending migrations. The migrator argument only selects `U`; its value
    /// is ignored.
    async fn get_or_try_init_main_database<T, U>(&self, path: T, _: U) -> Result<&DatabaseConnection, Error>
    where
        T: AsRef<Path>,
        U: MigratorTrait
    ;
    /// Returns the main connection.
    ///
    /// # Panics
    /// Panics (via `unreachable!`) if the main database is uninitialized.
    fn get_unwrapped_main_database(&self) -> &DatabaseConnection {
        match self.get_main_database() {
            Some(x) => x,
            None => unreachable!("Error: global main database is not initialized!")
        }
    }
    /// Returns the cache connection if it has been initialized.
    fn get_cache_database(&self) -> Option<&DatabaseConnection>;
    /// Connects to the cache database at `path` and applies migrator `U`'s
    /// pending migrations.
    async fn get_or_try_init_cache_database<T, U>(&self, path: T, _: U) -> Result<&DatabaseConnection, Error>
    where
        T: AsRef<Path>,
        U: MigratorTrait
    ;
    /// Returns the cache connection.
    ///
    /// # Panics
    /// Panics (via `unreachable!`) if the cache database is uninitialized.
    fn get_unwrapped_cache_database(&self) -> &DatabaseConnection {
        match self.get_cache_database() {
            Some(x) => x,
            // Fixed copy-paste: this message previously said "main database".
            None => unreachable!("Error: global cache database is not initialized!")
        }
    }
    /// Test helper: main database backed by a temporary file.
    #[cfg(any(test, feature="test"))]
    async fn get_or_try_init_temporary_main_database<T>(&self, migrator: T) -> Result<&DatabaseConnection, Error>
    where
        T: MigratorTrait,
    {
        self.get_or_try_init_main_database(&*TEST_MAIN_DATABASE_URL, migrator).await
    }
    /// Test helper: cache database backed by a temporary file.
    #[cfg(any(test, feature="test"))]
    async fn get_or_try_init_temporary_cache_database<T>(&self, migrator: T) -> Result<&DatabaseConnection, Error>
    where
        T: MigratorTrait,
    {
        self.get_or_try_init_cache_database(&*TEST_CACHE_DATABASE_URL, migrator).await
    }
}

View file

@ -0,0 +1,55 @@
use std::path::Path;
use sea_orm::{ConnectOptions, Database, DbErr, DatabaseConnection};
use sea_orm_migration::MigratorTrait;
use crate::error::Error;
use tokio::sync::OnceCell;
pub static DATA_DATABASE_CONNECTION: GlobalDatabaseConnection = GlobalDatabaseConnection::const_new(stringify!(DATA_DATABASE_CONNECTION));
pub static CACHE_DATABASE_CONNECTION: GlobalDatabaseConnection = GlobalDatabaseConnection::const_new(stringify!(CACHE_DATABASE_CONNECTION));
/// A lazily-initialized, process-wide SeaORM database connection.
///
/// The connection is created (and migrations applied) on the first call to
/// [`GlobalDatabaseConnection::get_or_init`]; later calls return the cached
/// connection. `name` identifies the static in panic messages.
pub struct GlobalDatabaseConnection {
    name: &'static str,
    inner: OnceCell<DatabaseConnection>
}
impl GlobalDatabaseConnection {
    /// Creates an empty cell; usable in `static` initializers.
    pub const fn const_new(name: &'static str) -> Self {
        Self {
            name,
            inner: OnceCell::const_new()
        }
    }
    /// Returns the connection.
    ///
    /// # Panics
    /// Panics if `get_or_init` has not completed yet.
    pub fn get(&'static self) -> &'static DatabaseConnection {
        // `unwrap_or_else` defers building the message to the failure path
        // (the previous `expect(&format!(..))` allocated on every call).
        self.inner.get().unwrap_or_else(|| panic!("{} is uninitialized!", self.name))
    }
    /// Connects to the SQLite database at `path` (creating the file if it is
    /// missing), applies migrator `U`'s pending migrations, and caches the
    /// connection. The migrator argument only selects `U`; its value is
    /// ignored.
    ///
    /// # Panics
    /// Panics if connecting or migrating fails, or if `path` is not valid
    /// UTF-8.
    pub async fn get_or_init<T, U>(&'static self, path: T, _: U) -> &'static DatabaseConnection
    where
        T: AsRef<Path>,
        U: MigratorTrait
    {
        // `mode=rwc` asks SQLite to create the database file when absent.
        let url = "sqlite://".to_string() + path.as_ref().to_str().unwrap() + "?mode=rwc";
        self.inner.get_or_try_init(|| async {
            let db = Database::connect(&url).await?;
            U::up(&db, None).await?;
            Ok::<DatabaseConnection, DbErr>(db)
        }).await.unwrap_or_else(|e| panic!("Fail to initialize {}: {}!", self.name, e))
    }
}
#[cfg(test)]
pub use tests::*;
// Test helpers that open the data/cache databases at the test-default
// storage paths.
#[cfg(test)]
mod tests {
use super::*;
use crate::{cache::migration::CacheMigrator, data::migration::DataMigrator, global::STORAGE_CONFIG, tests::GlobalTestDefault};
// Initializes (or returns) the global data database for tests.
pub async fn get_or_init_test_data_database() -> &'static DatabaseConnection{
DATA_DATABASE_CONNECTION.get_or_init(STORAGE_CONFIG.get_or_init_test_default().await.get_data_database_path(), DataMigrator).await
}
// Initializes (or returns) the global cache database for tests.
pub async fn get_or_init_test_cache_database() -> &'static DatabaseConnection{
CACHE_DATABASE_CONNECTION.get_or_init(STORAGE_CONFIG.get_or_init_test_default().await.get_cache_database_path(), CacheMigrator).await
}
}

View file

@ -1,14 +1,25 @@
use std::{collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::LazyLock};
use std::{any::type_name, collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::LazyLock};
use crate::{config::{NodeConfig, RawNodeConfig}, error::Error};
use crate::{config::{P2pConfig, PartialP2pConfig, StorageConfig}, error::Error };
#[cfg(any(test, feature="test"))]
use crate::tests::{GlobalTestDefault, TestDefault};
use futures::StreamExt;
use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId};
use sea_orm::{prelude::*, Database};
use sea_orm_migration::MigratorTrait;
use tokio::sync::{OnceCell, RwLock};
use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard};
mod database;
use database::GlobalDatabase;
mod peers;
pub use peers::PEERS;
mod config;
pub use config::STORAGE_CONFIG;
mod database_connection;
pub use database_connection::*;
use uuid::{ContextV7, Timestamp, Uuid};
pub fn generate_uuid() -> Uuid {
Uuid::new_v7(Timestamp::now(ContextV7::new()))
}
pub static PRODUCT_NAME: LazyLock<String> = LazyLock::new(|| {
env!("CARGO_PKG_NAME").to_string()
@ -24,124 +35,69 @@ pub static DEFAULT_CONFIG_FILE_NAME: LazyLock<PathBuf> = LazyLock::new(|| {
pub static DEFAULT_DATABASE_FILE_NAME: LazyLock<PathBuf> = LazyLock::new(|| {
PathBuf::from(String::new() + env!("CARGO_PKG_NAME") + ".sqlite")
});
pub static GLOBAL: Global = Global{
node_config: OnceCell::const_new(),
main_database: OnceCell::const_new(),
cache_database: OnceCell::const_new(),
peers: OnceCell::const_new(),
};
pub struct Global {
pub node_config: OnceCell<NodeConfig>,
pub main_database: OnceCell<DatabaseConnection>,
pub cache_database: OnceCell<DatabaseConnection>,
pub peers: OnceCell<RwLock<HashMap<PeerId, Multiaddr>>>,
/// Builds the panic message used when a global value is accessed before
/// initialization, naming the value by its concrete type.
///
/// Fixed: the previous body used `stringify!(var)`, which always produced the
/// literal string "var"; `type_name::<T>()` reports the actual type instead.
fn uninitialized_message<T>(_var: T) -> String {
    format!("{} is uninitialized!", std::any::type_name::<T>())
}
impl Global {
pub fn get_node_config(&self) -> Option<&NodeConfig> {
self.node_config.get()
}
pub async fn get_or_init_node_config(&self, config: NodeConfig) -> &NodeConfig {
self.node_config.get_or_init(|| async {config}).await
}
pub async fn get_or_init_peers(&self) -> &RwLock<HashMap<PeerId, Multiaddr>> {
self.peers.get_or_init(|| async {
RwLock::new(HashMap::new())
}).await
}
pub async fn read_peers(&self) -> tokio::sync::RwLockReadGuard<'_, HashMap<PeerId, Multiaddr>>{
self.get_or_init_peers().await.read().await
}
pub async fn write_peers(&self) -> tokio::sync::RwLockWriteGuard<'_, HashMap<PeerId, Multiaddr>>{
self.get_or_init_peers().await.write().await
}
pub async fn launch_swarm(&self) -> Result<(), Error> {
let mut swarm = self.get_node_config().unwrap().clone().try_into_swarm().await?;
loop{
let swarm_event = swarm.select_next_some().await;
tokio::spawn(async move{
match swarm_event {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
SwarmEvent::Behaviour(event) => {
println!("{event:?}");
event.run().await;
},
_ => {}
}
});
/// A write-once global value with a human-readable name for diagnostics.
///
/// Wraps a `tokio::sync::OnceCell` so it can live in a `static` and be
/// initialized exactly once from async code.
pub struct GlobalConstant<T> {
    /// Identifier used in panic messages (normally `stringify!` of the static).
    pub name: &'static str,
    inner: OnceCell<T>
}
impl<T> GlobalConstant<T> {
    /// Creates an empty cell; usable in `static` initializers.
    pub const fn const_new(name: &'static str) -> Self {
        Self {
            name,
            inner: OnceCell::const_new()
        }
    }
    /// Stores `source` on the first call; later calls return the stored
    /// value and drop `source`.
    pub async fn get_or_init(&'static self, source: T) -> &'static T {
        self.inner.get_or_init(|| async {
            source
        }).await
    }
    /// Returns the value if initialized.
    pub fn get(&'static self) -> Option<&'static T> {
        self.inner.get()
    }
    /// Returns the value.
    ///
    /// # Panics
    /// Panics if the cell is uninitialized. Fixed: the message previously
    /// printed the literal "self" (via `stringify!(self)`) instead of `name`.
    pub fn get_and_unwrap(&'static self) -> &'static T {
        self.get().unwrap_or_else(|| panic!("{} is uninitialized!", self.name))
    }
}
impl GlobalDatabase for Global {
fn get_main_database(&self) -> Option<&DatabaseConnection> {
self.main_database.get()
// Test-only: lets a `GlobalConstant` be initialized with its type's
// `TestDefault` value on first access.
#[cfg(any(test, feature="test"))]
impl<T> GlobalTestDefault<T> for GlobalConstant<T>
where
T: TestDefault + 'static
{
// Initializes the cell (if needed) with `T::test_default()` and returns it.
async fn get_or_init_test_default(&'static self) -> &'static T {
self.get_or_init(T::test_default()).await
}
}
async fn get_or_try_init_main_database<T, U>(&self, path: T, _: U) -> Result<&DatabaseConnection, Error>
where
T: AsRef<Path>,
U: MigratorTrait,
{
let url = "sqlite://".to_string() + path.as_ref().to_str().unwrap() + "?mode=rwc";
Ok(self.main_database.get_or_try_init(|| async {
let db = Database::connect(&url).await?;
U::up(&db, None).await?;
Ok::<DatabaseConnection, DbErr>(db)
}).await?)
}
fn get_cache_database(&self) -> Option<&DatabaseConnection> {
self.cache_database.get()
}
async fn get_or_try_init_cache_database<T, U>(&self, path: T, _: U) -> Result<&DatabaseConnection, Error>
where
T: AsRef<Path>,
U: MigratorTrait,
{
let url = "sqlite://".to_string() + path.as_ref().to_str().unwrap() + "?mode=rwc";
Ok(self.cache_database.get_or_try_init(|| async {
let db = Database::connect(&url).await?;
U::up(&db, None).await?;
Ok::<DatabaseConnection, DbErr>(db)
}).await?)
}
}
struct GlobalRwLock<T> {
pub name: &'static str,
inner: OnceCell<RwLock<T>>
}
impl<T> GlobalRwLock<T> {
pub const fn const_new(name: &'static str) -> Self {
Self{
name: name,
inner: OnceCell::const_new()
}
}
pub fn get(&'static self) -> &'static RwLock<T> {
self.inner.get().expect(&format!("{} is uninitialized", self.name))
}
pub async fn write(&'static self) -> RwLockWriteGuard<'_ ,T> {
self.get().write().await
}
pub async fn read(&'static self) -> RwLockReadGuard<'_, T> {
self.get().read().await
}
}
#[cfg(test)]
pub use tests::{get_or_init_temporary_main_database, get_or_init_temporary_cache_database};
#[cfg(test)]
pub mod tests {
use std::sync::LazyLock;
mod tests {
use sea_orm_migration::MigratorTrait;
use crate::{global::GLOBAL, cache::migration::CacheMigrator, data::migration::MainMigrator};
use super::*;
pub async fn get_or_init_temporary_main_database() -> &'static DatabaseConnection {
GLOBAL.get_or_try_init_temporary_main_database(MainMigrator).await.unwrap()
}
pub async fn get_or_init_temporary_cache_database() -> &'static DatabaseConnection {
GLOBAL.get_or_try_init_temporary_cache_database(CacheMigrator).await.unwrap()
}
#[tokio::test]
async fn connect_main_database () {
let db = get_or_init_temporary_main_database().await;
assert!(db.ping().await.is_ok());
}
#[tokio::test]
async fn connect_cache_database () {
let db = get_or_init_temporary_cache_database().await;
assert!(db.ping().await.is_ok());
}
}
}

View file

@ -0,0 +1,11 @@
use std::collections::HashSet;
use libp2p::bytes::buf::UninitSlice;
use tokio::sync::{OnceCell, RwLock, RwLockReadGuard};
use crate::cache::entity::PeerModel;
use super::GlobalRwLock;
// Global registry of peers discovered at runtime.
// NOTE(review): several imports above (UninitSlice, OnceCell, RwLockReadGuard)
// appear unused in this file — confirm before removing.
pub static PEERS: GlobalRwLock<HashSet<PeerModel>> = GlobalRwLock::const_new(stringify!(PEERS));

View file

@ -0,0 +1,4 @@
/// Byte-level (de)serialization contract for messages.
pub trait Message {
/// Consumes the message and encodes it as bytes.
fn into_vec_u8(self) -> Vec<u8>;
/// Decodes a message.
/// NOTE(review): this signature takes no input, so there is nothing to
/// decode from — it likely needs a `Vec<u8>`/`&[u8]` parameter. Confirm
/// intent before implementing.
fn from_vec_u8() -> Self;
}

View file

@ -3,6 +3,8 @@ pub mod config;
pub mod data;
pub mod error;
pub mod global;
pub mod ipc;
pub mod message;
pub mod migration;
pub mod p2p;
#[cfg(any(test, feature="test"))]

View file

@ -0,0 +1,15 @@
use serde::{de::DeserializeOwned, Serialize};
/// CBOR (de)serialization for message types, via `ciborium`.
///
/// Blanket default methods: any `Serialize + DeserializeOwned` type gets
/// encoding/decoding for free by implementing this marker trait.
pub trait Message: DeserializeOwned + Sized + Serialize {
/// Encodes `self` as CBOR into `writer`.
/// NOTE(review): takes `&self`, so by Rust naming convention this would be
/// `to_writer`; renaming would break existing callers.
fn into_writer<W: std::io::Write>(&self, writer: W) -> Result<(), ciborium::ser::Error<std::io::Error>> {
ciborium::into_writer(self, writer)
}
/// Encodes `self` as CBOR into a freshly allocated byte vector.
fn into_vec_u8(&self) -> Result<Vec<u8>, ciborium::ser::Error<std::io::Error>> {
let mut buf: Vec<u8> = Vec::new();
self.into_writer(&mut buf)?;
Ok(buf)
}
/// Decodes one CBOR-encoded message from `reader`.
fn from_reader<R: std::io::Read>(reader: R) -> Result<Self, ciborium::de::Error<std::io::Error>> {
ciborium::from_reader(reader)
}
}

View file

@ -1,6 +1,7 @@
use libp2p::{ identity::Keypair, mdns, ping, swarm};
use sea_orm::{ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter};
use crate::error::Error;
use crate::{cache::entity::{ActivePeerModel, PeerColumn, PeerEntity}, error::Error, global::{CACHE_DATABASE_CONNECTION, PEERS}};
#[derive(swarm::NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
@ -29,17 +30,21 @@ pub enum Event {
}
impl Event {
pub async fn run(self) {
pub async fn run(&self)
{
match self {
Self::Mdns(x) => {
match x {
mdns::Event::Discovered(e) => {
for peer in e {
let mut peers = crate::global::GLOBAL.write_peers().await;
peers.insert(peer.0, peer.1);
for peer in e.iter() {
match PeerEntity::find().filter(PeerColumn::PeerId.contains(&peer.0.to_string())).one(CACHE_DATABASE_CONNECTION.get()).await {
Ok(_) => {}
Err(_) => {
ActivePeerModel::new(peer.0.clone(), peer.1.clone())
.insert(CACHE_DATABASE_CONNECTION.get()).await;
}
}
}
let peers = crate::global::GLOBAL.read_peers().await;
println!("Peers: {peers:?}");
},
_ => {},
}

View file

@ -1,6 +1,12 @@
use std::{path::PathBuf, sync::LazyLock};
use sea_orm::{sea_query::{FromValueTuple, IntoValueTuple, ValueType}, ActiveModelBehavior, ActiveModelTrait, ColumnTrait, Condition, DatabaseConnection, EntityTrait, IntoActiveModel, ModelTrait, PrimaryKeyToColumn, PrimaryKeyTrait, Value};
use sea_orm::QueryFilter;
use tempfile::TempDir;
use crate::{ config::PartialConfig, message::Message};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
pub static TEST_DIR_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
let pkg_name = env!("CARGO_PKG_NAME");
@ -15,3 +21,28 @@ pub static TEST_DIR: LazyLock<PathBuf> = LazyLock::new(|| {
pub static TEST_DATABASE_PATH: std::sync::LazyLock<PathBuf> = std::sync::LazyLock::new(|| {
TEST_DIR_PATH.join("lazy-supplements.sqlite")
});
/// Like `Default`, but for values used only in tests (may pick temp paths,
/// fixed keys, etc.).
pub trait TestDefault {
fn test_default() -> Self;
}
/// Initializes a global container with its value type's `TestDefault`.
pub trait GlobalTestDefault<T: 'static> {
async fn get_or_init_test_default(&'static self) -> &'static T;
}
/// Round-trips `src` through CBOR and asserts the decoded value equals the
/// original.
pub fn test_cbor_serialize_deserialize<T>(src: T)
where T: DeserializeOwned + Serialize + PartialEq + std::fmt::Debug
{
    let mut encoded = Vec::<u8>::new();
    ciborium::into_writer(&src, &mut encoded).unwrap();
    let decoded: T = ciborium::from_reader(encoded.as_slice()).unwrap();
    assert_eq!(src, decoded);
}
/// Round-trips `src` through TOML and asserts the decoded value equals the
/// original.
pub fn test_toml_serialize_deserialize<T>(src: T)
where T: DeserializeOwned + Serialize + PartialEq + std::fmt::Debug
{
    let encoded = toml::to_string(&src).unwrap();
    let decoded: T = toml::from_str(&encoded).unwrap();
    assert_eq!(src, decoded);
}

View file

@ -11,13 +11,15 @@ default = []
test = ["lazy-supplements-core/test"]
[dependencies]
clap = { version = "4.5.38", features = ["derive"] }
ciborium.workspace = true
clap.workspace = true
dirs = "6.0.0"
lazy-supplements-core.workspace = true
lazy-supplements-core = { workspace = true, features = ["desktop"] }
libp2p.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
lazy-supplements-core = {workspace = true, features = ["test"]}
lazy-supplements-core = {workspace = true, features = ["test"]}

View file

@ -1,19 +1,22 @@
use std::{net::IpAddr, path::PathBuf};
use clap::Args;
use lazy_supplements_core::config::RawNodeConfig;
use lazy_supplements_core::config::{PartialConfig, PartialCoreConfig};
use serde::{Deserialize, Serialize};
use crate::{config::NodeConfig, error::Error, global::{DEFAULT_CONFIG_FILE_PATH, DEFAULT_RAW_NODE_CONFIG}};
use crate::{config::{desktop::PartialDesktopConfig, CoreConfig}, error::Error, global::{DEFAULT_CONFIG_FILE_PATH, DEFAULT_PARTIAL_CORE_CONFIG,}};
/// CLI arguments controlling configuration: an optional config-file path plus
/// flattened core and desktop config overrides.
#[derive(Args, Clone, Debug)]
pub struct ConfigArgs {
/// Path to the TOML config file; falls back to the default location.
#[arg(long)]
pub config: Option<PathBuf>,
/// Core (shared) configuration overrides, flattened into this command.
#[command(flatten)]
pub core_config: PartialCoreConfig,
/// Desktop-specific configuration overrides, flattened into this command.
#[command(flatten)]
pub desktop_config: PartialDesktopConfig,
}
impl ConfigArgs {
pub fn get_config_path_or_default(&self) -> PathBuf {
if let Some(x) = self.config.as_ref() {
@ -22,33 +25,14 @@ impl ConfigArgs {
DEFAULT_CONFIG_FILE_PATH.to_path_buf()
}
}
pub async fn try_into_raw_node_config(self) -> Result<RawNodeConfig, Error> {
Ok(RawNodeConfig::read_from(self.get_config_path_or_default()).await? + self.config_values.into())
pub async fn try_into_partial_core_config(self) -> Result<PartialCoreConfig, Error> {
let mut config = PartialCoreConfig::read_from(self.get_config_path_or_default()).await?;
config.merge(self.core_config.into());
Ok(config)
}
pub async fn try_into_node_config(self) -> Result<NodeConfig, Error> {
Ok((DEFAULT_RAW_NODE_CONFIG.clone() + self.try_into_raw_node_config().await?).try_into()?)
pub async fn try_into_core_config(self) -> Result<CoreConfig, Error> {
let mut config = DEFAULT_PARTIAL_CORE_CONFIG.clone();
config.merge(self.try_into_partial_core_config().await?);
config.try_into()
}
}
/// Individual config values accepted on the command line.
#[derive(Args, Clone, Debug, Deserialize, Serialize)]
pub struct ConfigValueArgs {
// Not exposed as a CLI flag (secrets shouldn't appear in argv/history);
// only read from the config file.
#[arg(skip)]
pub secret: Option<String>,
#[arg(long)]
pub database_path: Option<PathBuf>,
#[arg(long)]
pub listen_ips: Option<Vec<IpAddr>>,
#[arg(long)]
pub port: Option<u16>,
}
// Prefer implementing `From` over `Into`: the standard blanket impl then
// provides `Into<RawNodeConfig> for ConfigValueArgs` for free, so existing
// `.into()` call sites keep working unchanged.
impl From<ConfigValueArgs> for RawNodeConfig {
    /// Field-for-field conversion; all values stay optional.
    fn from(args: ConfigValueArgs) -> Self {
        RawNodeConfig {
            secret: args.secret,
            database_path: args.database_path,
            listen_ips: args.listen_ips,
            port: args.port
        }
    }
}

View file

@ -5,7 +5,7 @@ use libp2p::{
multiaddr::Protocol, noise, ping, swarm::SwarmEvent, tcp, yamux, Multiaddr, PeerId
};
use crate::{cli::ServerArgs, error::{CoreError, DesktopError, Error}};
use crate::{cli::ServerArgs, error::Error};
use super::ConfigArgs;

View file

@ -1,7 +1,7 @@
use clap::Args;
use libp2p::{noise, ping, swarm::{NetworkBehaviour, SwarmEvent}, tcp, yamux, Swarm};
use crate::{error::Error, global::GLOBAL, error::CoreError};
use crate::{error::Error, global::GLOBAL};
use super::ConfigArgs;
@ -12,7 +12,7 @@ pub struct ServerArgs {
}
impl ServerArgs {
pub async fn start_server(self) -> Result<(), Error>{
let _ = crate::global::GLOBAL.get_or_init_node_config(self.config.try_into_node_config().await?).await;
GLOBAL.launch_swarm().await.or_else(|e| {Err(Error::from(CoreError::from(e)))})
let _ = crate::global::GLOBAL.get_or_init_core_config(self.config.try_into_core_config().await?).await;
GLOBAL.launch_swarm().await
}
}

View file

@ -0,0 +1,15 @@
#[cfg(unix)]
pub mod unix;
#[cfg(windows)]
pub mod windows;
pub mod desktop;
pub use lazy_supplements_core::config::*;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
pub use windows::*;

View file

@ -0,0 +1,48 @@
use std::path::PathBuf;
use clap::Args;
use lazy_supplements_core::config::PartialConfig;
use libp2p::mdns::Config;
use serde::{Deserialize, Serialize};
use crate::config::error::ConfigError;
/// Unix-specific IPC settings (the validated form of [`PartialUnixConfig`]).
pub struct UnixConfig {
    /// Filesystem path of the Unix domain socket used for IPC.
    pub socket_path: PathBuf,
}
impl TryFrom<PartialUnixConfig> for UnixConfig {
    type Error = ConfigError;
    /// Fails with `ConfigError::MissingConfig` when `socket_path` is unset.
    fn try_from(config: PartialUnixConfig) -> Result<Self, Self::Error> {
        Ok(Self{
            // `ok_or_else` defers the error-string allocation to the missing
            // case (the previous `ok_or` allocated even on success).
            socket_path: config.socket_path.ok_or_else(|| ConfigError::MissingConfig("socket_path".to_string()))?
        })
    }
}
/// Optional Unix IPC settings, mergeable from file and CLI sources.
/// NOTE(review): `socket_path` has no `#[arg(...)]` attribute, so clap will
/// treat it per its derive defaults — confirm whether `#[arg(long)]` was
/// intended.
#[derive(Args, Clone, Debug, Deserialize, Serialize)]
pub struct PartialUnixConfig {
pub socket_path: Option<PathBuf>,
}
// A complete config trivially converts back to its partial form.
impl From<UnixConfig> for PartialUnixConfig {
fn from(source: UnixConfig) -> Self {
Self {
socket_path: Some(source.socket_path)
}
}
}
impl PartialConfig for PartialUnixConfig {
// A partial config with every field unset.
fn empty() -> Self {
Self { socket_path: None }
}
// NOTE(review): unimplemented — calling this panics via `todo!()`. A real
// default socket path still needs to be chosen.
fn default() -> Self {
todo!()
}
// Fields set in `other` override the corresponding fields in `self`.
fn merge(&mut self, other: Self) {
if let Some(x) = other.socket_path {
self.socket_path = Some(x);
};
}
}

View file

@ -0,0 +1,3 @@
/// Windows-specific IPC settings.
pub struct WindowsConfig {
// Name of the named pipe used for IPC (Windows counterpart of the Unix
// socket path).
pub pipe_name: String
}

View file

@ -1,16 +0,0 @@
pub use lazy_supplements_core::error::Error as CoreError;
/// Failures specific to the desktop crate.
#[derive(thiserror::Error, Debug)]
pub enum DesktopError {
#[error("Parse args error: {0}")]
ParseCommand(#[from] clap::Error),
}
/// Top-level error type: wraps core-crate and desktop-crate errors so either
/// can be propagated with `?`.
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("{0}")]
Core(#[from] CoreError),
#[error("{0}")]
Desktop(#[from] DesktopError),
}

View file

@ -1,6 +1,6 @@
use std::{path::PathBuf, sync::LazyLock};
use lazy_supplements_core::config::RawNodeConfig;
use lazy_supplements_core::config::PartialCoreConfig;
pub use lazy_supplements_core::global::*;
pub static DEFAULT_DATA_DIR_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
@ -29,11 +29,122 @@ pub static DEFAULT_DATABASE_FILE_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
DEFAULT_DATA_DIR_PATH.join(&*DEFAULT_DATABASE_FILE_NAME)
});
pub static DEFAULT_RAW_NODE_CONFIG: LazyLock<RawNodeConfig> = LazyLock::new(|| {
RawNodeConfig {
pub static DEFAULT_PARTIAL_CORE_CONFIG: LazyLock<PartialCoreConfig> = LazyLock::new(|| {
PartialCoreConfig {
secret: None,
database_path: Some(DEFAULT_DATABASE_FILE_PATH.to_path_buf()),
listen_ips: Some(DEFAULT_LISTEN_IPS.to_vec()),
port: Some(0),
}
});
});
/// Container for all process-wide, lazily-initialized state: the p2p config,
/// the two database connections, and the in-memory peer table.
pub struct Global {
pub p2p_config: OnceCell<P2pConfig>,
pub main_database: OnceCell<DatabaseConnection>,
pub cache_database: OnceCell<DatabaseConnection>,
// Discovered peers, keyed by libp2p peer id.
pub peers: OnceCell<RwLock<HashMap<PeerId, Multiaddr>>>,
}
impl Global {
// Returns the p2p config if it has been initialized.
pub fn get_p2p_config(&self) -> Option<&P2pConfig> {
self.p2p_config.get()
}
// Stores `config` on first call; later calls return the stored value and
// drop the argument.
pub async fn get_or_init_p2p_config(&self, config: P2pConfig) -> &P2pConfig {
self.p2p_config.get_or_init(|| async {config}).await
}
// Lazily creates the (initially empty) peer table.
pub async fn get_or_init_peers(&self) -> &RwLock<HashMap<PeerId, Multiaddr>> {
self.peers.get_or_init(|| async {
RwLock::new(HashMap::new())
}).await
}
// Convenience: initialize-if-needed, then take a read lock.
pub async fn read_peers(&self) -> tokio::sync::RwLockReadGuard<'_, HashMap<PeerId, Multiaddr>>{
self.get_or_init_peers().await.read().await
}
// Convenience: initialize-if-needed, then take a write lock.
pub async fn write_peers(&self) -> tokio::sync::RwLockWriteGuard<'_, HashMap<PeerId, Multiaddr>>{
self.get_or_init_peers().await.write().await
}
// Builds a libp2p swarm from the p2p config and drives it forever,
// spawning a task per event. Never returns Ok: the loop only exits by
// propagating an error.
// NOTE(review): `unwrap()` below panics if the p2p config was never
// initialized — callers must init it first; confirm that invariant holds.
pub async fn launch_swarm(&self) -> Result<(), Error> {
let mut swarm = self.get_p2p_config().unwrap().clone().try_into_swarm().await?;
loop{
let swarm_event = swarm.select_next_some().await;
// Each event is handled off the polling loop so slow handlers don't
// stall the swarm.
tokio::spawn(async move{
match swarm_event {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
SwarmEvent::Behaviour(event) => {
println!("{event:?}");
event.run().await;
},
_ => {}
}
});
}
}
}
impl GlobalDatabase for Global {
fn get_main_database(&self) -> Option<&DatabaseConnection> {
self.main_database.get()
}
// Builds a SQLite URL (`mode=rwc` creates the file if missing), connects,
// and applies migrator `U`'s pending migrations on the first call. The
// migrator argument only selects the type `U`; its value is ignored.
async fn get_or_try_init_main_database<T, U>(&self, path: T, _: U) -> Result<&DatabaseConnection, Error>
where
T: AsRef<Path>,
U: MigratorTrait,
{
let url = "sqlite://".to_string() + path.as_ref().to_str().unwrap() + "?mode=rwc";
Ok(self.main_database.get_or_try_init(|| async {
let db = Database::connect(&url).await?;
U::up(&db, None).await?;
Ok::<DatabaseConnection, DbErr>(db)
}).await?)
}
fn get_cache_database(&self) -> Option<&DatabaseConnection> {
self.cache_database.get()
}
// Same as the main-database initializer, but for the cache database.
// NOTE(review): the URL-building/connect/migrate logic is duplicated in
// both methods; consider extracting a shared helper.
async fn get_or_try_init_cache_database<T, U>(&self, path: T, _: U) -> Result<&DatabaseConnection, Error>
where
T: AsRef<Path>,
U: MigratorTrait,
{
let url = "sqlite://".to_string() + path.as_ref().to_str().unwrap() + "?mode=rwc";
Ok(self.cache_database.get_or_try_init(|| async {
let db = Database::connect(&url).await?;
U::up(&db, None).await?;
Ok::<DatabaseConnection, DbErr>(db)
}).await?)
}
}
#[cfg(test)]
pub use tests::{get_or_init_temporary_main_database, get_or_init_temporary_cache_database};
// Smoke tests: each temporary database can be initialized and pinged.
#[cfg(test)]
pub mod tests {
use std::sync::LazyLock;
use sea_orm_migration::MigratorTrait;
use crate::{global::GLOBAL, cache::migration::CacheMigrator, data::migration::MainMigrator};
use super::*;
// Opens the temp-file main database via the shared GLOBAL.
pub async fn get_or_init_temporary_main_database() -> &'static DatabaseConnection {
GLOBAL.get_or_try_init_temporary_main_database(MainMigrator).await.unwrap()
}
// Opens the temp-file cache database via the shared GLOBAL.
pub async fn get_or_init_temporary_cache_database() -> &'static DatabaseConnection {
GLOBAL.get_or_try_init_temporary_cache_database(CacheMigrator).await.unwrap()
}
#[tokio::test]
async fn connect_main_database () {
let db = get_or_init_temporary_main_database().await;
assert!(db.ping().await.is_ok());
}
#[tokio::test]
async fn connect_cache_database () {
let db = get_or_init_temporary_cache_database().await;
assert!(db.ping().await.is_ok());
}
}

View file

@ -0,0 +1,11 @@
#[cfg(unix)]
pub mod unix;
#[cfg(windows)]
pub mod windows;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
pub use windows::*;

View file

@ -0,0 +1,50 @@
use std::path::Path;
use tokio::{io::Interest, net::UnixStream};
use crate::{
error::Error,
ipc::message::{Request, Response, ResponseContent},
};
/// Sends `request` over the Unix socket at `path` and waits for the response
/// whose `id` matches the request's; responses for other ids are skipped.
///
/// NOTE(review): assumes each `try_read_buf` yields one whole CBOR message;
/// a partial read surfaces as a deserialization error. Confirm the framing.
pub async fn request<T, U>(path: T, request: U) -> Result<ResponseContent, Error>
where
    T: AsRef<Path>,
    U: Into<Request>
{
    let stream = UnixStream::connect(path).await?;
    let request: Request = request.into();
    // Encode the request once, up front (was misnamed `response_buf`).
    let mut request_buf = Vec::new();
    if let Err(e) = ciborium::into_writer(&request, &mut request_buf) {
        // No From<ciborium::ser::Error> for Error yet.
        todo!();
    };
    // Retry until the write succeeds: try_write can still return WouldBlock
    // after readiness (the old code treated that as a fatal error).
    loop {
        stream.ready(Interest::WRITABLE).await?;
        match stream.try_write(&request_buf) {
            Ok(n) => {
                println!("write {} bytes", n);
                break;
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e.into()),
        }
    }
    loop {
        stream.ready(Interest::READABLE).await?;
        let mut read_buf: Vec<u8> = Vec::new();
        match stream.try_read_buf(&mut read_buf) {
            Ok(n) => println!("read {} bytes", n),
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e.into()),
        }
        // Scratch buffer required by ciborium's buffered reader.
        let mut scratch: Vec<u8> = Vec::new();
        let response: Response = ciborium::from_reader_with_buffer(read_buf.as_slice(), &mut scratch)?;
        if response.id == request.id {
            return Ok(response.content);
        }
    }
}

View file

@ -0,0 +1,4 @@
mod response;
mod request;
pub use response::*;
pub use request::*;

View file

@ -0,0 +1,24 @@
use lazy_supplements_core::global::generate_uuid;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// An IPC request envelope: a unique id plus the payload.
#[derive(Debug, Deserialize, Serialize)]
pub struct Request {
/// UUIDv7 correlating this request with its response.
pub id: Uuid,
pub content: RequestContent,
}
// Wraps a payload in a fresh envelope with a newly generated id.
impl From<RequestContent> for Request {
fn from(c: RequestContent) -> Self {
Self{
id: generate_uuid(),
content: c
}
}
}
/// Operations a client may ask the server to perform.
#[derive(Debug, Deserialize, Serialize)]
pub enum RequestContent {
Ping,
ListPeers,
}

View file

@ -0,0 +1,27 @@
use lazy_supplements_core::{
global::generate_uuid,
cache::entity::PeerModel,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// An IPC response envelope: the correlating id plus the payload.
#[derive(Debug, Deserialize, Serialize)]
pub struct Response {
/// Must echo the id of the request being answered.
pub id: Uuid,
pub content: ResponseContent,
}
// NOTE(review): this generates a NEW id instead of echoing a request id, so a
// response built via `.into()` will never match its request on the client
// side; the server currently sets `id` manually instead. Confirm intent.
impl From<ResponseContent> for Response {
fn from(c: ResponseContent) -> Self {
Self{
id: generate_uuid(),
content: c
}
}
}
/// Payloads the server can answer with.
#[derive(Debug, Deserialize, Serialize)]
pub enum ResponseContent {
Pong,
ListPeers(Vec<PeerModel>)
}

View file

@ -0,0 +1,3 @@
pub mod client;
pub mod server;
pub mod message;

View file

@ -0,0 +1,11 @@
#[cfg(unix)]
pub mod unix;
#[cfg(windows)]
pub mod windows;
#[cfg(unix)]
pub use unix::*;
#[cfg(windows)]
pub use windows::*;

View file

@ -0,0 +1,69 @@
use std::{collections::VecDeque, path::Path, sync::Arc};
use lazy_supplements_core::error::Error;
use tokio::{io::Interest, net::UnixStream, sync::Mutex};
use crate::ipc::message::{RequestContent, Response, ResponseContent};
/// Serves IPC requests over the Unix socket at `path`.
///
/// Requests are decoded from CBOR and handled on spawned tasks; responses are
/// queued and written back when the socket is writable.
///
/// NOTE(review): this calls `UnixStream::connect`, i.e. it dials an existing
/// socket rather than `UnixListener::bind`-ing one — confirm which side is
/// meant to create the socket. Partial writes (`try_write` writing fewer
/// bytes than the message) are still not handled.
pub async fn listen<T>(path: T) -> Result<(), Error>
where T: AsRef<Path> {
    let stream = UnixStream::connect(path).await?;
    // Responses produced by handler tasks, waiting to be written out.
    let write_que: Arc<Mutex<VecDeque<Vec<u8>>>> = Arc::new(Mutex::new(VecDeque::new()));
    // The response currently pending a write, if any.
    let mut write_next: Option<Vec<u8>> = None;
    loop {
        // Wait for either direction: the old code only asked for READABLE,
        // so the `is_writable()` branch could never run.
        let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
        if ready.is_readable() {
            let mut data = Vec::new();
            // `try_read_buf` appends to the Vec; plain `try_read` against an
            // empty Vec read 0 bytes (its slice had length 0).
            match stream.try_read_buf(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n)
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return Err(e.into())
                }
            }
            let write_que2 = write_que.clone();
            // Decode and handle the request off the I/O loop.
            tokio::spawn(async move {
                let mut buf = Vec::new();
                let request: crate::ipc::message::Request = ciborium::from_reader_with_buffer(data.as_slice(), &mut buf).unwrap();
                let response_id = request.id;
                let response_content: ResponseContent = match request.content {
                    RequestContent::Ping => {
                        ResponseContent::Pong
                    }
                    RequestContent::ListPeers => todo!(),
                };
                let mut response_buf = Vec::new();
                if let Err(e) = ciborium::into_writer(&Response{
                    // Echo the request id so the client can correlate.
                    id: response_id,
                    content: response_content,
                }, &mut response_buf) {
                    todo!();
                };
                let mut que = write_que2.lock().await;
                que.push_back(response_buf);
            });
        } else if ready.is_writable() {
            if let Some(x) = write_next.take() {
                match stream.try_write(&x) {
                    Ok(n) => {
                        println!("write {} bytes", n)
                    }
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        // Not actually writable: keep the message for retry
                        // instead of dropping it (old code lost it here).
                        write_next = Some(x);
                        continue;
                    }
                    Err(e) => {
                        return Err(e.into())
                    }
                }
            }
        }
        // Pull the next queued response only when nothing is pending, so an
        // unwritten message is never overwritten (old code overwrote it on
        // every loop iteration).
        if write_next.is_none() {
            let mut locked_que = write_que.lock().await;
            write_next = locked_que.pop_front();
        }
    }
}

View file

@ -1,8 +1,9 @@
pub mod cli;
pub mod error;
pub mod config;
pub mod global;
pub mod ipc;
pub use lazy_supplements_core::{
cache,
config,
data,
};
error,
};

View file

@ -19,7 +19,7 @@ enum Command {
#[tokio::main]
async fn main() {
let cli = Cli::parse();
let _ = GLOBAL.get_or_init_node_config(cli.config.try_into_node_config().await.unwrap()).await;
let _ = GLOBAL.get_or_init_core_config(cli.config.try_into_core_config().await.unwrap()).await;
match cli.command {
Command::Node(x) => x.run().await.unwrap(),
Command::Server(x) => x.start_server().await.unwrap(),