419 lines
12 KiB
Rust
419 lines
12 KiB
Rust
use crate::{Collection, CollectionPath, Direction, Entry, EntryPath, MoveEntryPath, Token};
|
|
use actix_web::web;
|
|
use sled::{Db, Tree};
|
|
use std::{
|
|
collections::BTreeMap,
|
|
ops::RangeInclusive,
|
|
sync::{
|
|
atomic::{AtomicU64, Ordering},
|
|
Arc,
|
|
},
|
|
};
|
|
use uuid::Uuid;
|
|
|
|
#[derive(Clone)]
|
|
pub(crate) struct Store {
|
|
healthz: Tree,
|
|
healthz_counter: Arc<AtomicU64>,
|
|
tree: Tree,
|
|
}
|
|
|
|
impl std::fmt::Debug for Store {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("Store").finish()
|
|
}
|
|
}
|
|
|
|
pub(crate) struct CreateCollection<'a> {
|
|
pub(crate) collection_path: &'a CollectionPath,
|
|
pub(crate) collection: &'a Collection,
|
|
pub(crate) token: &'a Token,
|
|
}
|
|
|
|
pub(crate) struct UpdateCollection<'a> {
|
|
pub(crate) collection_path: &'a CollectionPath,
|
|
pub(crate) collection: &'a Collection,
|
|
}
|
|
|
|
pub(crate) struct DeleteCollection<'a> {
|
|
pub(crate) collection_path: &'a CollectionPath,
|
|
}
|
|
|
|
pub(crate) struct CreateEntry<'a> {
|
|
pub(crate) entry_path: &'a EntryPath,
|
|
pub(crate) entry: &'a Entry,
|
|
}
|
|
|
|
pub(crate) struct MoveEntry<'a> {
|
|
pub(crate) move_entry_path: &'a MoveEntryPath,
|
|
}
|
|
|
|
pub(crate) struct GetEntry<'a> {
|
|
pub(crate) entry_path: &'a EntryPath,
|
|
}
|
|
|
|
pub(crate) struct UpdateEntry<'a> {
|
|
pub(crate) entry_path: &'a EntryPath,
|
|
pub(crate) entry: &'a Entry,
|
|
}
|
|
|
|
pub(crate) struct DeleteEntry<'a> {
|
|
pub(crate) entry_path: &'a EntryPath,
|
|
}
|
|
|
|
impl<'a> CreateCollection<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.create_collection(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> UpdateCollection<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.update_collection(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> DeleteCollection<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.delete_collection(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> CreateEntry<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.create_entry(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> MoveEntry<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.move_entry(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> GetEntry<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<Entry, Error> {
|
|
store.get_entry(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> UpdateEntry<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.update_entry(self).await
|
|
}
|
|
}
|
|
|
|
impl<'a> DeleteEntry<'a> {
|
|
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
|
|
store.delete_entry(self).await
|
|
}
|
|
}
|
|
|
|
impl Store {
|
|
pub(crate) fn new(db: &Db) -> Result<Self, sled::Error> {
|
|
Ok(Store {
|
|
healthz: db.open_tree("healthz")?,
|
|
healthz_counter: Arc::new(AtomicU64::new(0)),
|
|
tree: db.open_tree("collections")?,
|
|
})
|
|
}
|
|
|
|
pub(crate) async fn check_health(&self) -> Result<(), Error> {
|
|
let next = self.healthz_counter.fetch_add(1, Ordering::Relaxed);
|
|
|
|
let healthz = self.healthz.clone();
|
|
web::block(move || healthz.insert("healthz", &next.to_be_bytes()[..])).await??;
|
|
|
|
self.healthz.flush_async().await?;
|
|
|
|
let healthz = self.healthz.clone();
|
|
web::block(move || healthz.get("healthz")).await??;
|
|
Ok(())
|
|
}
|
|
|
|
async fn create_collection(&self, config: CreateCollection<'_>) -> Result<(), Error> {
|
|
let collection_key = config.collection_path.key();
|
|
let collection_value = serde_json::to_string(&config.collection)?;
|
|
|
|
let token_key = config.collection_path.token_key();
|
|
let token2 = config.token.clone();
|
|
let token_value = serde_json::to_string(&web::block(move || token2.hash()).await??)?;
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
web::block(move || {
|
|
tree.transaction(move |tree| {
|
|
tree.insert(collection_key.as_bytes(), collection_value.as_bytes())?;
|
|
tree.insert(token_key.as_bytes(), token_value.as_bytes())?;
|
|
Ok(())
|
|
})
|
|
})
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn update_collection(&self, config: UpdateCollection<'_>) -> Result<(), Error> {
|
|
let collection_key = config.collection_path.key();
|
|
let collection_value = serde_json::to_string(&config.collection)?;
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
web::block(move || tree.insert(collection_key.as_bytes(), collection_value.as_bytes()))
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn delete_collection(&self, config: DeleteCollection<'_>) -> Result<(), Error> {
|
|
let entry_range = config.collection_path.entry_range();
|
|
let token_key = config.collection_path.token_key();
|
|
let collection_key = config.collection_path.key();
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
web::block(move || {
|
|
let entries = tree
|
|
.range(entry_range)
|
|
.keys()
|
|
.collect::<Result<Vec<_>, _>>()?;
|
|
|
|
tree.transaction(move |tree| {
|
|
for key in &entries {
|
|
tree.remove(key)?;
|
|
}
|
|
tree.remove(token_key.as_bytes())?;
|
|
tree.remove(collection_key.as_bytes())?;
|
|
Ok(())
|
|
})?;
|
|
|
|
Ok(()) as Result<(), Error>
|
|
})
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn create_entry(&self, config: CreateEntry<'_>) -> Result<(), Error> {
|
|
let entry_key = config.entry_path.key();
|
|
let entry_value = serde_json::to_string(&config.entry)?;
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
web::block(move || tree.insert(entry_key.as_bytes(), entry_value.as_bytes())).await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn move_entry(&self, config: MoveEntry<'_>) -> Result<(), Error> {
|
|
let order_key = config.move_entry_path.order_key();
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
let order_key_2 = order_key.clone();
|
|
let order = web::block(move || tree.get(order_key_2)).await??;
|
|
|
|
let order_key_2 = order_key.clone();
|
|
let entries = self
|
|
.entries(order_key_2, config.move_entry_path.entry_range())
|
|
.await?;
|
|
let iter = entries.into_iter().map(|(k, _)| k);
|
|
let mut order = if let Some(order) = order {
|
|
let mut v = serde_json::from_slice::<Vec<Uuid>>(&order)?;
|
|
|
|
let iter = iter.filter(|uuid| !v.contains(uuid)).collect::<Vec<_>>();
|
|
|
|
v.extend(iter);
|
|
|
|
v
|
|
} else {
|
|
iter.collect()
|
|
};
|
|
|
|
let entry_uuid = config.move_entry_path.entry;
|
|
let direction = config.move_entry_path.direction;
|
|
|
|
for i in 0..(order.len() - 1) {
|
|
if order[i + 1] == entry_uuid && direction == Direction::Up {
|
|
order[i + 1] = order[i];
|
|
order[i] = entry_uuid;
|
|
break;
|
|
} else if order[i] == entry_uuid && direction == Direction::Down {
|
|
order[i] = order[i + 1];
|
|
order[i + 1] = entry_uuid;
|
|
break;
|
|
}
|
|
}
|
|
|
|
let order_vec = serde_json::to_vec(&order)?;
|
|
let tree = self.tree.clone();
|
|
web::block(move || tree.insert(order_key, order_vec)).await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn get_entry(&self, config: GetEntry<'_>) -> Result<Entry, Error> {
|
|
let entry_key = config.entry_path.key();
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
let entry = web::block(move || tree.get(entry_key.as_bytes())).await??;
|
|
|
|
if let Some(entry) = entry {
|
|
let entry = serde_json::from_slice(&entry)?;
|
|
|
|
Ok(entry)
|
|
} else {
|
|
Err(Error::Missing)
|
|
}
|
|
}
|
|
|
|
async fn update_entry(&self, config: UpdateEntry<'_>) -> Result<(), Error> {
|
|
let entry_key = config.entry_path.key();
|
|
let entry_value = serde_json::to_string(&config.entry)?;
|
|
|
|
let tree = self.tree.clone();
|
|
|
|
web::block(move || tree.insert(entry_key.as_bytes(), entry_value.as_bytes())).await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn delete_entry(&self, config: DeleteEntry<'_>) -> Result<(), Error> {
|
|
let entry_key = config.entry_path.key();
|
|
let tree = self.tree.clone();
|
|
|
|
web::block(move || tree.remove(entry_key)).await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn collection(
|
|
&self,
|
|
path: &CollectionPath,
|
|
) -> Result<Option<crate::Collection>, Error> {
|
|
let collection_key = path.key();
|
|
let tree = self.tree.clone();
|
|
|
|
let opt = web::block(move || tree.get(collection_key.as_bytes())).await??;
|
|
|
|
match opt {
|
|
Some(a) => {
|
|
let collection = serde_json::from_slice(&a)?;
|
|
Ok(Some(collection))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn entry(&self, path: &EntryPath) -> Result<Option<crate::Entry>, Error> {
|
|
let entry_key = path.key();
|
|
let tree = self.tree.clone();
|
|
|
|
let opt = web::block(move || tree.get(entry_key.as_bytes())).await??;
|
|
|
|
match opt {
|
|
Some(e) => {
|
|
let entry = serde_json::from_slice(&e)?;
|
|
Ok(Some(entry))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn entries(
|
|
&self,
|
|
order_key: String,
|
|
range: RangeInclusive<Vec<u8>>,
|
|
) -> Result<Vec<(Uuid, Entry)>, Error> {
|
|
let tree = self.tree.clone();
|
|
|
|
let v = web::block(move || {
|
|
let mut hashmap = tree
|
|
.range(range)
|
|
.map(|res| match res {
|
|
Err(e) => Err(e.into()),
|
|
Ok((k, v)) => {
|
|
let string_key = String::from_utf8_lossy(&k);
|
|
let entry_uuid = string_key.split(|b| b == '/').rev().next().unwrap();
|
|
let uuid: Uuid = entry_uuid.parse()?;
|
|
|
|
let entry: Entry = serde_json::from_slice(&v)?;
|
|
|
|
Ok((uuid, entry))
|
|
}
|
|
})
|
|
.collect::<Result<BTreeMap<Uuid, Entry>, Error>>()?;
|
|
|
|
let order = if let Some(order) = tree.get(&order_key)? {
|
|
serde_json::from_slice::<Vec<Uuid>>(&order)?
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
let mut vec = order.into_iter().fold(Vec::new(), |mut vec, uuid| {
|
|
if let Some(entry) = hashmap.remove(&uuid) {
|
|
vec.push((uuid, entry));
|
|
vec
|
|
} else {
|
|
vec
|
|
}
|
|
});
|
|
|
|
vec.extend(hashmap);
|
|
|
|
Ok(vec) as Result<_, Error>
|
|
})
|
|
.await??;
|
|
|
|
Ok(v)
|
|
}
|
|
|
|
pub(crate) async fn token(
|
|
&self,
|
|
path: &CollectionPath,
|
|
) -> Result<Option<crate::TokenStorage>, Error> {
|
|
let token_key = path.token_key();
|
|
let tree = self.tree.clone();
|
|
|
|
let token_opt = web::block(move || tree.get(token_key.as_bytes())).await??;
|
|
|
|
match token_opt {
|
|
Some(token_ivec) => {
|
|
let token = serde_json::from_slice(&token_ivec)?;
|
|
Ok(Some(token))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub(crate) enum Error {
|
|
#[error("{0}")]
|
|
Json(#[from] serde_json::Error),
|
|
|
|
#[error("{0}")]
|
|
Uuid(#[from] uuid::Error),
|
|
|
|
#[error("{0}")]
|
|
Sled(#[from] sled::Error),
|
|
|
|
#[error("{0}")]
|
|
Transaction(#[from] sled::transaction::TransactionError),
|
|
|
|
#[error("{0}")]
|
|
Bcrypt(#[from] bcrypt::BcryptError),
|
|
|
|
#[error("Panic in blocking operation")]
|
|
Blocking,
|
|
|
|
#[error("Requested entry is missing")]
|
|
Missing,
|
|
}
|
|
|
|
impl From<actix_web::error::BlockingError> for Error {
|
|
fn from(_: actix_web::error::BlockingError) -> Self {
|
|
Error::Blocking
|
|
}
|
|
}
|