//! Sled-backed notification storage: per-profile notification indexes and unread counters.
use crate::store::{count, StoreError};
|
|
use chrono::{DateTime, Utc};
|
|
use sled::{Db, Transactional, Tree};
|
|
use uuid::Uuid;
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct NotificationStore {
|
|
notification_type: String,
|
|
|
|
notifications: Tree,
|
|
dates: Tree,
|
|
profiles: Tree,
|
|
counts: Tree,
|
|
}
|
|
|
|
/// Owned key builder for a single notification type.
///
/// Kept separate from the store so it can be moved into transaction closures
/// without borrowing the store itself.
#[derive(Debug, Clone)]
struct NotificationKeys {
    /// Label interpolated into every key this builder produces.
    notification_type: String,
}
|
|
|
|
impl NotificationKeys {
|
|
fn notification_key(
|
|
&self,
|
|
profile_id: Uuid,
|
|
published: DateTime<Utc>,
|
|
notification_id: Uuid,
|
|
) -> String {
|
|
format!(
|
|
"/profile/{}/created/{}/{}/{}",
|
|
profile_id,
|
|
published.to_rfc3339(),
|
|
self.notification_type,
|
|
notification_id
|
|
)
|
|
}
|
|
|
|
fn date_key(&self, profile_id: Uuid, notification_id: Uuid) -> String {
|
|
format!(
|
|
"/profile/{}/{}/{}",
|
|
profile_id, self.notification_type, notification_id
|
|
)
|
|
}
|
|
|
|
fn notification_prefix(&self, profile_id: Uuid) -> String {
|
|
format!("/profile/{}", profile_id)
|
|
}
|
|
|
|
fn profile_key(&self, notification_id: Uuid, profile_id: Uuid) -> String {
|
|
format!(
|
|
"/{}/{}/profile/{}",
|
|
self.notification_type, notification_id, profile_id
|
|
)
|
|
}
|
|
|
|
fn profile_prefix(&self, notification_id: Uuid) -> String {
|
|
format!("/{}/{}/profile", self.notification_type, notification_id)
|
|
}
|
|
|
|
fn counts_key(&self, profile_id: Uuid) -> String {
|
|
format!("/profile/{}/{}/counts", profile_id, self.notification_type)
|
|
}
|
|
}
|
|
|
|
impl NotificationStore {
|
|
fn keys(&self) -> NotificationKeys {
|
|
NotificationKeys {
|
|
notification_type: self.notification_type.clone(),
|
|
}
|
|
}
|
|
|
|
pub(super) fn build(notification_type: &str, db: &Db) -> Result<Self, sled::Error> {
|
|
Ok(NotificationStore {
|
|
notifications: db
|
|
.open_tree(&format!("/profiles/notifications/{}", notification_type))?,
|
|
dates: db.open_tree(&format!(
|
|
"/profiles/notifications/{}/dates",
|
|
notification_type
|
|
))?,
|
|
profiles: db.open_tree(&format!(
|
|
"/profiles/notifications/{}/profiles",
|
|
notification_type
|
|
))?,
|
|
counts: db.open_tree(&format!(
|
|
"/profiles/notifications/{}/counts",
|
|
notification_type
|
|
))?,
|
|
|
|
notification_type: notification_type.to_owned(),
|
|
})
|
|
}
|
|
|
|
pub fn new(&self, profile_to_notify: Uuid, notification_id: Uuid, published: DateTime<Utc>) {
|
|
let keys = self.keys();
|
|
|
|
let res = [
|
|
&self.notifications,
|
|
&self.dates,
|
|
&self.profiles,
|
|
&self.counts,
|
|
]
|
|
.transaction(move |trees| {
|
|
let notifications = &trees[0];
|
|
let dates = &trees[1];
|
|
let profiles = &trees[2];
|
|
let counts = &trees[3];
|
|
|
|
notifications.insert(
|
|
keys.notification_key(profile_to_notify, published, notification_id)
|
|
.as_bytes(),
|
|
notification_id.as_bytes(),
|
|
)?;
|
|
dates.insert(
|
|
keys.date_key(profile_to_notify, notification_id).as_bytes(),
|
|
published.to_rfc3339().as_bytes(),
|
|
)?;
|
|
profiles.insert(
|
|
keys.profile_key(notification_id, profile_to_notify)
|
|
.as_bytes(),
|
|
profile_to_notify.as_bytes(),
|
|
)?;
|
|
count(counts, &keys.counts_key(profile_to_notify), |c| {
|
|
c.saturating_add(1)
|
|
})?;
|
|
|
|
Ok(())
|
|
});
|
|
|
|
if let Err(e) = res {
|
|
log::error!(
|
|
"Failed to notify user {} of notif: {}",
|
|
profile_to_notify,
|
|
e
|
|
);
|
|
}
|
|
}
|
|
|
|
pub fn count(&self, profile_id: Uuid) -> Result<u64, StoreError> {
|
|
let keys = self.keys();
|
|
if let Some(count) = self.counts.get(keys.counts_key(profile_id))? {
|
|
let count: u64 = String::from_utf8_lossy(&count).parse().unwrap_or(0);
|
|
Ok(count)
|
|
} else {
|
|
Ok(0)
|
|
}
|
|
}
|
|
|
|
pub(crate) fn remove(&self, notification_id: Uuid) {
|
|
for profile_id in self.profiles(notification_id) {
|
|
let keys = self.keys();
|
|
|
|
let res = [
|
|
&self.notifications,
|
|
&self.dates,
|
|
&self.profiles,
|
|
&self.counts,
|
|
]
|
|
.transaction(move |trees| {
|
|
let notifications = &trees[0];
|
|
let dates = &trees[1];
|
|
let profiles = &trees[2];
|
|
let counts = &trees[3];
|
|
|
|
if let Some(published) =
|
|
dates.remove(keys.date_key(profile_id, notification_id).as_bytes())?
|
|
{
|
|
if let Ok(published) =
|
|
String::from_utf8_lossy(&published).parse::<DateTime<Utc>>()
|
|
{
|
|
notifications.remove(
|
|
keys.notification_key(profile_id, published, notification_id)
|
|
.as_bytes(),
|
|
)?;
|
|
}
|
|
}
|
|
|
|
profiles.remove(keys.profile_key(notification_id, profile_id).as_bytes())?;
|
|
|
|
count(counts, &keys.counts_key(profile_id), |c| {
|
|
c.saturating_sub(1)
|
|
})?;
|
|
|
|
Ok(())
|
|
});
|
|
|
|
if let Err(e) = res {
|
|
log::error!("Error removing notif for profile {}: {}", profile_id, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn clear(
|
|
&self,
|
|
profile_id: Uuid,
|
|
notification_ids: Option<Vec<Uuid>>,
|
|
) -> Result<(), StoreError> {
|
|
if let Some(notification_ids) = notification_ids {
|
|
let keys = self.keys();
|
|
|
|
let date_keys = notification_ids
|
|
.into_iter()
|
|
.map(|id| {
|
|
(
|
|
id,
|
|
keys.date_key(profile_id, id),
|
|
keys.profile_key(id, profile_id),
|
|
)
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
let to_remove = date_keys.len() as u64;
|
|
|
|
self.counts.transaction(move |counts| {
|
|
count(counts, &keys.counts_key(profile_id), |c| {
|
|
c.saturating_sub(to_remove)
|
|
})?;
|
|
Ok(())
|
|
})?;
|
|
|
|
for (notification_id, date_key, profile_key) in date_keys {
|
|
let keys = self.keys();
|
|
|
|
let res =
|
|
[&self.notifications, &self.dates, &self.profiles].transaction(move |trees| {
|
|
let notifications = &trees[0];
|
|
let dates = &trees[1];
|
|
let profiles = &trees[2];
|
|
|
|
if let Some(published) = dates.remove(date_key.as_bytes())? {
|
|
if let Ok(published) =
|
|
String::from_utf8_lossy(&published).parse::<DateTime<Utc>>()
|
|
{
|
|
notifications.remove(
|
|
keys.notification_key(profile_id, published, notification_id)
|
|
.as_bytes(),
|
|
)?;
|
|
}
|
|
}
|
|
|
|
profiles.remove(profile_key.as_bytes())?;
|
|
|
|
Ok(()) as Result<(), sled::transaction::ConflictableTransactionError>
|
|
});
|
|
|
|
if let Err(e) = res {
|
|
log::error!("Error removing notif for profile {}, {}", profile_id, e);
|
|
}
|
|
}
|
|
} else {
|
|
let notification_keys = self.notification_keys(profile_id).collect::<Vec<_>>();
|
|
let date_keys = self.date_keys(profile_id).collect::<Vec<_>>();
|
|
|
|
let to_remove = notification_keys.len() as u64;
|
|
|
|
let keys = self.keys();
|
|
self.counts.transaction(move |counts| {
|
|
count(counts, &keys.counts_key(profile_id), |c| {
|
|
c.saturating_sub(to_remove)
|
|
})?;
|
|
Ok(())
|
|
})?;
|
|
|
|
let keys = self.keys();
|
|
for key in notification_keys {
|
|
match self.notifications.remove(key) {
|
|
Ok(Some(ivec)) => {
|
|
let notification_id = match Uuid::from_slice(&ivec) {
|
|
Ok(id) => id,
|
|
Err(_) => continue,
|
|
};
|
|
if let Err(e) = self
|
|
.profiles
|
|
.remove(keys.profile_key(notification_id, profile_id))
|
|
{
|
|
log::error!("Error clearing notif for profile {}: {}", profile_id, e);
|
|
}
|
|
}
|
|
Ok(None) => (),
|
|
Err(e) => log::error!("Error clearing notif for profile {}: {}", profile_id, e),
|
|
}
|
|
}
|
|
|
|
for key in date_keys {
|
|
if let Err(e) = self.dates.remove(key) {
|
|
log::error!(
|
|
"Error clearing notif date for profile {}: {}",
|
|
profile_id,
|
|
e
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn for_profile(&self, profile_id: Uuid) -> impl DoubleEndedIterator<Item = Uuid> {
|
|
let keys = self.keys();
|
|
|
|
self.notifications
|
|
.scan_prefix(keys.notification_prefix(profile_id))
|
|
.values()
|
|
.filter_map(|res| res.ok())
|
|
.filter_map(uuid_from_ivec)
|
|
.rev()
|
|
}
|
|
|
|
pub fn older_than_for_profile<'a>(
|
|
&'a self,
|
|
profile_id: Uuid,
|
|
notification_id: Uuid,
|
|
) -> impl DoubleEndedIterator<Item = Uuid> + 'a {
|
|
let keys = self.keys();
|
|
|
|
self.dates
|
|
.get(keys.date_key(profile_id, notification_id))
|
|
.ok()
|
|
.and_then(|opt| opt)
|
|
.and_then(date_from_ivec)
|
|
.into_iter()
|
|
.flat_map(move |date| {
|
|
let keys = keys.clone();
|
|
self.notifications
|
|
.range(
|
|
..keys
|
|
.notification_key(profile_id, date, notification_id)
|
|
.as_bytes(),
|
|
)
|
|
.values()
|
|
.filter_map(|res| res.ok())
|
|
.filter_map(uuid_from_ivec)
|
|
})
|
|
.rev()
|
|
}
|
|
|
|
pub fn newer_than_for_profile<'a>(
|
|
&'a self,
|
|
profile_id: Uuid,
|
|
notification_id: Uuid,
|
|
) -> impl DoubleEndedIterator<Item = Uuid> + 'a {
|
|
let keys = self.keys();
|
|
|
|
self.dates
|
|
.get(keys.date_key(profile_id, notification_id))
|
|
.ok()
|
|
.and_then(|opt| opt)
|
|
.and_then(date_from_ivec)
|
|
.into_iter()
|
|
.flat_map(move |date| {
|
|
self.notifications
|
|
.range(
|
|
keys.notification_key(profile_id, date, notification_id)
|
|
.as_bytes()..,
|
|
)
|
|
.values()
|
|
.filter_map(|res| res.ok())
|
|
.filter_map(uuid_from_ivec)
|
|
})
|
|
}
|
|
|
|
/// Iterates the raw `notifications` keys stored under `profile_id`'s prefix,
/// skipping entries that fail to load.
fn notification_keys(&self, profile_id: Uuid) -> impl DoubleEndedIterator<Item = sled::IVec> {
    let prefix = self.keys().notification_prefix(profile_id);

    self.notifications
        .scan_prefix(prefix)
        .keys()
        .filter_map(Result::ok)
}
|
|
|
|
/// Iterates the raw `dates` keys stored under `profile_id`'s prefix,
/// skipping entries that fail to load.
fn date_keys(&self, profile_id: Uuid) -> impl DoubleEndedIterator<Item = sled::IVec> {
    let prefix = self.keys().notification_prefix(profile_id);

    self.dates
        .scan_prefix(prefix)
        .keys()
        .filter_map(Result::ok)
}
|
|
|
|
fn profiles(&self, notification_id: Uuid) -> impl DoubleEndedIterator<Item = Uuid> {
|
|
let keys = self.keys();
|
|
|
|
self.profiles
|
|
.scan_prefix(keys.profile_prefix(notification_id))
|
|
.values()
|
|
.filter_map(|res| res.ok())
|
|
.filter_map(uuid_from_ivec)
|
|
}
|
|
}
|
|
|
|
fn uuid_from_ivec(ivec: sled::IVec) -> Option<Uuid> {
|
|
Uuid::from_slice(&ivec)
|
|
.map_err(|e| log::warn!("Failed to parse ivec {}", e))
|
|
.ok()
|
|
}
|
|
|
|
/// Decodes a stored value as a UTC timestamp.
///
/// The bytes are read as (lossy) UTF-8 text and parsed; `None` is returned
/// when parsing fails.
fn date_from_ivec(ivec: sled::IVec) -> Option<DateTime<Utc>> {
    String::from_utf8_lossy(&ivec)
        .parse::<DateTime<Utc>>()
        .ok()
}
|