Merge global database connections
This commit is contained in:
parent
5120005128
commit
be896aaae3
8 changed files with 145 additions and 92 deletions
|
@ -40,6 +40,15 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
|||
uuid.workspace = true
|
||||
prost-types = "0.14.1"
|
||||
|
||||
[target.'cfg(target_os="android")'.dependencies]
|
||||
jni = "0.21.1"
|
||||
ndk = "0.9.0"
|
||||
|
||||
[target.'cfg(target_os="ios")'.dependencies]
|
||||
objc = "0.2.7"
|
||||
objc-foundation = "0.1.1"
|
||||
objc_id = "0.1.1"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.20.0"
|
||||
|
||||
|
|
5
core/src/cache/entity/mod.rs
vendored
5
core/src/cache/entity/mod.rs
vendored
|
@ -20,7 +20,7 @@ pub use cached_address::{
|
|||
mod tests {
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
use crate::{cache::entity::cached_peer, global::get_or_init_test_cache_database};
|
||||
use crate::{cache::entity::cached_peer, data::migration::DataMigrator, global::{DATABASE_CONNECTIONS}, tests::TEST_CONFIG};
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -31,7 +31,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn insert() {
|
||||
let db = get_or_init_test_cache_database().await;
|
||||
|
||||
let db = DATABASE_CONNECTIONS.get_or_init_unchecked(&*TEST_CONFIG, DataMigrator).await.cache;
|
||||
let peer_id = Keypair::generate_ed25519().public().to_peer_id();
|
||||
let multiaddr = Multiaddr::empty()
|
||||
.with(Ipv4Addr::new(127,0,0,1).into())
|
||||
|
|
|
@ -42,7 +42,7 @@ impl AsRef<RpcConfig> for Config {
|
|||
}
|
||||
|
||||
#[cfg_attr(feature="desktop", derive(Args))]
|
||||
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
|
||||
pub struct PartialConfig {
|
||||
#[cfg_attr(feature="desktop", command(flatten))]
|
||||
pub p2p: PartialP2pConfig,
|
||||
|
|
|
@ -9,25 +9,12 @@ use crate::{config::{ConfigError, PartialConfig}, utils::{emptiable::Emptiable,
|
|||
use libp2p::mdns::Config;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
static DATA_DATABASE_NAME: &str = "data.sqlite";
|
||||
static CACHE_DATABASE_NAME: &str = "cache.sqlite";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct StorageConfig {
|
||||
pub data_directory: PathBuf,
|
||||
pub cache_directory: PathBuf,
|
||||
}
|
||||
|
||||
impl StorageConfig {
|
||||
pub fn get_data_database_path(&self) -> PathBuf{
|
||||
self.data_directory.join(DATA_DATABASE_NAME)
|
||||
}
|
||||
pub fn get_cache_database_path(&self) -> PathBuf {
|
||||
self.cache_directory.join(CACHE_DATABASE_NAME)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl TryFrom<PartialStorageConfig> for StorageConfig {
|
||||
type Error = ConfigError;
|
||||
|
||||
|
@ -47,19 +34,46 @@ pub struct PartialStorageConfig {
|
|||
pub cache_directory: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl Default for PartialStorageConfig {
|
||||
fn default() -> Self {
|
||||
#[cfg(any(target_os="linux", target_os="macos", target_os="windows"))]
|
||||
{
|
||||
let mut data_dir = dirs::data_local_dir().unwrap();
|
||||
data_dir.push(get_binary_name().unwrap());
|
||||
let mut cache_dir = dirs::cache_dir().unwrap();
|
||||
cache_dir.push(get_binary_name().unwrap());
|
||||
impl PartialStorageConfig {
|
||||
#[cfg(not(any(target_os="android", target_os="ios")))]
|
||||
fn default_desktop(app_name: &'static str) -> Self {
|
||||
|
||||
let mut data_dir = dirs::data_local_dir().unwrap();
|
||||
data_dir.push(app_name);
|
||||
let mut cache_dir = dirs::cache_dir().unwrap();
|
||||
cache_dir.push(app_name);
|
||||
|
||||
Self {
|
||||
data_directory: Some(data_dir),
|
||||
cache_directory: Some(cache_dir)
|
||||
}
|
||||
Self {
|
||||
data_directory: Some(data_dir),
|
||||
cache_directory: Some(cache_dir)
|
||||
}
|
||||
}
|
||||
#[cfg(target_os="android")]
|
||||
fn default_android() -> Self{
|
||||
let ctx = ndk_context::android_context();
|
||||
let vm = unsafe { jni::JavaVM::from_raw(ctx.vm().cast()) }?;
|
||||
let mut env = vm.attach_current_thread()?;
|
||||
let ctx = unsafe { jni::objects::JObject::from_raw(ctx.context().cast()) };
|
||||
let cache_dir = env
|
||||
.call_method(ctx, "getFilesDir", "()Ljava/io/File;", &[])?
|
||||
.l()?;
|
||||
let cache_dir: jni::objects::JString = env
|
||||
.call_method(&cache_dir, "toString", "()Ljava/lang/String;", &[])?
|
||||
.l()?
|
||||
.try_into()?;
|
||||
let cache_dir = env.get_string(&cache_dir)?;
|
||||
let cache_dir = cache_dir.to_str()?;
|
||||
Ok(cache_dir.to_string())
|
||||
|
||||
}
|
||||
#[cfg(target_os="ios")]
|
||||
fn default_ios(){
|
||||
unsafe {
|
||||
let file_manager: *mut Object = msg_send![Class::get("NSFileManager").unwrap(), defaultManager];
|
||||
let paths: Id<Object> = msg_send![file_manager, URLsForDirectory:1 inDomains:1];
|
||||
let first_path: *mut Object = msg_send![paths, firstObject];
|
||||
let path: Id<NSString> = Id::from_ptr(msg_send![first_path, path]);
|
||||
Some(path.as_str().to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ pub use record_deletion::{
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{data::value::PeerIdValue, global::{generate_uuid, get_or_init_test_data_database}};
|
||||
use crate::{data::{migration::DataMigrator, value::PeerIdValue}, global::{generate_uuid, DATABASE_CONNECTIONS}, tests::TEST_CONFIG};
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -26,7 +26,7 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn check_insert() {
|
||||
let db = get_or_init_test_data_database().await;
|
||||
let db = DATABASE_CONNECTIONS.get_or_init_unchecked(&*TEST_CONFIG, DataMigrator).await.cache;
|
||||
|
||||
let node = TrustedNodeActiveModel::new(PeerId::random(), "test note".to_owned()).insert(db).await.unwrap();
|
||||
let _ = RecordDeletionActiveModel::new(node.id, "test_table".to_string(), generate_uuid()).insert(db).await.unwrap();
|
||||
|
|
|
@ -2,66 +2,97 @@ use std::path::{Path, PathBuf};
|
|||
|
||||
use sea_orm::{ConnectOptions, Database, DbErr, DatabaseConnection};
|
||||
use sea_orm_migration::MigratorTrait;
|
||||
use crate::{config::StorageConfig, error::Error};
|
||||
use crate::{cache::migration::CacheMigrator, config::StorageConfig, error::Error};
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
enum StorageType {
|
||||
Data,
|
||||
Cache,
|
||||
pub static DATABASE_CONNECTIONS: GlobalDatabaseConnections = GlobalDatabaseConnections::const_new();
|
||||
|
||||
pub struct DatabaseConnections<'a> {
|
||||
pub data: &'a DatabaseConnection,
|
||||
pub cache: &'a DatabaseConnection
|
||||
}
|
||||
|
||||
impl std::fmt::Display for StorageType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f,"{}", match self {
|
||||
StorageType::Data => "data",
|
||||
StorageType::Cache => "cache",
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
pub struct GlobalDatabaseConnections {
|
||||
data: OnceCell<DatabaseConnection>,
|
||||
cache: OnceCell<DatabaseConnection>,
|
||||
}
|
||||
|
||||
|
||||
pub static DATA_DATABASE_CONNECTION: GlobalDatabaseConnection = GlobalDatabaseConnection::const_new(StorageType::Data);
|
||||
pub static CACHE_DATABASE_CONNECTION: GlobalDatabaseConnection = GlobalDatabaseConnection::const_new(StorageType::Cache);
|
||||
|
||||
pub struct GlobalDatabaseConnection {
|
||||
storage: StorageType,
|
||||
inner: OnceCell<DatabaseConnection>
|
||||
}
|
||||
|
||||
impl GlobalDatabaseConnection {
|
||||
pub const fn const_new(storage: StorageType) -> Self {
|
||||
impl GlobalDatabaseConnections {
|
||||
pub const fn const_new() -> Self {
|
||||
Self {
|
||||
storage: storage,
|
||||
inner: OnceCell::const_new()
|
||||
data: OnceCell::const_new(),
|
||||
cache: OnceCell::const_new()
|
||||
}
|
||||
}
|
||||
pub fn get(&'static self) -> Option<&'static DatabaseConnection> {
|
||||
self.inner.get()
|
||||
|
||||
pub fn get_data(&'static self) -> Option<&'static DatabaseConnection> {
|
||||
self.data.get()
|
||||
}
|
||||
fn get_file_path<T>(&self, config: T) -> PathBuf
|
||||
|
||||
pub fn get_data_unchecked(&'static self) -> &'static DatabaseConnection {
|
||||
self.get_data().expect("Global data database connection should initialized before access!")
|
||||
}
|
||||
|
||||
pub fn get_cache(&'static self) -> Option<&'static DatabaseConnection> {
|
||||
self.cache.get()
|
||||
}
|
||||
|
||||
pub fn get_cache_unchecked(&'static self) -> &'static DatabaseConnection {
|
||||
self.get_cache().expect("Global cache database connection should initialized before access!")
|
||||
}
|
||||
|
||||
fn get_data_file_path<T>(config: &T) -> PathBuf
|
||||
where
|
||||
T: AsRef<StorageConfig>
|
||||
{
|
||||
match self.storage {
|
||||
StorageType::Cache => config.as_ref().cache_directory.join("cache.db"),
|
||||
StorageType::Data => config.as_ref().data_directory.join("data.db"),
|
||||
}
|
||||
config.as_ref().data_directory.join("data.db")
|
||||
}
|
||||
pub fn get_unchecked(&'static self) -> &'static DatabaseConnection {
|
||||
self.get().expect("Global database connection should initialized beforehand!")
|
||||
|
||||
fn get_cache_file_path<T>(config: &T) -> PathBuf
|
||||
where
|
||||
T: AsRef<StorageConfig>
|
||||
{
|
||||
config.as_ref().cache_directory.join("cache.db")
|
||||
}
|
||||
pub async fn get_or_try_init<T, U>(&'static self, config: T, _: U) -> Result<&'static DatabaseConnection, Error>
|
||||
where
|
||||
T: AsRef<StorageConfig>,
|
||||
|
||||
fn get_url_unchecked<T>(path: T) -> String
|
||||
where
|
||||
T: AsRef<Path>
|
||||
{
|
||||
"sqlite://".to_string() + path.as_ref().as_os_str().to_str().expect("Failed to convert path to string!") + "?mode=rwc"
|
||||
}
|
||||
|
||||
async fn get_or_init_database_connection_unchecked<T, U>(cell: &OnceCell<DatabaseConnection>, options: T, _: U ) -> &DatabaseConnection
|
||||
where
|
||||
T: Into<ConnectOptions>,
|
||||
U: MigratorTrait
|
||||
{
|
||||
let url = "sqlite://".to_string() + &self.get_file_path(config).into_os_string().into_string()? + "?mode=rwc";
|
||||
Ok(self.inner.get_or_try_init(|| async {
|
||||
let db = Database::connect(&url).await?;
|
||||
U::up(&db, None).await?;
|
||||
Ok::<DatabaseConnection, DbErr>(db)
|
||||
}).await?)
|
||||
cell.get_or_init(|| async {
|
||||
let db = Database::connect(options.into()).await.unwrap();
|
||||
U::up(&db, None).await.unwrap();
|
||||
db
|
||||
}).await
|
||||
}
|
||||
|
||||
|
||||
pub async fn get_or_init_unchecked<T, U>(&'static self, config: T, _migrator: U) -> DatabaseConnections
|
||||
where
|
||||
T: AsRef<StorageConfig>,
|
||||
U: MigratorTrait,
|
||||
{
|
||||
DatabaseConnections{
|
||||
data: Self::get_or_init_database_connection_unchecked(
|
||||
&self.data,
|
||||
Self::get_url_unchecked(Self::get_data_file_path(&config)),
|
||||
_migrator
|
||||
).await,
|
||||
cache: Self::get_or_init_database_connection_unchecked(
|
||||
&self.cache,
|
||||
Self::get_url_unchecked(Self::get_cache_file_path(&config)),
|
||||
CacheMigrator
|
||||
).await,
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,10 +105,8 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::{cache::migration::CacheMigrator, data::migration::DataMigrator, global::CONFIG, tests::{TEST_CONFIG}};
|
||||
|
||||
pub async fn get_or_init_test_data_database() -> &'static DatabaseConnection{
|
||||
DATA_DATABASE_CONNECTION.get_or_try_init(&*TEST_CONFIG, DataMigrator).await.unwrap()
|
||||
#[tokio::test]
|
||||
pub async fn get_or_init_database() {
|
||||
DATABASE_CONNECTIONS.get_or_init_unchecked(&*TEST_CONFIG, DataMigrator).await;
|
||||
}
|
||||
pub async fn get_or_init_test_cache_database() -> &'static DatabaseConnection{
|
||||
CACHE_DATABASE_CONNECTION.get_or_try_init(&*TEST_CONFIG, CacheMigrator).await.unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ use libp2p::{ identity::Keypair, mdns, ping, swarm, Multiaddr, PeerId};
|
|||
use sea_orm::{prelude::DateTimeUtc, ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, ModelTrait, QueryFilter};
|
||||
use tracing::{event, Level};
|
||||
|
||||
use crate::{cache::entity::{CachedPeerActiveModel, CachedAddressActiveModel, CachedAddressColumn, CachedAddressEntity, CachedAddressModel, CachedPeerColumn, CachedPeerEntity, CachedPeerModel}, data::value::{MultiaddrValue, PeerIdValue}, error::Error, global::CACHE_DATABASE_CONNECTION};
|
||||
use crate::{cache::entity::{CachedPeerActiveModel, CachedAddressActiveModel, CachedAddressColumn, CachedAddressEntity, CachedAddressModel, CachedPeerColumn, CachedPeerEntity, CachedPeerModel}, data::value::{MultiaddrValue, PeerIdValue}, error::Error, global::DATABASE_CONNECTIONS};
|
||||
|
||||
#[derive(swarm::NetworkBehaviour)]
|
||||
#[behaviour(to_swarm = "Event")]
|
||||
|
@ -69,34 +69,34 @@ impl From<ping::Event> for Event {
|
|||
|
||||
async fn try_get_or_insert_cached_peer(peer_id: &PeerId, peer_addr: &Multiaddr) -> Result<(CachedPeerModel, CachedAddressModel), Error> {
|
||||
match (
|
||||
CachedPeerEntity::find().filter(CachedPeerColumn::PeerId.eq(PeerIdValue::from(peer_id.clone()))).one(CACHE_DATABASE_CONNECTION.get_unchecked()).await?,
|
||||
CachedAddressEntity::find().filter(CachedAddressColumn::Multiaddress.eq(MultiaddrValue::from(peer_addr.clone()))).one(CACHE_DATABASE_CONNECTION.get_unchecked()).await?,
|
||||
CachedPeerEntity::find().filter(CachedPeerColumn::PeerId.eq(PeerIdValue::from(peer_id.clone()))).one(DATABASE_CONNECTIONS.get_cache_unchecked()).await?,
|
||||
CachedAddressEntity::find().filter(CachedAddressColumn::Multiaddress.eq(MultiaddrValue::from(peer_addr.clone()))).one(DATABASE_CONNECTIONS.get_cache_unchecked()).await?,
|
||||
) {
|
||||
(Some(x), Some(y) ) => {
|
||||
if x.id == y.cached_peer_id {
|
||||
event!(Level::TRACE, "Known peer: {}, {}", peer_id, peer_addr);
|
||||
let mut addr: CachedAddressActiveModel = y.into();
|
||||
addr.updated_at = Set(Local::now().to_utc());
|
||||
let updated = addr.update(CACHE_DATABASE_CONNECTION.get_unchecked()).await?;
|
||||
let updated = addr.update(DATABASE_CONNECTIONS.get_cache_unchecked()).await?;
|
||||
Ok((x, updated))
|
||||
} else {
|
||||
y.delete(CACHE_DATABASE_CONNECTION.get().expect("Cache database should initialized beforehand!")).await?;
|
||||
Ok((x.clone(), CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(CACHE_DATABASE_CONNECTION.get_unchecked()).await?))
|
||||
y.delete(DATABASE_CONNECTIONS.get_cache().expect("Cache database should initialized beforehand!")).await?;
|
||||
Ok((x.clone(), CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?))
|
||||
}
|
||||
}
|
||||
(Some(x), None) => {
|
||||
event!(Level::INFO, "New address {} for {}", peer_addr, peer_id);
|
||||
Ok((x.clone(),CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(CACHE_DATABASE_CONNECTION.get_unchecked()).await?))
|
||||
Ok((x.clone(),CachedAddressActiveModel::new(x.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?))
|
||||
},
|
||||
(None, x) => {
|
||||
event!(Level::INFO, "Add new peer: {}", peer_id);
|
||||
let inserted = CachedPeerActiveModel::new(peer_id.clone()).insert(CACHE_DATABASE_CONNECTION.get_unchecked()).await?;
|
||||
let inserted = CachedPeerActiveModel::new(peer_id.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?;
|
||||
if let Some(y) = x {
|
||||
event!(Level::INFO, "Remove {} from {}", peer_addr, peer_id);
|
||||
y.delete(CACHE_DATABASE_CONNECTION.get_unchecked()).await?;
|
||||
y.delete(DATABASE_CONNECTIONS.get_cache_unchecked()).await?;
|
||||
};
|
||||
event!(Level::INFO, "Add address {} to {}", peer_addr, peer_id);
|
||||
Ok((inserted.clone(), CachedAddressActiveModel::new(inserted.id, peer_addr.clone()).insert(CACHE_DATABASE_CONNECTION.get_unchecked()).await?))
|
||||
Ok((inserted.clone(), CachedAddressActiveModel::new(inserted.id, peer_addr.clone()).insert(DATABASE_CONNECTIONS.get_cache_unchecked()).await?))
|
||||
},
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{CACHE_DATABASE_CONNECTION, DATA_DATABASE_CONNECTION}, proto::CachedAddressMessage};
|
||||
use crate::{cache::entity::{CachedAddressEntity, CachedPeerEntity, CachedPeerModel}, global::{DATABASE_CONNECTIONS}, proto::CachedAddressMessage};
|
||||
use futures::future::join_all;
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
|
@ -16,9 +16,9 @@ impl crate::proto::cached_peer_service_server::CachedPeerService for CachedPeerS
|
|||
println!("Got a request: {:?}", request);
|
||||
|
||||
let reply = CachedPeerListResponse {
|
||||
peers: join_all( CachedPeerEntity::find().all(CACHE_DATABASE_CONNECTION.get_unchecked()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?.iter().map(|x| async move {
|
||||
peers: join_all( CachedPeerEntity::find().all(DATABASE_CONNECTIONS.get_cache_unchecked()).await.or_else(|e| Err(Status::from_error(Box::new(e))))?.iter().map(|x| async move {
|
||||
let addresses = CachedAddressEntity::find()
|
||||
.all(CACHE_DATABASE_CONNECTION.get_unchecked())
|
||||
.all(DATABASE_CONNECTIONS.get_cache_unchecked())
|
||||
.await
|
||||
.or_else(|e| Err(Status::from_error(Box::new(e))))?;
|
||||
Ok::<CachedPeerMessage, Status>(CachedPeerMessage::from((x, &addresses)))
|
||||
|
|
Loading…
Add table
Reference in a new issue