use crate::{Aggregation, AggregationPath, Entry, EntryPath, Token}; use actix_web::web; use sled::{Db, Tree}; use std::ops::Range; use uuid::Uuid; #[derive(Clone)] pub(crate) struct Store { tree: Tree, } pub(crate) struct CreateAggregation<'a> { pub(crate) aggregation_path: &'a AggregationPath, pub(crate) aggregation: &'a Aggregation, pub(crate) token: &'a Token, } pub(crate) struct UpdateAggregation<'a> { pub(crate) aggregation_path: &'a AggregationPath, pub(crate) aggregation: &'a Aggregation, } pub(crate) struct DeleteAggregation<'a> { pub(crate) aggregation_path: &'a AggregationPath, } pub(crate) struct CreateEntry<'a> { pub(crate) entry_path: &'a EntryPath, pub(crate) entry: &'a Entry, } pub(crate) struct UpdateEntry<'a> { pub(crate) entry_path: &'a EntryPath, pub(crate) entry: &'a Entry, } pub(crate) struct DeleteEntry<'a> { pub(crate) entry_path: &'a EntryPath, } impl<'a> CreateAggregation<'a> { pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> { store.create_aggregation(self).await } } impl<'a> UpdateAggregation<'a> { pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> { store.update_aggregation(self).await } } impl<'a> DeleteAggregation<'a> { pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> { store.delete_aggregation(self).await } } impl<'a> CreateEntry<'a> { pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> { store.create_entry(self).await } } impl<'a> UpdateEntry<'a> { pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> { store.update_entry(self).await } } impl<'a> DeleteEntry<'a> { pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> { store.delete_entry(self).await } } impl Store { pub(crate) fn new(db: &Db) -> Result { Ok(Store { tree: db.open_tree("aggregations")?, }) } async fn create_aggregation(&self, config: CreateAggregation<'_>) -> Result<(), Error> { let aggregation_key = config.aggregation_path.key(); let aggregation_value = serde_json::to_string(&config.aggregation)?; let token_key = config.aggregation_path.token_key(); let token2 = config.token.clone(); let token_value = serde_json::to_string(&web::block(move || token2.hash()).await?)?; let tree = self.tree.clone(); web::block(move || { tree.transaction(move |tree| { tree.insert(aggregation_key.as_bytes(), aggregation_value.as_bytes())?; tree.insert(token_key.as_bytes(), token_value.as_bytes())?; Ok(()) }) }) .await?; Ok(()) } async fn update_aggregation(&self, config: UpdateAggregation<'_>) -> Result<(), Error> { let aggregation_key = config.aggregation_path.key(); let aggregation_value = serde_json::to_string(&config.aggregation)?; let tree = self.tree.clone(); web::block(move || tree.insert(aggregation_key.as_bytes(), aggregation_value.as_bytes())) .await?; Ok(()) } async fn delete_aggregation(&self, config: DeleteAggregation<'_>) -> Result<(), Error> { let entry_range = config.aggregation_path.entry_range(); let token_key = config.aggregation_path.token_key(); let aggregation_key = config.aggregation_path.key(); let tree = self.tree.clone(); web::block(move || { let entries = tree .range(entry_range) .keys() .collect::, _>>()?; tree.transaction(move |tree| { for key in &entries { tree.remove(key)?; } tree.remove(token_key.as_bytes())?; tree.remove(aggregation_key.as_bytes())?; Ok(()) })?; Ok(()) as Result<(), Error> }) .await?; Ok(()) } async fn create_entry(&self, config: CreateEntry<'_>) -> Result<(), Error> { let entry_key = config.entry_path.key(); let entry_value = serde_json::to_string(&config.entry)?; let tree = self.tree.clone(); web::block(move || tree.insert(entry_key.as_bytes(), entry_value.as_bytes())).await?; Ok(()) } async fn update_entry(&self, config: UpdateEntry<'_>) -> Result<(), Error> { let entry_key = config.entry_path.key(); let entry_value = serde_json::to_string(&config.entry)?; let tree = self.tree.clone(); web::block(move || tree.insert(entry_key.as_bytes(), entry_value.as_bytes())).await?; Ok(()) } async fn delete_entry(&self, config: DeleteEntry<'_>) -> Result<(), Error> { let entry_key = config.entry_path.key(); let tree = self.tree.clone(); web::block(move || tree.remove(entry_key)).await?; Ok(()) } pub(crate) async fn aggregation( &self, path: &AggregationPath, ) -> Result { let aggregation_key = path.key(); let tree = self.tree.clone(); let opt = web::block(move || tree.get(aggregation_key.as_bytes())).await?; match opt { Some(a) => { let aggregation = serde_json::from_slice(&a)?; Ok(aggregation) } None => Err(Error::NotFound), } } pub(crate) async fn entry(&self, path: &EntryPath) -> Result { let entry_key = path.key(); let tree = self.tree.clone(); let opt = web::block(move || tree.get(entry_key.as_bytes())).await?; match opt { Some(e) => { let entry = serde_json::from_slice(&e)?; Ok(entry) } None => Err(Error::NotFound), } } pub(crate) async fn entries(&self, range: Range>) -> Result, Error> { let tree = self.tree.clone(); let v = web::block(move || { tree.range(range) .map(|res| match res { Err(e) => Err(e.into()), Ok((k, v)) => { let string_key = String::from_utf8_lossy(&k); let entry_uuid = string_key.split(|b| b == '/').rev().next().unwrap(); let uuid: Uuid = entry_uuid.parse()?; let string_value = String::from_utf8_lossy(&v); let entry: Entry = serde_json::from_str(&string_value)?; Ok((uuid, entry)) } }) .collect::, Error>>() }) .await?; Ok(v) } pub(crate) async fn token(&self, path: &AggregationPath) -> Result { let token_key = path.token_key(); let tree = self.tree.clone(); let token_opt = web::block(move || tree.get(token_key.as_bytes())).await?; let token = match token_opt { Some(token_ivec) => serde_json::from_slice(&token_ivec)?, None => return Err(Error::NotFound), }; Ok(token) } } #[derive(Debug, thiserror::Error)] pub(crate) enum Error { #[error("{0}")] Json(#[from] serde_json::Error), #[error("{0}")] Uuid(#[from] uuid::Error), #[error("{0}")] Sled(#[from] sled::Error), #[error("{0}")] Transaction(#[from] sled::transaction::TransactionError), #[error("{0}")] Bcrypt(#[from] bcrypt::BcryptError), #[error("Panic in blocking operation")] Blocking, #[error("Item is not found")] NotFound, } impl From> for Error where E: std::fmt::Debug, Error: From, { fn from(err: actix_web::error::BlockingError) -> Self { match err { actix_web::error::BlockingError::Error(e) => e.into(), _ => Error::Blocking, } } }