pict-rs-aggregator/src/store.rs

280 lines
7.8 KiB
Rust

use crate::{Collection, CollectionPath, Entry, EntryPath, Token};
use actix_web::web;
use sled::{Db, Tree};
use std::ops::Range;
use uuid::Uuid;
#[derive(Clone)]
pub(crate) struct Store {
tree: Tree,
}
pub(crate) struct CreateCollection<'a> {
pub(crate) collection_path: &'a CollectionPath,
pub(crate) collection: &'a Collection,
pub(crate) token: &'a Token,
}
pub(crate) struct UpdateCollection<'a> {
pub(crate) collection_path: &'a CollectionPath,
pub(crate) collection: &'a Collection,
}
pub(crate) struct DeleteCollection<'a> {
pub(crate) collection_path: &'a CollectionPath,
}
pub(crate) struct CreateEntry<'a> {
pub(crate) entry_path: &'a EntryPath,
pub(crate) entry: &'a Entry,
}
pub(crate) struct UpdateEntry<'a> {
pub(crate) entry_path: &'a EntryPath,
pub(crate) entry: &'a Entry,
}
pub(crate) struct DeleteEntry<'a> {
pub(crate) entry_path: &'a EntryPath,
}
impl<'a> CreateCollection<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.create_collection(self).await
}
}
impl<'a> UpdateCollection<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.update_collection(self).await
}
}
impl<'a> DeleteCollection<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.delete_collection(self).await
}
}
impl<'a> CreateEntry<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.create_entry(self).await
}
}
impl<'a> UpdateEntry<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.update_entry(self).await
}
}
impl<'a> DeleteEntry<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.delete_entry(self).await
}
}
impl Store {
pub(crate) fn new(db: &Db) -> Result<Self, sled::Error> {
Ok(Store {
tree: db.open_tree("collections")?,
})
}
async fn create_collection(&self, config: CreateCollection<'_>) -> Result<(), Error> {
let collection_key = config.collection_path.key();
let collection_value = serde_json::to_string(&config.collection)?;
let token_key = config.collection_path.token_key();
let token2 = config.token.clone();
let token_value = serde_json::to_string(&web::block(move || token2.hash()).await??)?;
let tree = self.tree.clone();
web::block(move || {
tree.transaction(move |tree| {
tree.insert(collection_key.as_bytes(), collection_value.as_bytes())?;
tree.insert(token_key.as_bytes(), token_value.as_bytes())?;
Ok(())
})
})
.await??;
Ok(())
}
async fn update_collection(&self, config: UpdateCollection<'_>) -> Result<(), Error> {
let collection_key = config.collection_path.key();
let collection_value = serde_json::to_string(&config.collection)?;
let tree = self.tree.clone();
web::block(move || tree.insert(collection_key.as_bytes(), collection_value.as_bytes()))
.await??;
Ok(())
}
async fn delete_collection(&self, config: DeleteCollection<'_>) -> Result<(), Error> {
let entry_range = config.collection_path.entry_range();
let token_key = config.collection_path.token_key();
let collection_key = config.collection_path.key();
let tree = self.tree.clone();
web::block(move || {
let entries = tree
.range(entry_range)
.keys()
.collect::<Result<Vec<_>, _>>()?;
tree.transaction(move |tree| {
for key in &entries {
tree.remove(key)?;
}
tree.remove(token_key.as_bytes())?;
tree.remove(collection_key.as_bytes())?;
Ok(())
})?;
Ok(()) as Result<(), Error>
})
.await??;
Ok(())
}
async fn create_entry(&self, config: CreateEntry<'_>) -> Result<(), Error> {
let entry_key = config.entry_path.key();
let entry_value = serde_json::to_string(&config.entry)?;
let tree = self.tree.clone();
web::block(move || tree.insert(entry_key.as_bytes(), entry_value.as_bytes())).await??;
Ok(())
}
async fn update_entry(&self, config: UpdateEntry<'_>) -> Result<(), Error> {
let entry_key = config.entry_path.key();
let entry_value = serde_json::to_string(&config.entry)?;
let tree = self.tree.clone();
web::block(move || tree.insert(entry_key.as_bytes(), entry_value.as_bytes())).await??;
Ok(())
}
async fn delete_entry(&self, config: DeleteEntry<'_>) -> Result<(), Error> {
let entry_key = config.entry_path.key();
let tree = self.tree.clone();
web::block(move || tree.remove(entry_key)).await??;
Ok(())
}
pub(crate) async fn collection(
&self,
path: &CollectionPath,
) -> Result<Option<crate::Collection>, Error> {
let collection_key = path.key();
let tree = self.tree.clone();
let opt = web::block(move || tree.get(collection_key.as_bytes())).await??;
match opt {
Some(a) => {
let collection = serde_json::from_slice(&a)?;
Ok(Some(collection))
}
None => Ok(None),
}
}
pub(crate) async fn entry(&self, path: &EntryPath) -> Result<Option<crate::Entry>, Error> {
let entry_key = path.key();
let tree = self.tree.clone();
let opt = web::block(move || tree.get(entry_key.as_bytes())).await??;
match opt {
Some(e) => {
let entry = serde_json::from_slice(&e)?;
Ok(Some(entry))
}
None => Ok(None),
}
}
pub(crate) async fn entries(&self, range: Range<Vec<u8>>) -> Result<Vec<(Uuid, Entry)>, Error> {
let tree = self.tree.clone();
let v = web::block(move || {
tree.range(range)
.map(|res| match res {
Err(e) => Err(e.into()),
Ok((k, v)) => {
let string_key = String::from_utf8_lossy(&k);
let entry_uuid = string_key.split(|b| b == '/').rev().next().unwrap();
let uuid: Uuid = entry_uuid.parse()?;
let string_value = String::from_utf8_lossy(&v);
let entry: Entry = serde_json::from_str(&string_value)?;
Ok((uuid, entry))
}
})
.collect::<Result<Vec<_>, Error>>()
})
.await??;
Ok(v)
}
pub(crate) async fn token(
&self,
path: &CollectionPath,
) -> Result<Option<crate::TokenStorage>, Error> {
let token_key = path.token_key();
let tree = self.tree.clone();
let token_opt = web::block(move || tree.get(token_key.as_bytes())).await??;
match token_opt {
Some(token_ivec) => {
let token = serde_json::from_slice(&token_ivec)?;
Ok(Some(token))
}
None => Ok(None),
}
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error("{0}")]
Json(#[from] serde_json::Error),
#[error("{0}")]
Uuid(#[from] uuid::Error),
#[error("{0}")]
Sled(#[from] sled::Error),
#[error("{0}")]
Transaction(#[from] sled::transaction::TransactionError),
#[error("{0}")]
Bcrypt(#[from] bcrypt::BcryptError),
#[error("Panic in blocking operation")]
Blocking,
}
impl From<actix_web::error::BlockingError> for Error {
fn from(_: actix_web::error::BlockingError) -> Self {
Error::Blocking
}
}