diff --git a/core/src/cache/entity/cached_address.rs b/core/src/cache/entity/cached_address.rs deleted file mode 100644 index 4826631..0000000 --- a/core/src/cache/entity/cached_address.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::str::FromStr; - -use chrono::{Days, Local}; -use libp2p::{multiaddr, Multiaddr, PeerId}; -use prost_types::Timestamp; -use sea_orm::{entity::{ - prelude::*, * -}, sea_query}; -use serde::{Deserialize, Serialize}; - -use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}, utils::utc_to_timestamp}; - - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)] -#[sea_orm(table_name = "cached_address")] -pub struct Model { - #[sea_orm(primary_key)] - pub id: u32, - #[sea_orm(indexed)] - pub created_at: DateTimeUtc, - #[sea_orm(indexed)] - pub updated_at: DateTimeUtc, - #[sea_orm(indexed)] - pub cached_peer_id: u32, - #[sea_orm(indexed)] - pub multiaddress: MultiaddrValue, -} - - -#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)] -pub enum Relation { - #[sea_orm( - belongs_to = "super::CachedPeerEntity", - from = "Column::CachedPeerId", - to = "super::CachedPeerColumn::Id" - )] - CachedPeer, -} -impl Related for Entity { - fn to() -> RelationDef { - Relation::CachedPeer.def() - } -} - -impl ActiveModelBehavior for ActiveModel {} - -impl ActiveModel { - pub fn new(cached_peer_id: u32, multiaddr: Multiaddr) -> Self { - let timestamp: DateTimeUtc = Local::now().to_utc(); - Self{ - cached_peer_id: Set(cached_peer_id), - multiaddress: Set(MultiaddrValue::from(multiaddr)), - created_at: Set(timestamp), - updated_at: Set(timestamp), - ..Default::default() - } - } -} - diff --git a/core/src/cache/entity/cached_peer.rs b/core/src/cache/entity/cached_peer.rs deleted file mode 100644 index 0920e45..0000000 --- a/core/src/cache/entity/cached_peer.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::str::FromStr; - -use chrono::{Days, Local}; -use libp2p::{multiaddr, Multiaddr, PeerId}; -use sea_orm::{entity::{ - prelude::*, * -}, sea_query}; -use serde::{Deserialize, Serialize}; - -use crate::data::value::{MultiaddrValue, PeerIdValue}; - - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)] -#[sea_orm(table_name = "cached_peer")] -pub struct Model { - #[sea_orm(primary_key)] - pub id: u32, - #[sea_orm(indexed)] - pub created_at: DateTimeUtc, - #[sea_orm(indexed)] - pub updated_at: DateTimeUtc, - #[sea_orm(indexed)] - pub peer_id: PeerIdValue, -} - - -#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)] -pub enum Relation { - #[sea_orm(has_many = "super::CachedAddressEntity")] - CachedAddress, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::CachedAddress.def() - } -} - -impl ActiveModelBehavior for ActiveModel {} - -impl ActiveModel { - pub fn new(peer_id: PeerId) -> Self { - let timestamp: DateTimeUtc = Local::now().to_utc(); - Self{ - peer_id: Set(PeerIdValue::from(peer_id)), - created_at: Set(timestamp), - updated_at: Set(timestamp), - ..Default::default() - } - } -} - -impl Entity { - pub fn find_by_peer_id(peer_id: PeerId) -> Select { - Self::find().filter(Column::PeerId.eq(PeerIdValue::from(peer_id))) - } -} \ No newline at end of file diff --git a/core/src/cache/entity/mod.rs b/core/src/cache/entity/mod.rs deleted file mode 100644 index 7e783c0..0000000 --- a/core/src/cache/entity/mod.rs +++ /dev/null @@ -1,48 +0,0 @@ -mod cached_peer; -mod cached_address; - -pub use cached_peer::{ - ActiveModel as CachedPeerActiveModel, - Column as CachedPeerColumn, - Model as CachedPeerModel, - Entity as CachedPeerEntity, -}; - -pub use cached_address::{ - ActiveModel as CachedAddressActiveModel, - Column as CachedAddressColumn, - Model as CachedAddressModel, - Entity as CachedAddressEntity, -}; - - -#[cfg(test)] -mod tests { - use std::net::Ipv4Addr; - - use crate::{cache::entity::cached_peer, data::migration::DataMigrator, global::{DATABASE_CONNECTIONS}, tests::TEST_CONFIG}; - - use super::*; - - use libp2p::{identity::{self, Keypair}, multiaddr, swarm::handler::multi, Multiaddr, PeerId}; - use sea_orm::ActiveModelTrait; - - - - #[tokio::test] - async fn insert() { - - let db = DATABASE_CONNECTIONS.get_or_init_unchecked(&*TEST_CONFIG, DataMigrator).await.cache; - let peer_id = Keypair::generate_ed25519().public().to_peer_id(); - let multiaddr = Multiaddr::empty() - .with(Ipv4Addr::new(127,0,0,1).into()) - .with(multiaddr::Protocol::Tcp(0)); - let inserted_cached_peer: CachedPeerModel = CachedPeerActiveModel::new(peer_id.clone()) - .insert(db).await.unwrap(); - let inserted_cached_address: CachedAddressModel = CachedAddressActiveModel::new(inserted_cached_peer.id, multiaddr.clone()) - .insert(db).await.unwrap(); - assert_eq!(PeerId::from(inserted_cached_peer.peer_id), peer_id); - assert_eq!(Multiaddr::from(inserted_cached_address.multiaddress), multiaddr); - } - -} \ No newline at end of file diff --git a/core/src/cache/migration/m20220101_000001_create_cache_tables.rs b/core/src/cache/migration/m20220101_000001_create_cache_tables.rs deleted file mode 100644 index 09bdfd1..0000000 --- a/core/src/cache/migration/m20220101_000001_create_cache_tables.rs +++ /dev/null @@ -1,148 +0,0 @@ -use sea_orm_migration::{prelude::*, schema::*}; - -use crate::migration::TableMigration; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - CachedPeer::up(manager).await?; - CachedAddress::up(manager).await?; - Ok(()) - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - CachedAddress::down(manager).await?; - CachedPeer::down(manager).await?; - Ok(()) - } -} - -#[derive(DeriveIden, DeriveMigrationName)] -enum CachedPeer { - Table, - Id, - PeerId, - CreatedAt, - UpdatedAt, -} - -static IDX_CACHED_PEER_PEER_ID: &str = "idx_cached_peer_peer_id"; -static IDX_CACHED_PEER_CREATED_AT: &str = "idx_cached_peer_created_at"; -static IDX_CACHED_PEER_UPDATED_AT: &str = "idx_cached_peer_updated_at"; - -#[async_trait::async_trait] -impl TableMigration for CachedPeer { - async fn up<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> { - manager.create_table( - Table::create() - .table(Self::Table) - .if_not_exists() - .col(pk_auto(Self::Id)) - .col(string_len(Self::PeerId, 255)) - .col(timestamp(Self::CreatedAt)) - .col(timestamp(Self::UpdatedAt)) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_PEER_PEER_ID) - .table(Self::Table) - .col(Self::PeerId) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_PEER_CREATED_AT) - .table(Self::Table) - .col(Self::CreatedAt) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_PEER_UPDATED_AT) - .table(Self::Table) - .col(Self::UpdatedAt) - .to_owned() - ).await?; - Ok(()) - } - async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr>{ - manager.drop_table(Table::drop().table(Self::Table).to_owned()).await - } -} - -#[derive(DeriveIden, DeriveMigrationName)] -enum CachedAddress { - Table, - Id, - CachedPeerId, - CreatedAt, - UpdatedAt, - Multiaddress, -} - -static IDX_CACHED_ADDRESS_MULTIADDRESS: &str = "idx_cached_address_multiaddress"; -static IDX_CACHED_ADDRESS_CACHED_PEER_ID: &str = "idx_cached_address_cached_peer_id"; -static IDX_CACHED_ADDRESS_CREATED_AT: &str = "idx_cached_address_created_at"; -static IDX_CACHED_ADDRESS_UPDATED_AT: &str = "idx_cached_address_updated_at"; -static FK_CACHED_ADDRESS_CACHED_PEER: &str = "fk_cached_address_cached_peer"; - -#[async_trait::async_trait] -impl TableMigration for CachedAddress { - async fn up<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> { - manager.create_table( - Table::create() - .table(Self::Table) - .if_not_exists() - .col(pk_auto(Self::Id)) - .col(integer(Self::CachedPeerId)) - .foreign_key(ForeignKey::create() - .name(FK_CACHED_ADDRESS_CACHED_PEER) - .from(Self::Table,Self::CachedPeerId) - .to(CachedPeer::Table, CachedPeer::Id) - .on_delete(ForeignKeyAction::Cascade) - .on_update(ForeignKeyAction::Cascade) - ) - .col(timestamp(Self::CreatedAt)) - .col(timestamp(Self::UpdatedAt)) - .col(text_uniq(Self::Multiaddress)) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_ADDRESS_CACHED_PEER_ID) - .table(Self::Table) - .col(Self::CachedPeerId) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_ADDRESS_MULTIADDRESS) - .table(Self::Table) - .col(Self::Multiaddress) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_ADDRESS_CREATED_AT) - .table(Self::Table) - .col(Self::CreatedAt) - .to_owned() - ).await?; - manager.create_index( - Index::create() - .name(IDX_CACHED_ADDRESS_UPDATED_AT) - .table(Self::Table) - .col(Self::UpdatedAt) - .to_owned() - ).await?; - - Ok(()) - } - async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr>{ - manager.drop_table(Table::drop().table(Self::Table).to_owned()).await - } -} \ No newline at end of file diff --git a/core/src/cache/migration/mod.rs b/core/src/cache/migration/mod.rs deleted file mode 100644 index 87f27fd..0000000 --- a/core/src/cache/migration/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -use sea_orm_migration::prelude::*; - -pub mod m20220101_000001_create_cache_tables; - -pub struct CacheMigrator; - -#[async_trait::async_trait] -impl MigratorTrait for CacheMigrator { - fn migrations() -> Vec> { - vec![Box::new(m20220101_000001_create_cache_tables::Migration)] - } -} diff --git a/core/src/cache/mod.rs b/core/src/cache/mod.rs deleted file mode 100644 index 0bfdf3e..0000000 --- a/core/src/cache/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod entity; -pub mod migration; \ No newline at end of file diff --git a/core/src/config/mod.rs b/core/src/config/mod.rs index 9a3d280..b7074dc 100644 --- a/core/src/config/mod.rs +++ b/core/src/config/mod.rs @@ -8,7 +8,7 @@ use crate::{utils::{emptiable::Emptiable, mergeable::Mergeable}}; pub use error::ConfigError; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}}; +use tokio::{io::{AsyncReadExt, AsyncWriteExt}}; pub use storage::{StorageConfig, PartialStorageConfig}; pub use p2p::{P2pConfig, PartialP2pConfig}; pub use rpc::*; diff --git a/core/src/config/storage.rs b/core/src/config/storage.rs index 642bf0a..5ae99bb 100644 --- a/core/src/config/storage.rs +++ b/core/src/config/storage.rs @@ -15,6 +15,12 @@ pub struct StorageConfig { pub cache_directory: PathBuf, } +impl StorageConfig { + pub fn get_local_database_path(&self) -> PathBuf { + self.data_directory.join("local.sqlite") + } +} + impl TryFrom for StorageConfig { type Error = ConfigError; @@ -83,6 +89,7 @@ impl PartialStorageConfig { } + } impl From for PartialStorageConfig { diff --git a/core/src/data/local/migration.rs b/core/src/data/local/migration.rs deleted file mode 100644 index adfc08a..0000000 --- a/core/src/data/local/migration.rs +++ /dev/null @@ -1,45 +0,0 @@ -use rusqlite::{ffi::Error, Connection}; -use tracing::{event, Level}; - - -pub fn migrate(con: &mut Connection) -> Result<(), Error>{ - let version: u32 = con.pragma_query_value(None,"user_version", |row| row.get(0))?; - if version < 1 { - event!(Level::INFO, "Migrate local db to version 1"); - let tx = con.transaction()?; - tx.execute( - "CREATE TABLE known_peer ( - id INTEGER PRIMARY KEY, - peer_id TEXT UNIQUE NOT NULL, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - )", - () - )?; - tx.execute( - "CREATE TABLE known_address ( - id INTEGER PRIMARY KEY, - known_peer_id INTEGER NOT NULL, - multiaddr TEXT UNIQUE NOT NULL, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - protocol TEXT NOT NULL, - FOREIGN KEY(knwon_peer_id) REFERENCES knwon_peer(id), - )", - () - )?; - tx.execute( - "CREATE TABLE authorized_peer ( - id INTEGER PRIMARY KEY, - known_peer_id INTEGER NOT NULL UNIQUE, - synced_at TEXT, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - FOREIGN KEY(known_peer_id) REFERENCES knwon_peer(id)", - () - )?; - tx.pragma_update(None, "user_version", 1)?; - tx.commit()?; - event!(Level::INFO, "Migration done.") - } -} \ No newline at end of file diff --git a/core/src/data/local/migration/mod.rs b/core/src/data/local/migration/mod.rs new file mode 100644 index 0000000..209a64f --- /dev/null +++ b/core/src/data/local/migration/mod.rs @@ -0,0 +1,14 @@ +mod v1; + +use rusqlite::{ffi::Error, Connection}; +use tracing::{event, Level}; + +pub fn migrate(con: &Connection) -> Result<(), Error>{ + let version: u32 = con.pragma_query_value(None,"user_version", |row| row.get(0)).expect("Failed to get user_version"); + if version < 1 { + event!(Level::INFO, "Migrate local db to version 1"); + v1::migrate(con)?; + event!(Level::INFO, "Migration done."); + } + Ok(()) +} \ No newline at end of file diff --git a/core/src/data/local/migration/v1.rs b/core/src/data/local/migration/v1.rs new file mode 100644 index 0000000..1b71d4d --- /dev/null +++ b/core/src/data/local/migration/v1.rs @@ -0,0 +1,55 @@ +use rusqlite::{ffi::Error, Connection}; + +pub fn migrate(con: &Connection) -> Result<(), Error>{ + let tx = con.transaction()?; + tx.execute( + "CREATE TABLE peer ( + id INTEGER PRIMARY KEY, + libp2p_peer_id TEXT UNIQUE NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + )", + () + )?; + tx.execute( + "CREATE INDEX idx_peer_created_at ON peer(created_at)", + () + )?; + tx.execute( + "CREATE INDEX idx_peer_updated_at ON peer(updated_at)", + () + )?; + tx.execute( + "CREATE TABLE address ( + id INTEGER PRIMARY KEY, + peer_id INTEGER NOT NULL, + multiaddr TEXT UNIQUE NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + protocol TEXT NOT NULL, + FOREIGN KEY(peer_id) REFERENCES peer(id), + )", + () + )?; + tx.execute( + "CREATE INDEX idx_address_created_at ON address(created_at)", + () + )?; + tx.execute( + "CREATE INDEX idx_address_updated_at ON address(updated_at)", + () + )?; + tx.execute( + "CREATE TABLE authorized_peer ( + id INTEGER PRIMARY KEY, + peer_id INTEGER NOT NULL UNIQUE, + synced_at TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(peer_id) REFERENCES peer(id)", + () + )?; + + tx.pragma_update(None, "user_version", 1)?; + tx.commit()?; +} \ No newline at end of file diff --git a/core/src/data/local/mod.rs b/core/src/data/local/mod.rs index 61dbed5..de01ac2 100644 --- a/core/src/data/local/mod.rs +++ b/core/src/data/local/mod.rs @@ -1 +1,59 @@ -pub mod migration; \ No newline at end of file +pub mod migration; + +use std::{cell::OnceCell, path::Path, sync::{LazyLock, OnceLock}}; + +use migration::migrate; +use rusqlite::{ffi::Error, Connection}; + +use crate::{config::StorageConfig, global::CONFIG}; + +static INITIALIZE_PARENT_DIRECTORY_RESULT: OnceLock<()> = OnceLock::new(); + +static MIGRATE_RESULT: OnceLock<()> = OnceLock::new(); + +fn initialize_parent_directory

(path: &P) +where + P: AsRef, +{ + *INITIALIZE_PARENT_DIRECTORY_RESULT.get_or_init(|| { + let path2: &Path = path.as_ref(); + if let Some(x) = path2.parent() { + if !x.exists() { + std::fs::create_dir_all(x).expect("Parent directory of the local database must be created."); + } + } + }) +} + +fn migrate_once(conn: &Connection) -> () { + *MIGRATE_RESULT.get_or_init(|| { + migrate(conn).expect("Local database migration should be done correctly") + }) + +} +pub trait LocalDatabaseConnection { + fn from_path

(path: &P) -> Self + where + P: AsRef; + fn from_storage_config(config: &T) -> Self + where + T: AsRef + { + Self::from_path(&config.as_ref().get_local_database_path()) + } + fn from_global_storage_config() -> Self { + Self::from_storage_config(CONFIG.get_unchecked()) + } +} + +impl LocalDatabaseConnection for Connection { + fn from_path

(path: &P) -> Self + where + P: AsRef + { + initialize_parent_directory(path); + let conn = Connection::open(path).expect("local database connection must be opened without error"); + migrate_once(&conn); + conn + } +} \ No newline at end of file diff --git a/core/src/error.rs b/core/src/error.rs index e1836cf..025232c 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -10,8 +10,6 @@ pub enum Error { CiborSerialize(#[from] ciborium::ser::Error), #[error("Config error: {0}")] Config(#[from] crate::config::error::ConfigError), - #[error("DB Error: {0}")] - Db(#[from]sea_orm::DbErr), #[error("Dial Error: {0}")] Dial(#[from] libp2p::swarm::DialError), #[error("Decoding identity error: {0}")] diff --git a/core/src/global/database_connection.rs b/core/src/global/database_connection.rs deleted file mode 100644 index d1a7e05..0000000 --- a/core/src/global/database_connection.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::{path::{Path, PathBuf}, sync::OnceLock}; - -use dirs::cache_dir; -use rusqlite::Connection; -use crate::{cache::migration::CacheMigrator, config::StorageConfig, data::local::migration::migrate, error::Error}; -use tokio::sync::OnceCell; - -pub static LOCAL_DATABASE_CONNECTION: GlobalDatabaseConnection = GlobalDatabaseConnection::const_new(); - -pub struct GlobalDatabaseConnection { - inner: OnceLock -} - -impl GlobalDatabaseConnection { - pub const fn const_new() -> Self { - Self { - inner: OnceLock::new() - } - } - - pub fn get(&'static self) -> Option<&'static Connection> { - self.inner.get() - } - - pub fn get_unchecked(&'static self) -> &'static Connection { - self.get().expect("local data database connection should initialized before access!") - } - - fn get_file_path(config: &T) -> PathBuf - where - T: AsRef - { - config.as_ref().data_directory.join("local.sqlite") - } - - pub async fn get_or_init_unchecked(&'static self, config: T) -> Connection - where - T: AsRef, - { - let path = Self::get_file_path(&config); - if let Some(x) = path.parent() { - std::fs::create_dir_all(x).expect("Failed to create directory for local database"); - } - self.inner.get_or_init(|| { - let db = Connection::open(path)?; - migrate(&db).unwrap(); - db - }) - - } -} - -#[cfg(test)] -pub use tests::*; - - -#[cfg(test)] -mod tests { - use super::*; - use crate::{cache::migration::CacheMigrator, global::CONFIG, tests::{TEST_CONFIG}}; - - #[tokio::test] - pub async fn get_or_init_database() { - LOCAL_DATABASE_CONNECTION.get_or_init_unchecked(&*TEST_CONFIG).await; - } -} diff --git a/core/src/global/mod.rs b/core/src/global/mod.rs index 0c9727e..db09e28 100644 --- a/core/src/global/mod.rs +++ b/core/src/global/mod.rs @@ -2,14 +2,11 @@ use std::{any::type_name, collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{ use crate::{config::{P2pConfig, PartialP2pConfig, StorageConfig}, error::Error }; use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId}; -use sea_orm::{prelude::*, Database}; -use sea_orm_migration::MigratorTrait; use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard}; mod config; pub use config::*; mod database_connection; -pub use database_connection::*; use uuid::{ContextV7, Timestamp, Uuid}; pub fn generate_uuid() -> Uuid { diff --git a/core/src/lib.rs b/core/src/lib.rs index 65b33ee..e88f75c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,10 +1,7 @@ -pub mod cache; pub mod config; pub mod data; pub mod error; pub mod global; -pub mod message; -pub mod migration; pub mod p2p; pub mod proto; pub mod rpc; diff --git a/core/src/tests.rs b/core/src/tests.rs index f6b7b54..ba19fa7 100644 --- a/core/src/tests.rs +++ b/core/src/tests.rs @@ -1,10 +1,8 @@ use std::{path::PathBuf, sync::LazyLock}; -use sea_orm::{sea_query::{FromValueTuple, IntoValueTuple, ValueType}, ActiveModelBehavior, ActiveModelTrait, ColumnTrait, Condition, DatabaseConnection, EntityTrait, IntoActiveModel, ModelTrait, PrimaryKeyToColumn, PrimaryKeyTrait, Value}; -use sea_orm::QueryFilter; use tempfile::TempDir; use url::Url; -use crate::{ config::{Config, PartialConfig, PartialP2pConfig, PartialRpcConfig, RpcConfig, StorageConfig}, message::Message}; +use crate::{ config::{Config, PartialConfig, PartialP2pConfig, PartialRpcConfig, RpcConfig, StorageConfig}}; use serde::{de::DeserializeOwned, Deserialize, Serialize};