mod embedded;
mod job_status;
mod schema;

use std::sync::Arc;

use dashmap::{DashMap, DashSet};
use diesel::prelude::*;
use diesel_async::{
    pooled_connection::{
        deadpool::{BuildError, Pool, PoolError},
        AsyncDieselConnectionManager, ManagerConfig,
    },
    AsyncConnection, AsyncPgConnection, RunQueryDsl,
};
use tokio::sync::Notify;
use tokio_postgres::{AsyncMessage, Notification};
use url::Url;
use uuid::Uuid;

use crate::{details::Details, error_code::ErrorCode};

use self::job_status::JobStatus;

use super::{
    Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, Hash,
    HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, QueueRepo, RepoError, SettingsRepo,
    StoreMigrationRepo, UploadId, VariantAlreadyExists,
};

#[derive(Clone)]
pub(crate) struct PostgresRepo {
    inner: Arc<Inner>,
    notifications: Arc<actix_rt::task::JoinHandle<()>>,
}

struct Inner {
    pool: Pool<AsyncPgConnection>,
    queue_notifications: DashMap<String, Arc<Notify>>,
    completed_uploads: DashSet<UploadId>,
    upload_notifier: Notify,
}

async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner: Arc<Inner>) {
    while let Ok(notification) = receiver.recv_async().await {
        match notification.channel() {
            "queue_status_channel" => {
                // new job inserted for queue
                let queue_name = notification.payload().to_string();

                inner
                    .queue_notifications
                    .entry(queue_name)
                    .or_insert_with(|| Arc::new(Notify::new()))
                    .notify_waiters();
            }
            channel => {
                tracing::info!(
                    "Unhandled postgres notification: {channel}: {}",
                    notification.payload()
                );
            }
        }
    }

    tracing::warn!("Notification delegator shutting down");
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum ConnectPostgresError {
    #[error("Failed to connect to postgres for migrations")]
    ConnectForMigration(#[source] tokio_postgres::Error),

    #[error("Failed to run migrations")]
    Migration(#[source] refinery::Error),

    #[error("Failed to build postgres connection pool")]
    BuildPool(#[source] BuildError),
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum PostgresError {
    #[error("Error in db pool")]
    Pool(#[source] PoolError),

    #[error("Error in database")]
    Diesel(#[source] diesel::result::Error),

    #[error("Error deserializing hex value")]
    Hex(#[source] hex::FromHexError),

    #[error("Error serializing details")]
    SerializeDetails(#[source] serde_json::Error),

    #[error("Error deserializing details")]
    DeserializeDetails(#[source] serde_json::Error),
}

impl PostgresError {
    pub(super) const fn error_code(&self) -> ErrorCode {
        todo!()
    }
}

impl PostgresRepo {
    pub(crate) async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
        let (mut client, conn) =
            tokio_postgres::connect(postgres_url.as_str(), tokio_postgres::tls::NoTls)
                .await
                .map_err(ConnectPostgresError::ConnectForMigration)?;

        let handle = actix_rt::spawn(conn);

        embedded::migrations::runner()
            .run_async(&mut client)
            .await
            .map_err(ConnectPostgresError::Migration)?;

        handle.abort();
        let _ = handle.await;

        let (tx, rx) = flume::bounded(10);

        let mut config = ManagerConfig::default();
        config.custom_setup = build_handler(tx);

        let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
            postgres_url,
            config,
        );

        let pool = Pool::builder(mgr)
            .build()
            .map_err(ConnectPostgresError::BuildPool)?;

        let inner = Arc::new(Inner {
            pool,
            queue_notifications: DashMap::new(),
            completed_uploads: DashSet::new(),
            upload_notifier: Notify::new(),
        });

        let notifications = Arc::new(actix_rt::spawn(delegate_notifications(rx, inner.clone())));

        Ok(PostgresRepo {
            inner,
            notifications,
        })
    }
}

type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
type ConfigFn =
    Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
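// `build_handler` produces the pool's `custom_setup` callback: every pooled
// connection is established through it, and each new connection spawns a
// background task that drains tokio-postgres `AsyncMessage`s, forwarding
// `Notification`s over the flume channel to `delegate_notifications` above.
//
// Usage sketch for the repo as a whole (the URL is illustrative and assumes
// a reachable postgres instance):
//
//   let url = Url::parse("postgres://pictrs:password@localhost/pictrs")?;
//   let repo = PostgresRepo::connect(url).await?;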
fn build_handler(sender: flume::Sender<Notification>) -> ConfigFn {
    Box::new(
        move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
            let sender = sender.clone();

            Box::pin(async move {
                let (client, mut conn) =
                    tokio_postgres::connect(config, tokio_postgres::tls::NoTls)
                        .await
                        .map_err(|e| ConnectionError::BadConnection(e.to_string()))?;

                // not very cash money (structured concurrency) of me
                actix_rt::spawn(async move {
                    while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
                        match res {
                            Err(e) => {
                                tracing::error!("Database Connection {e:?}");
                                return;
                            }
                            Ok(AsyncMessage::Notice(e)) => {
                                tracing::warn!("Database Notice {e:?}");
                            }
                            Ok(AsyncMessage::Notification(notification)) => {
                                if sender.send_async(notification).await.is_err() {
                                    tracing::warn!("Missed notification. Are we shutting down?");
                                }
                            }
                            Ok(_) => {
                                tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application");
                            }
                        }
                    }
                });

                AsyncPgConnection::try_from(client).await
            })
        },
    )
}

fn to_primitive(timestamp: time::OffsetDateTime) -> time::PrimitiveDateTime {
    let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
    time::PrimitiveDateTime::new(timestamp.date(), timestamp.time())
}

impl BaseRepo for PostgresRepo {}

#[async_trait::async_trait(?Send)]
impl HashRepo for PostgresRepo {
    async fn size(&self) -> Result<u64, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let count = hashes
            .count()
            .get_result::<i64>(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(count.try_into().expect("non-negative count"))
    }

    async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = hashes
            .select(created_at)
            .filter(hash.eq(&input_hash))
            .first(&mut conn)
            .await
            .map(time::PrimitiveDateTime::assume_utc)
            .optional()
            .map_err(PostgresError::Diesel)?;

        Ok(timestamp.map(|timestamp| OrderedHash {
            timestamp,
            hash: input_hash,
        }))
    }
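    // Pagination is keyset-based: an `OrderedHash` bound carries the
    // (created_at, hash) pair of the last row seen. `hashes_ordered` fetches
    // `limit + 1` rows at or past that bound (the extra row, if present,
    // becomes the `next` cursor) and runs a look-behind query in ascending
    // order to recover the `prev` cursor.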
    async fn hash_page_by_date(
        &self,
        date: time::OffsetDateTime,
        limit: usize,
    ) -> Result<HashPage, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = to_primitive(date);

        let ordered_hash = hashes
            .select((created_at, hash))
            .filter(created_at.lt(timestamp))
            .order(created_at.desc())
            .first::<(time::PrimitiveDateTime, Hash)>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .map(|tup| OrderedHash {
                timestamp: tup.0.assume_utc(),
                hash: tup.1,
            });

        self.hashes_ordered(ordered_hash, limit).await
    }

    async fn hashes_ordered(
        &self,
        bound: Option<OrderedHash>,
        limit: usize,
    ) -> Result<HashPage, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let (mut page, prev) = if let Some(OrderedHash {
            timestamp,
            hash: bound_hash,
        }) = bound
        {
            let timestamp = to_primitive(timestamp);

            let page = hashes
                .select(hash)
                .filter(created_at.lt(timestamp))
                .or_filter(created_at.eq(timestamp).and(hash.le(&bound_hash)))
                .order(created_at.desc())
                .then_order_by(hash.desc())
                .limit(limit as i64 + 1)
                .load::<Hash>(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            let prev = hashes
                .select(hash)
                .filter(created_at.gt(timestamp))
                .or_filter(created_at.eq(timestamp).and(hash.gt(&bound_hash)))
                .order(created_at)
                .then_order_by(hash)
                .offset(limit.saturating_sub(1) as i64)
                .first::<Hash>(&mut conn)
                .await
                .optional()
                .map_err(PostgresError::Diesel)?;

            (page, prev)
        } else {
            let page = hashes
                .select(hash)
                .order(created_at.desc())
                .then_order_by(hash.desc())
                .limit(limit as i64 + 1)
                .load::<Hash>(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            (page, None)
        };

        let next = if page.len() > limit { page.pop() } else { None };

        Ok(HashPage {
            limit,
            prev,
            next,
            hashes: page,
        })
    }

    async fn create_hash_with_timestamp(
        &self,
        input_hash: Hash,
        input_identifier: &Arc<str>,
        timestamp: time::OffsetDateTime,
    ) -> Result<Result<(), HashAlreadyExists>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = to_primitive(timestamp);

        let res = diesel::insert_into(hashes)
            .values((
                hash.eq(&input_hash),
                identifier.eq(input_identifier.as_ref()),
                created_at.eq(&timestamp),
            ))
            .execute(&mut conn)
            .await;

        match res {
            Ok(_) => Ok(Ok(())),
            Err(diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::UniqueViolation,
                _,
            )) => Ok(Err(HashAlreadyExists)),
            Err(e) => Err(PostgresError::Diesel(e).into()),
        }
    }

    async fn update_identifier(
        &self,
        input_hash: Hash,
        input_identifier: &Arc<str>,
    ) -> Result<(), RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::update(hashes)
            .filter(hash.eq(&input_hash))
            .set(identifier.eq(input_identifier.as_ref()))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = hashes
            .select(identifier)
            .filter(hash.eq(&input_hash))
            .get_result::<String>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?;

        Ok(opt.map(Arc::from))
    }

    async fn relate_variant_identifier(
        &self,
        input_hash: Hash,
        input_variant: String,
        input_identifier: &Arc<str>,
    ) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let res = diesel::insert_into(variants)
            .values((
                hash.eq(&input_hash),
                variant.eq(&input_variant),
                identifier.eq(input_identifier.as_ref()),
            ))
            .execute(&mut conn)
            .await;

        match res {
            Ok(_) => Ok(Ok(())),
            Err(diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::UniqueViolation,
                _,
            )) => Ok(Err(VariantAlreadyExists)),
            Err(e) => Err(PostgresError::Diesel(e).into()),
        }
    }

    async fn variant_identifier(
        &self,
        input_hash: Hash,
        input_variant: String,
    ) -> Result<Option<Arc<str>>, RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = variants
            .select(identifier)
            .filter(hash.eq(&input_hash))
            .filter(variant.eq(&input_variant))
            .get_result::<String>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .map(Arc::from);

        Ok(opt)
    }

    async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let vec = variants
            .select((variant, identifier))
            .filter(hash.eq(&input_hash))
            .get_results::<(String, String)>(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?
            .into_iter()
            .map(|(s, i)| (s, Arc::from(i)))
            .collect();

        Ok(vec)
    }

    async fn remove_variant(
        &self,
        input_hash: Hash,
        input_variant: String,
    ) -> Result<(), RepoError> {
        use schema::variants::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(variants)
            .filter(hash.eq(&input_hash))
            .filter(variant.eq(&input_variant))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn relate_motion_identifier(
        &self,
        input_hash: Hash,
        input_identifier: &Arc<str>,
    ) -> Result<(), RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::update(hashes)
            .filter(hash.eq(&input_hash))
            .set(motion_identifier.eq(input_identifier.as_ref()))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
        use schema::hashes::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = hashes
            .select(motion_identifier)
            .filter(hash.eq(&input_hash))
            .get_result::<Option<String>>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .flatten()
            .map(Arc::from);

        Ok(opt)
    }
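    // Removing a hash must also remove all of its variants, so both deletes
    // run in a single transaction: either the hash and its variants are all
    // gone, or none of them are.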
    async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        conn.transaction(|conn| {
            Box::pin(async move {
                diesel::delete(schema::hashes::dsl::hashes)
                    .filter(schema::hashes::dsl::hash.eq(&input_hash))
                    .execute(conn)
                    .await?;

                diesel::delete(schema::variants::dsl::variants)
                    .filter(schema::variants::dsl::hash.eq(&input_hash))
                    .execute(conn)
                    .await
            })
        })
        .await
        .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}

#[async_trait::async_trait(?Send)]
impl AliasRepo for PostgresRepo {
    async fn create_alias(
        &self,
        input_alias: &Alias,
        delete_token: &DeleteToken,
        input_hash: Hash,
    ) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
        use schema::aliases::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let res = diesel::insert_into(aliases)
            .values((
                alias.eq(input_alias),
                hash.eq(&input_hash),
                token.eq(delete_token),
            ))
            .execute(&mut conn)
            .await;

        match res {
            Ok(_) => Ok(Ok(())),
            Err(diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::UniqueViolation,
                _,
            )) => Ok(Err(AliasAlreadyExists)),
            Err(e) => Err(PostgresError::Diesel(e).into()),
        }
    }

    async fn delete_token(&self, input_alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
        use schema::aliases::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = aliases
            .select(token)
            .filter(alias.eq(input_alias))
            .get_result(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?;

        Ok(opt)
    }

    async fn hash(&self, input_alias: &Alias) -> Result<Option<Hash>, RepoError> {
        use schema::aliases::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = aliases
            .select(hash)
            .filter(alias.eq(input_alias))
            .get_result(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?;

        Ok(opt)
    }

    async fn aliases_for_hash(&self, input_hash: Hash) -> Result<Vec<Alias>, RepoError> {
        use schema::aliases::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let vec = aliases
            .select(alias)
            .filter(hash.eq(&input_hash))
            .get_results(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(vec)
    }

    async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> {
        use schema::aliases::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(aliases)
            .filter(alias.eq(input_alias))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}
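// Settings values are arbitrary bytes, but the `settings.value` column holds
// text, so values are hex-encoded on write and hex-decoded on read (a failed
// decode surfaces as `PostgresError::Hex`).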
#[async_trait::async_trait(?Send)]
impl SettingsRepo for PostgresRepo {
    async fn set(&self, input_key: &'static str, input_value: Arc<[u8]>) -> Result<(), RepoError> {
        use schema::settings::dsl::*;

        let input_value = hex::encode(input_value);

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::insert_into(settings)
            .values((key.eq(input_key), value.eq(input_value)))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn get(&self, input_key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
        use schema::settings::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = settings
            .select(value)
            .filter(key.eq(input_key))
            .get_result::<String>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .map(hex::decode)
            .transpose()
            .map_err(PostgresError::Hex)?
            .map(Arc::from);

        Ok(opt)
    }

    async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
        use schema::settings::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(settings)
            .filter(key.eq(input_key))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}

#[async_trait::async_trait(?Send)]
impl DetailsRepo for PostgresRepo {
    async fn relate_details(
        &self,
        input_identifier: &Arc<str>,
        input_details: &Details,
    ) -> Result<(), RepoError> {
        use schema::details::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let value =
            serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;

        diesel::insert_into(details)
            .values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn details(&self, input_identifier: &Arc<str>) -> Result<Option<Details>, RepoError> {
        use schema::details::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let opt = details
            .select(json)
            .filter(identifier.eq(input_identifier.as_ref()))
            .get_result::<serde_json::Value>(&mut conn)
            .await
            .optional()
            .map_err(PostgresError::Diesel)?
            .map(serde_json::from_value)
            .transpose()
            .map_err(PostgresError::DeserializeDetails)?
            .map(|inner| Details { inner });

        Ok(opt)
    }

    async fn cleanup_details(&self, input_identifier: &Arc<str>) -> Result<(), RepoError> {
        use schema::details::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(details)
            .filter(identifier.eq(input_identifier.as_ref()))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}
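// Job claiming: `pop` LISTENs on `queue_status_channel` (which
// `delegate_notifications` fans out to per-queue `Notify` handles), requeues
// any job whose heartbeat is more than two minutes stale, then tries to mark
// the oldest `New` job in this queue as `Running` under this worker. If no
// job is claimable it waits up to five seconds for a notification (or the
// timeout) and loops again.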
#[async_trait::async_trait(?Send)]
impl QueueRepo for PostgresRepo {
    async fn push(
        &self,
        queue_name: &'static str,
        job_json: serde_json::Value,
    ) -> Result<JobId, RepoError> {
        use schema::job_queue::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let job_id = diesel::insert_into(job_queue)
            .values((queue.eq(queue_name), job.eq(job_json)))
            .returning(id)
            .get_result::<Uuid>(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(JobId(job_id))
    }

    async fn pop(
        &self,
        queue_name: &'static str,
        worker_id: Uuid,
    ) -> Result<(JobId, serde_json::Value), RepoError> {
        use schema::job_queue::dsl::*;

        loop {
            let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

            let notifier: Arc<Notify> = self
                .inner
                .queue_notifications
                .entry(String::from(queue_name))
                .or_insert_with(|| Arc::new(Notify::new()))
                .clone();

            diesel::sql_query("LISTEN queue_status_channel;")
                .execute(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            let timestamp = to_primitive(time::OffsetDateTime::now_utc());

            diesel::update(job_queue)
                .filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2))))
                .set((
                    heartbeat.eq(Option::<time::PrimitiveDateTime>::None),
                    status.eq(JobStatus::New),
                ))
                .execute(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            // TODO: for_update().skip_locked()
            let id_query = job_queue
                .select(id)
                .filter(status.eq(JobStatus::New).and(queue.eq(queue_name)))
                .order(queue_time)
                .into_boxed()
                .single_value();

            let opt = diesel::update(job_queue)
                .filter(id.nullable().eq(id_query))
                .set((
                    heartbeat.eq(timestamp),
                    status.eq(JobStatus::Running),
                    worker.eq(worker_id),
                ))
                .returning((id, job))
                .get_result(&mut conn)
                .await
                .optional()
                .map_err(PostgresError::Diesel)?;

            if let Some((job_id, job_json)) = opt {
                diesel::sql_query("UNLISTEN queue_status_channel;")
                    .execute(&mut conn)
                    .await
                    .map_err(PostgresError::Diesel)?;

                return Ok((JobId(job_id), job_json));
            }

            let _ =
                actix_rt::time::timeout(std::time::Duration::from_secs(5), notifier.notified())
                    .await;

            diesel::sql_query("UNLISTEN queue_status_channel;")
                .execute(&mut conn)
                .await
                .map_err(PostgresError::Diesel)?;

            drop(conn);
        }
    }

    async fn heartbeat(
        &self,
        queue_name: &'static str,
        worker_id: Uuid,
        job_id: JobId,
    ) -> Result<(), RepoError> {
        use schema::job_queue::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let timestamp = to_primitive(time::OffsetDateTime::now_utc());

        diesel::update(job_queue)
            .filter(
                id.eq(job_id.0)
                    .and(queue.eq(queue_name))
                    .and(worker.eq(worker_id)),
            )
            .set(heartbeat.eq(timestamp))
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn complete_job(
        &self,
        queue_name: &'static str,
        worker_id: Uuid,
        job_id: JobId,
    ) -> Result<(), RepoError> {
        use schema::job_queue::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(job_queue)
            .filter(
                id.eq(job_id.0)
                    .and(queue.eq(queue_name))
                    .and(worker.eq(worker_id)),
            )
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}
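// Store migration progress is tracked as (old_identifier, new_identifier)
// pairs: `mark_migrated` records a copied object idempotently via
// ON CONFLICT DO NOTHING, `is_migrated` checks whether an old identifier has
// already been copied, and `clear` resets the table once a migration
// finishes.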
#[async_trait::async_trait(?Send)]
impl StoreMigrationRepo for PostgresRepo {
    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
        use schema::store_migrations::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let count = store_migrations
            .count()
            .get_result::<i64>(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(count > 0)
    }

    async fn mark_migrated(
        &self,
        input_old_identifier: &Arc<str>,
        input_new_identifier: &Arc<str>,
    ) -> Result<(), RepoError> {
        use schema::store_migrations::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::insert_into(store_migrations)
            .values((
                old_identifier.eq(input_old_identifier.as_ref()),
                new_identifier.eq(input_new_identifier.as_ref()),
            ))
            .on_conflict((old_identifier, new_identifier))
            .do_nothing()
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }

    async fn is_migrated(&self, input_old_identifier: &Arc<str>) -> Result<bool, RepoError> {
        use schema::store_migrations::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        let b = diesel::select(diesel::dsl::exists(
            store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())),
        ))
        .get_result(&mut conn)
        .await
        .map_err(PostgresError::Diesel)?;

        Ok(b)
    }

    async fn clear(&self) -> Result<(), RepoError> {
        use schema::store_migrations::dsl::*;

        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

        diesel::delete(store_migrations)
            .execute(&mut conn)
            .await
            .map_err(PostgresError::Diesel)?;

        Ok(())
    }
}

impl std::fmt::Debug for PostgresRepo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresRepo")
            .field("pool", &"pool")
            .finish()
    }
}