From d9d5ac53883213387f2456c689601a021da545a2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 30 Mar 2024 14:11:12 -0500 Subject: [PATCH] Make postgres work --- src/generate.rs | 49 ++++++++++------- src/lib.rs | 1 + src/repo.rs | 6 +++ src/repo/postgres.rs | 122 ++++++++++++++++++++++++++++++------------- src/repo/sled.rs | 5 ++ 5 files changed, 128 insertions(+), 55 deletions(-) diff --git a/src/generate.rs b/src/generate.rs index a16e36f..91feba1 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -69,7 +69,7 @@ pub(crate) async fn generate( let variant = thumbnail_path.to_string_lossy().to_string(); let mut attempts = 0; - let (details, identifier) = loop { + let tup = loop { if attempts > 4 { todo!("return error"); } @@ -91,31 +91,44 @@ pub(crate) async fn generate( ) .with_poll_timer("process-future"); - let tuple = heartbeat(state, hash.clone(), variant.clone(), process_future) + let res = heartbeat(state, hash.clone(), variant.clone(), process_future) .with_poll_timer("heartbeat-future") - .await??; + .await; - break tuple; + match res { + Ok(Ok(tuple)) => break tuple, + Ok(Err(e)) | Err(e) => { + state + .repo + .fail_variant(hash.clone(), variant.clone()) + .await?; + + return Err(e); + } + } } - Err(_) => match state - .repo - .await_variant(hash.clone(), variant.clone()) - .await? - { - Some(identifier) => { - let details = crate::ensure_details_identifier(state, &identifier).await?; + Err(_) => { + match state + .repo + .await_variant(hash.clone(), variant.clone()) + .await? + { + Some(identifier) => { + let details = + crate::ensure_details_identifier(state, &identifier).await?; - break (details, identifier); + break (details, identifier); + } + None => { + attempts += 1; + continue; + } } - None => { - attempts += 1; - continue; - } - }, + } } }; - Ok((details, identifier)) + Ok(tup) } } diff --git a/src/lib.rs b/src/lib.rs index 65a4d79..fd89448 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,6 +121,7 @@ async fn ensure_details( ensure_details_identifier(state, &identifier).await } +#[tracing::instrument(skip(state))] async fn ensure_details_identifier( state: &State, identifier: &Arc, diff --git a/src/repo.rs b/src/repo.rs index 597f4e5..81af5ec 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -748,6 +748,8 @@ pub(crate) trait VariantRepo: BaseRepo { async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError>; + async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; + async fn await_variant( &self, hash: Hash, @@ -789,6 +791,10 @@ where T::variant_heartbeat(self, hash, variant).await } + async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + T::fail_variant(self, hash, variant).await + } + async fn await_variant( &self, hash: Hash, diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index fc489ec..466d2bf 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -21,6 +21,7 @@ use diesel_async::{ bb8::{Pool, PooledConnection, RunError}, AsyncDieselConnectionManager, ManagerConfig, PoolError, }, + scoped_futures::ScopedFutureExt, AsyncConnection, AsyncPgConnection, RunQueryDsl, }; use futures_core::Stream; @@ -133,7 +134,7 @@ pub(crate) enum PostgresError { Pool(#[source] RunError), #[error("Error in database")] - Diesel(#[source] diesel::result::Error), + Diesel(#[from] diesel::result::Error), #[error("Error deserializing hex value")] Hex(#[source] hex::FromHexError), @@ -404,6 +405,16 @@ impl PostgresRepo { let mut conn = self.get_connection().await?; + let timestamp = to_primitive(time::OffsetDateTime::now_utc()); + + diesel::delete(keyed_notifications) + .filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2)))) + .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; + let res = diesel::insert_into(keyed_notifications) .values(key.eq(input_key)) .execute(&mut conn) @@ -468,7 +479,7 @@ impl PostgresRepo { async fn register_interest(&self) -> Result<(), PostgresError> { let mut notifier_conn = self.get_notifier_connection().await?; - diesel::sql_query("LISTEN upload_completion_channel;") + diesel::sql_query("LISTEN keyed_notification_channel;") .execute(&mut notifier_conn) .with_timeout(Duration::from_secs(5)) .await @@ -478,7 +489,7 @@ impl PostgresRepo { Ok(()) } - async fn clear_keyed_notifier(&self, input_key: &str) -> Result<(), PostgresError> { + async fn clear_keyed_notifier(&self, input_key: String) -> Result<(), PostgresError> { use schema::keyed_notifications::dsl::*; let mut conn = self.get_connection().await?; @@ -650,8 +661,8 @@ impl<'a> KeyedNotifierState<'a> { if let Some(notification_entry) = self .inner .keyed_notifications - .remove(key) - .and_then(|(_, weak)| weak.upgrade()) + .get(key) + .and_then(|weak| weak.upgrade()) { notification_entry.notify.notify_waiters(); } @@ -1165,16 +1176,23 @@ impl VariantRepo for PostgresRepo { .map_err(Into::into) } + #[tracing::instrument(level = "trace", skip(self))] + async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + let key = format!("{}{variant}", hash.to_base64()); + + self.clear_keyed_notifier(key).await.map_err(Into::into) + } + #[tracing::instrument(level = "debug", skip(self))] async fn await_variant( &self, hash: Hash, variant: String, ) -> Result>, RepoError> { - let key = Arc::from(format!("{}{}", hash.to_base64(), variant.clone())); + let key = Arc::from(format!("{}{variant}", hash.to_base64())); let listener = self.listen_on_key(key); - let notified = listener.notified_timeout(Duration::from_secs(10)); + let notified = listener.notified_timeout(Duration::from_secs(5)); self.register_interest().await?; @@ -1200,36 +1218,60 @@ impl VariantRepo for PostgresRepo { input_variant: String, input_identifier: &Arc, ) -> Result, RepoError> { - use schema::variants::dsl::*; - let mut conn = self.get_connection().await?; - let res = diesel::insert_into(variants) - .values(( - hash.eq(&input_hash), - variant.eq(&input_variant), - identifier.eq(input_identifier.as_ref()), - )) - .execute(&mut conn) - .with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER) - .with_timeout(Duration::from_secs(5)) - .await - .map_err(|_| PostgresError::DbTimeout)?; + conn.transaction(|conn| { + async move { + let res = async { + use schema::variants::dsl::*; - let key = format!("{}{}", input_hash.to_base64(), input_variant.clone()); - match self.clear_keyed_notifier(&key).await { - Ok(()) => {} - Err(e) => tracing::warn!("Failed to clear notifier: {e}"), - } + diesel::insert_into(variants) + .values(( + hash.eq(&input_hash), + variant.eq(&input_variant), + identifier.eq(input_identifier.to_string()), + )) + .execute(conn) + .with_metrics( + crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER, + ) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout) + } + .await; - match res { - Ok(_) => Ok(Ok(())), - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - _, - )) => Ok(Err(VariantAlreadyExists)), - Err(e) => Err(PostgresError::Diesel(e).into()), - } + let notification_res = async { + use schema::keyed_notifications::dsl::*; + + let input_key = format!("{}{input_variant}", input_hash.to_base64()); + diesel::delete(keyed_notifications) + .filter(key.eq(input_key)) + .execute(conn) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout) + } + .await; + + match notification_res? { + Ok(_) => {} + Err(e) => tracing::warn!("Failed to clear notifier: {e}"), + } + + match res? { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(VariantAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e)), + } + } + .scope_boxed() + }) + .await + .map_err(PostgresError::into) } #[tracing::instrument(level = "debug", skip(self))] @@ -1500,16 +1542,22 @@ impl DetailsRepo for PostgresRepo { let value = serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?; - diesel::insert_into(details) + let res = diesel::insert_into(details) .values((identifier.eq(input_identifier.as_ref()), json.eq(&value))) .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_DETAILS_RELATE) .with_timeout(Duration::from_secs(5)) .await - .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)?; + .map_err(|_| PostgresError::DbTimeout)?; - Ok(()) + match res { + Ok(_) + | Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(()), + Err(e) => Err(PostgresError::Diesel(e).into()), + } } #[tracing::instrument(level = "debug", skip(self))] diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 5d6af47..659d23d 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1462,6 +1462,11 @@ impl VariantRepo for SledRepo { todo!() } + #[tracing::instrument(level = "trace", skip(self))] + async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { + todo!() + } + #[tracing::instrument(level = "trace", skip(self))] async fn await_variant( &self,