Update migration

This commit is contained in:
fluo10 2025-09-03 06:20:34 +09:00
parent 92fe596fd8
commit 731817d20c
17 changed files with 137 additions and 450 deletions

View file

@ -1,59 +0,0 @@
use std::str::FromStr;
use chrono::{Days, Local};
use libp2p::{multiaddr, Multiaddr, PeerId};
use prost_types::Timestamp;
use sea_orm::{entity::{
prelude::*, *
}, sea_query};
use serde::{Deserialize, Serialize};
use crate::{cache, data::value::{MultiaddrValue, PeerIdValue}, utils::utc_to_timestamp};
// ORM model for the `cached_address` table: one multiaddress observed for a
// cached peer, plus bookkeeping timestamps.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "cached_address")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: u32,
    // Row creation time (UTC); indexed for time-ordered queries.
    #[sea_orm(indexed)]
    pub created_at: DateTimeUtc,
    // Last update time (UTC).
    #[sea_orm(indexed)]
    pub updated_at: DateTimeUtc,
    // Foreign key to cached_peer.id (see Relation::CachedPeer below).
    #[sea_orm(indexed)]
    pub cached_peer_id: u32,
    // The libp2p multiaddress, persisted through the MultiaddrValue newtype.
    #[sea_orm(indexed)]
    pub multiaddress: MultiaddrValue,
}
// Each address row belongs to exactly one cached peer
// (FK: cached_address.cached_peer_id -> cached_peer.id).
#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)]
pub enum Relation {
    #[sea_orm(
        belongs_to = "super::CachedPeerEntity",
        from = "Column::CachedPeerId",
        to = "super::CachedPeerColumn::Id"
    )]
    CachedPeer,
}
// Wires the belongs_to relation above into sea-orm's `Related` API.
impl Related<super::CachedPeerEntity> for Entity {
    fn to() -> RelationDef {
        Relation::CachedPeer.def()
    }
}
// Default lifecycle hooks are sufficient for this entity.
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
    /// Builds an unsaved `cached_address` row pointing at an existing
    /// `cached_peer` record; both timestamps are set to the same instant.
    pub fn new(cached_peer_id: u32, multiaddr: Multiaddr) -> Self {
        let now = Local::now().to_utc();
        Self {
            created_at: Set(now),
            updated_at: Set(now),
            cached_peer_id: Set(cached_peer_id),
            multiaddress: Set(MultiaddrValue::from(multiaddr)),
            ..Default::default()
        }
    }
}

View file

@ -1,57 +0,0 @@
use std::str::FromStr;
use chrono::{Days, Local};
use libp2p::{multiaddr, Multiaddr, PeerId};
use sea_orm::{entity::{
prelude::*, *
}, sea_query};
use serde::{Deserialize, Serialize};
use crate::data::value::{MultiaddrValue, PeerIdValue};
// ORM model for the `cached_peer` table: one row per libp2p peer seen.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "cached_peer")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: u32,
    // Row creation time (UTC).
    #[sea_orm(indexed)]
    pub created_at: DateTimeUtc,
    // Last update time (UTC).
    #[sea_orm(indexed)]
    pub updated_at: DateTimeUtc,
    // The libp2p peer id, persisted through the PeerIdValue newtype.
    #[sea_orm(indexed)]
    pub peer_id: PeerIdValue,
}
// A peer owns many cached addresses (mirrors the belongs_to on the
// cached_address side).
#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)]
pub enum Relation {
    #[sea_orm(has_many = "super::CachedAddressEntity")]
    CachedAddress,
}
// Wires the has_many relation above into sea-orm's `Related` API.
impl Related<super::CachedAddressEntity> for Entity {
    fn to() -> RelationDef {
        Relation::CachedAddress.def()
    }
}
// Default lifecycle hooks are sufficient for this entity.
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
    /// Builds an unsaved `cached_peer` row for the given libp2p peer id;
    /// both timestamps are set to the same instant.
    pub fn new(peer_id: PeerId) -> Self {
        let now = Local::now().to_utc();
        Self {
            created_at: Set(now),
            updated_at: Set(now),
            peer_id: Set(PeerIdValue::from(peer_id)),
            ..Default::default()
        }
    }
}
impl Entity {
pub fn find_by_peer_id(peer_id: PeerId) -> Select<Entity> {
Self::find().filter(Column::PeerId.eq(PeerIdValue::from(peer_id)))
}
}

View file

@ -1,48 +0,0 @@
mod cached_peer;
mod cached_address;
pub use cached_peer::{
ActiveModel as CachedPeerActiveModel,
Column as CachedPeerColumn,
Model as CachedPeerModel,
Entity as CachedPeerEntity,
};
pub use cached_address::{
ActiveModel as CachedAddressActiveModel,
Column as CachedAddressColumn,
Model as CachedAddressModel,
Entity as CachedAddressEntity,
};
#[cfg(test)]
mod tests {
    use std::net::Ipv4Addr;
    use crate::{cache::entity::cached_peer, data::migration::DataMigrator, global::{DATABASE_CONNECTIONS}, tests::TEST_CONFIG};
    use super::*;
    use libp2p::{identity::{self, Keypair}, multiaddr, swarm::handler::multi, Multiaddr, PeerId};
    use sea_orm::ActiveModelTrait;
    // Round-trip test: insert one cached peer plus one address, then verify
    // the stored newtype values convert back to the original PeerId and
    // Multiaddr.
    #[tokio::test]
    async fn insert() {
        let db = DATABASE_CONNECTIONS.get_or_init_unchecked(&*TEST_CONFIG, DataMigrator).await.cache;
        let peer_id = Keypair::generate_ed25519().public().to_peer_id();
        // /ip4/127.0.0.1/tcp/0 — loopback with an unspecified port.
        let multiaddr = Multiaddr::empty()
            .with(Ipv4Addr::new(127,0,0,1).into())
            .with(multiaddr::Protocol::Tcp(0));
        let inserted_cached_peer: CachedPeerModel = CachedPeerActiveModel::new(peer_id.clone())
            .insert(db).await.unwrap();
        let inserted_cached_address: CachedAddressModel = CachedAddressActiveModel::new(inserted_cached_peer.id, multiaddr.clone())
            .insert(db).await.unwrap();
        assert_eq!(PeerId::from(inserted_cached_peer.peer_id), peer_id);
        assert_eq!(Multiaddr::from(inserted_cached_address.multiaddress), multiaddr);
    }
}

View file

@ -1,148 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
use crate::migration::TableMigration;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    // Creation order matters: cached_address carries a foreign key to
    // cached_peer, so the peer table must exist first.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        CachedPeer::up(manager).await?;
        CachedAddress::up(manager).await?;
        Ok(())
    }
    // Teardown runs in reverse dependency order (address before peer).
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        CachedAddress::down(manager).await?;
        CachedPeer::down(manager).await?;
        Ok(())
    }
}
// sea-query identifier enum for the `cached_peer` table and its columns.
// NOTE(review): `DeriveMigrationName` on an Iden enum looks unnecessary —
// the migration name comes from the `Migration` struct above; confirm.
#[derive(DeriveIden, DeriveMigrationName)]
enum CachedPeer {
    Table,
    Id,
    PeerId,
    CreatedAt,
    UpdatedAt,
}
static IDX_CACHED_PEER_PEER_ID: &str = "idx_cached_peer_peer_id";
static IDX_CACHED_PEER_CREATED_AT: &str = "idx_cached_peer_created_at";
static IDX_CACHED_PEER_UPDATED_AT: &str = "idx_cached_peer_updated_at";
#[async_trait::async_trait]
impl TableMigration for CachedPeer {
    // Creates the cached_peer table and its secondary indexes.
    async fn up<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> {
        manager.create_table(
            Table::create()
                .table(Self::Table)
                .if_not_exists()
                .col(pk_auto(Self::Id))
                .col(string_len(Self::PeerId, 255))
                .col(timestamp(Self::CreatedAt))
                .col(timestamp(Self::UpdatedAt))
                .to_owned()
        ).await?;
        // The indexes are independent of one another, so build them from a
        // (name, column) table instead of repeating the builder chain.
        for (name, column) in [
            (IDX_CACHED_PEER_PEER_ID, Self::PeerId),
            (IDX_CACHED_PEER_CREATED_AT, Self::CreatedAt),
            (IDX_CACHED_PEER_UPDATED_AT, Self::UpdatedAt),
        ] {
            manager.create_index(
                Index::create()
                    .name(name)
                    .table(Self::Table)
                    .col(column)
                    .to_owned()
            ).await?;
        }
        Ok(())
    }
    // Dropping the table removes its indexes along with it.
    async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> {
        manager.drop_table(Table::drop().table(Self::Table).to_owned()).await
    }
}
// sea-query identifier enum for the `cached_address` table and its columns.
// NOTE(review): `DeriveMigrationName` here also looks unnecessary — confirm.
#[derive(DeriveIden, DeriveMigrationName)]
enum CachedAddress {
    Table,
    Id,
    CachedPeerId,
    CreatedAt,
    UpdatedAt,
    Multiaddress,
}
static IDX_CACHED_ADDRESS_MULTIADDRESS: &str = "idx_cached_address_multiaddress";
static IDX_CACHED_ADDRESS_CACHED_PEER_ID: &str = "idx_cached_address_cached_peer_id";
static IDX_CACHED_ADDRESS_CREATED_AT: &str = "idx_cached_address_created_at";
static IDX_CACHED_ADDRESS_UPDATED_AT: &str = "idx_cached_address_updated_at";
static FK_CACHED_ADDRESS_CACHED_PEER: &str = "fk_cached_address_cached_peer";
#[async_trait::async_trait]
impl TableMigration for CachedAddress {
    // Creates the cached_address table (with an FK to cached_peer that
    // cascades deletes/updates) and its secondary indexes.
    async fn up<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> {
        manager.create_table(
            Table::create()
                .table(Self::Table)
                .if_not_exists()
                .col(pk_auto(Self::Id))
                .col(integer(Self::CachedPeerId))
                .foreign_key(
                    ForeignKey::create()
                        .name(FK_CACHED_ADDRESS_CACHED_PEER)
                        .from(Self::Table, Self::CachedPeerId)
                        .to(CachedPeer::Table, CachedPeer::Id)
                        .on_delete(ForeignKeyAction::Cascade)
                        .on_update(ForeignKeyAction::Cascade)
                )
                .col(timestamp(Self::CreatedAt))
                .col(timestamp(Self::UpdatedAt))
                .col(text_uniq(Self::Multiaddress))
                .to_owned()
        ).await?;
        // The indexes are independent of one another, so build them from a
        // (name, column) table instead of repeating the builder chain.
        for (name, column) in [
            (IDX_CACHED_ADDRESS_CACHED_PEER_ID, Self::CachedPeerId),
            (IDX_CACHED_ADDRESS_MULTIADDRESS, Self::Multiaddress),
            (IDX_CACHED_ADDRESS_CREATED_AT, Self::CreatedAt),
            (IDX_CACHED_ADDRESS_UPDATED_AT, Self::UpdatedAt),
        ] {
            manager.create_index(
                Index::create()
                    .name(name)
                    .table(Self::Table)
                    .col(column)
                    .to_owned()
            ).await?;
        }
        Ok(())
    }
    // Dropping the table removes its indexes along with it.
    async fn down<'a>(manager: &'a SchemaManager<'a>) -> Result<(), DbErr> {
        manager.drop_table(Table::drop().table(Self::Table).to_owned()).await
    }
}

View file

@ -1,12 +0,0 @@
use sea_orm_migration::prelude::*;
pub mod m20220101_000001_create_cache_tables;
// Migrator for the cache database; currently a single migration creating the
// cached_peer / cached_address tables.
pub struct CacheMigrator;
#[async_trait::async_trait]
impl MigratorTrait for CacheMigrator {
    fn migrations() -> Vec<Box<dyn MigrationTrait>> {
        vec![Box::new(m20220101_000001_create_cache_tables::Migration)]
    }
}

View file

@ -1,2 +0,0 @@
pub mod entity;
pub mod migration;

View file

@ -8,7 +8,7 @@ use crate::{utils::{emptiable::Emptiable, mergeable::Mergeable}};
pub use error::ConfigError;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}};
pub use storage::{StorageConfig, PartialStorageConfig};
pub use p2p::{P2pConfig, PartialP2pConfig};
pub use rpc::*;

View file

@ -15,6 +15,12 @@ pub struct StorageConfig {
pub cache_directory: PathBuf,
}
impl StorageConfig {
pub fn get_local_database_path(&self) -> PathBuf {
self.data_directory.join("local.sqlite")
}
}
impl TryFrom<PartialStorageConfig> for StorageConfig {
type Error = ConfigError;
@ -83,6 +89,7 @@ impl PartialStorageConfig {
}
}
impl From<StorageConfig> for PartialStorageConfig {

View file

@ -1,45 +0,0 @@
use rusqlite::{ffi::Error, Connection};
use tracing::{event, Level};
/// Migrates the local database schema up to version 1, tracked via SQLite's
/// `user_version` pragma. All DDL runs in one transaction so a failure
/// leaves both the schema and `user_version` untouched.
/// NOTE(review): the `Error` alias here is `rusqlite::ffi::Error`, while
/// rusqlite APIs return `rusqlite::Error` — the alias should likely change.
pub fn migrate(con: &mut Connection) -> Result<(), Error>{
    // 0 on a freshly created database.
    let version: u32 = con.pragma_query_value(None,"user_version", |row| row.get(0))?;
    if version < 1 {
        event!(Level::INFO, "Migrate local db to version 1");
        let tx = con.transaction()?;
        // SQLite rejects a trailing comma before the closing ')' of a
        // column list, so none of the statements below may end a line
        // list with ','.
        tx.execute(
            "CREATE TABLE known_peer (
                id INTEGER PRIMARY KEY,
                peer_id TEXT UNIQUE NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )",
            ()
        )?;
        // Fixed typos: knwon_peer_id/knwon_peer -> known_peer_id/known_peer.
        tx.execute(
            "CREATE TABLE known_address (
                id INTEGER PRIMARY KEY,
                known_peer_id INTEGER NOT NULL,
                multiaddr TEXT UNIQUE NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                protocol TEXT NOT NULL,
                FOREIGN KEY(known_peer_id) REFERENCES known_peer(id)
            )",
            ()
        )?;
        // Added the missing closing ')' of the column list.
        tx.execute(
            "CREATE TABLE authorized_peer (
                id INTEGER PRIMARY KEY,
                known_peer_id INTEGER NOT NULL UNIQUE,
                synced_at TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                FOREIGN KEY(known_peer_id) REFERENCES known_peer(id)
            )",
            ()
        )?;
        tx.pragma_update(None, "user_version", 1)?;
        tx.commit()?;
        event!(Level::INFO, "Migration done.");
    }
    // The original fell off the end of a `Result`-returning function.
    Ok(())
}

View file

@ -0,0 +1,14 @@
mod v1;
use rusqlite::{ffi::Error, Connection};
use tracing::{event, Level};
// Entry point for local-database schema migrations, dispatching on SQLite's
// `user_version` pragma (0 on a fresh database).
// NOTE(review): the `Error` alias is `rusqlite::ffi::Error`, but rusqlite
// APIs return `rusqlite::Error` — that is presumably why the pragma read
// uses `expect` instead of `?` despite the `Result` return; confirm and
// switch the alias so the error can propagate.
pub fn migrate(con: &Connection) -> Result<(), Error>{
    let version: u32 = con.pragma_query_value(None,"user_version", |row| row.get(0)).expect("Failed to get user_version");
    if version < 1 {
        event!(Level::INFO, "Migrate local db to version 1");
        v1::migrate(con)?;
        event!(Level::INFO, "Migration done.");
    }
    Ok(())
}

View file

@ -0,0 +1,55 @@
use rusqlite::{ffi::Error, Connection};
/// Creates the version-1 schema (peer, address, authorized_peer plus their
/// indexes) inside a single transaction, then bumps `user_version` to 1.
/// Either everything lands or nothing does.
pub fn migrate(con: &Connection) -> Result<(), Error>{
    // `Connection::transaction` requires `&mut self`; `unchecked_transaction`
    // provides the same commit/rollback handle through a shared borrow,
    // matching this function's `&Connection` signature.
    let tx = con.unchecked_transaction()?;
    // SQLite rejects a trailing comma before the closing ')' of a column
    // list, so none of the CREATE TABLE statements below may have one.
    tx.execute(
        "CREATE TABLE peer (
            id INTEGER PRIMARY KEY,
            libp2p_peer_id TEXT UNIQUE NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )",
        ()
    )?;
    tx.execute(
        "CREATE INDEX idx_peer_created_at ON peer(created_at)",
        ()
    )?;
    tx.execute(
        "CREATE INDEX idx_peer_updated_at ON peer(updated_at)",
        ()
    )?;
    tx.execute(
        "CREATE TABLE address (
            id INTEGER PRIMARY KEY,
            peer_id INTEGER NOT NULL,
            multiaddr TEXT UNIQUE NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            protocol TEXT NOT NULL,
            FOREIGN KEY(peer_id) REFERENCES peer(id)
        )",
        ()
    )?;
    tx.execute(
        "CREATE INDEX idx_address_created_at ON address(created_at)",
        ()
    )?;
    tx.execute(
        "CREATE INDEX idx_address_updated_at ON address(updated_at)",
        ()
    )?;
    // Added the missing closing ')' of the column list.
    tx.execute(
        "CREATE TABLE authorized_peer (
            id INTEGER PRIMARY KEY,
            peer_id INTEGER NOT NULL UNIQUE,
            synced_at TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(peer_id) REFERENCES peer(id)
        )",
        ()
    )?;
    tx.pragma_update(None, "user_version", 1)?;
    tx.commit()?;
    // The original ended with `tx.commit()?;` and no value, which does not
    // type-check against `Result<(), _>`.
    Ok(())
}

View file

@ -1 +1,59 @@
pub mod migration;
use std::{cell::OnceCell, path::Path, sync::{LazyLock, OnceLock}};
use migration::migrate;
use rusqlite::{ffi::Error, Connection};
use crate::{config::StorageConfig, global::CONFIG};
static INITIALIZE_PARENT_DIRECTORY_RESULT: OnceLock<()> = OnceLock::new();
static MIGRATE_RESULT: OnceLock<()> = OnceLock::new();
// Creates the parent directory of `path` if it does not exist yet.
// NOTE(review): gated on a process-wide OnceLock, so only the FIRST path
// ever passed has its directory created — fine while there is a single
// local DB path per process, surprising otherwise; confirm intent.
fn initialize_parent_directory<P>(path: &P)
where
    P: AsRef<Path>,
{
    // The OnceLock stores `()`; dereferencing yields the fn's `()` result.
    *INITIALIZE_PARENT_DIRECTORY_RESULT.get_or_init(|| {
        let path2: &Path = path.as_ref();
        if let Some(x) = path2.parent() {
            if !x.exists() {
                std::fs::create_dir_all(x).expect("Parent directory of the local database must be created.");
            }
        }
    })
}
// Runs local-database migrations at most once per process.
// NOTE(review): because the guard is global, only the first connection
// opened in the process gets migrated — confirm that multiple distinct
// databases are never opened.
fn migrate_once(conn: &Connection) -> () {
    *MIGRATE_RESULT.get_or_init(|| {
        migrate(conn).expect("Local database migration should be done correctly")
    })
}
// Constructors for a local database connection, resolved from an explicit
// path, a storage config, or the process-global config.
pub trait LocalDatabaseConnection {
    // Open (and migrate) the database at `path`.
    fn from_path<P>(path: &P) -> Self
    where
        P: AsRef<Path>;
    // Derive the path from a StorageConfig (data_directory/local.sqlite,
    // per StorageConfig::get_local_database_path).
    fn from_storage_config<T>(config: &T) -> Self
    where
        T: AsRef<StorageConfig>
    {
        Self::from_path(&config.as_ref().get_local_database_path())
    }
    // Use the global CONFIG; presumably panics if CONFIG is not yet
    // initialized (per the `get_unchecked` naming convention) — confirm.
    fn from_global_storage_config() -> Self {
        Self::from_storage_config(CONFIG.get_unchecked())
    }
}
impl LocalDatabaseConnection for Connection {
    // Opens the SQLite file at `path`, first ensuring its parent directory
    // exists, then running pending migrations. Panics on any failure.
    fn from_path<P>(path: &P) -> Self
    where
        P: AsRef<Path>
    {
        initialize_parent_directory(path);
        let conn = Connection::open(path).expect("local database connection must be opened without error");
        // Migration is guarded by a global OnceLock (see migrate_once).
        migrate_once(&conn);
        conn
    }
}

View file

@ -10,8 +10,6 @@ pub enum Error {
CiborSerialize(#[from] ciborium::ser::Error<std::io::Error>),
#[error("Config error: {0}")]
Config(#[from] crate::config::error::ConfigError),
#[error("DB Error: {0}")]
Db(#[from]sea_orm::DbErr),
#[error("Dial Error: {0}")]
Dial(#[from] libp2p::swarm::DialError),
#[error("Decoding identity error: {0}")]

View file

@ -1,66 +0,0 @@
use std::{path::{Path, PathBuf}, sync::OnceLock};
use dirs::cache_dir;
use rusqlite::Connection;
use crate::{cache::migration::CacheMigrator, config::StorageConfig, data::local::migration::migrate, error::Error};
use tokio::sync::OnceCell;
pub static LOCAL_DATABASE_CONNECTION: GlobalDatabaseConnection = GlobalDatabaseConnection::const_new();
/// Lazily-initialized, process-wide holder of the local rusqlite connection.
pub struct GlobalDatabaseConnection {
    inner: OnceLock<Connection>
}
impl GlobalDatabaseConnection {
    /// `const` constructor so the value can live in a `static`.
    pub const fn const_new() -> Self {
        Self {
            inner: OnceLock::new()
        }
    }
    /// Returns the connection if it has been initialized.
    pub fn get(&'static self) -> Option<&'static Connection> {
        self.inner.get()
    }
    /// Panics if `get_or_init_unchecked` has not run yet.
    pub fn get_unchecked(&'static self) -> &'static Connection {
        self.get().expect("local data database connection should initialized before access!")
    }
    // Resolved database file path: <data_directory>/local.sqlite.
    fn get_file_path<T>(config: &T) -> PathBuf
    where
        T: AsRef<StorageConfig>
    {
        config.as_ref().data_directory.join("local.sqlite")
    }
    /// Opens (at most once) the local SQLite database under the configured
    /// data directory, creating parent directories and running migrations.
    /// Panics on any directory, open, or migration failure.
    ///
    /// Fixes vs. original: `OnceLock::get_or_init` yields `&Connection`, so
    /// the return type is `&'static Connection` (the original declared an
    /// owned `Connection` and used `?` inside the non-`Result` closure,
    /// which cannot compile); the unused `U` type parameter is removed.
    pub async fn get_or_init_unchecked<T>(&'static self, config: T) -> &'static Connection
    where
        T: AsRef<StorageConfig>,
    {
        let path = Self::get_file_path(&config);
        if let Some(x) = path.parent() {
            std::fs::create_dir_all(x).expect("Failed to create directory for local database");
        }
        self.inner.get_or_init(|| {
            // `get_or_init` closures cannot propagate errors with `?`, and
            // this is the "unchecked" entry point, so failures panic.
            let db = Connection::open(&path).expect("Failed to open local database");
            migrate(&db).expect("Failed to migrate local database");
            db
        })
    }
}
#[cfg(test)]
pub use tests::*;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{cache::migration::CacheMigrator, global::CONFIG, tests::{TEST_CONFIG}};
    // Smoke test: initializing the global connection (open + migrate) must
    // not panic under the test configuration.
    #[tokio::test]
    pub async fn get_or_init_database() {
        LOCAL_DATABASE_CONNECTION.get_or_init_unchecked(&*TEST_CONFIG).await;
    }
}

View file

@ -2,14 +2,11 @@ use std::{any::type_name, collections::HashMap, net::{IpAddr, Ipv4Addr}, path::{
use crate::{config::{P2pConfig, PartialP2pConfig, StorageConfig}, error::Error };
use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId};
use sea_orm::{prelude::*, Database};
use sea_orm_migration::MigratorTrait;
use tokio::sync::{OnceCell, RwLock, RwLockReadGuard, RwLockWriteGuard};
mod config;
pub use config::*;
mod database_connection;
pub use database_connection::*;
use uuid::{ContextV7, Timestamp, Uuid};
pub fn generate_uuid() -> Uuid {

View file

@ -1,10 +1,7 @@
pub mod cache;
pub mod config;
pub mod data;
pub mod error;
pub mod global;
pub mod message;
pub mod migration;
pub mod p2p;
pub mod proto;
pub mod rpc;

View file

@ -1,10 +1,8 @@
use std::{path::PathBuf, sync::LazyLock};
use sea_orm::{sea_query::{FromValueTuple, IntoValueTuple, ValueType}, ActiveModelBehavior, ActiveModelTrait, ColumnTrait, Condition, DatabaseConnection, EntityTrait, IntoActiveModel, ModelTrait, PrimaryKeyToColumn, PrimaryKeyTrait, Value};
use sea_orm::QueryFilter;
use tempfile::TempDir;
use url::Url;
use crate::{ config::{Config, PartialConfig, PartialP2pConfig, PartialRpcConfig, RpcConfig, StorageConfig}, message::Message};
use crate::{ config::{Config, PartialConfig, PartialP2pConfig, PartialRpcConfig, RpcConfig, StorageConfig}};
use serde::{de::DeserializeOwned, Deserialize, Serialize};