From a4b1ab7dfb0907fca35272cfc47867f6efcc323c Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 4 Sep 2023 21:51:27 -0500 Subject: [PATCH] Instrument postgres db calls --- src/error.rs | 28 +++++- src/future.rs | 75 ++++++++++++++ src/init_tracing.rs | 7 +- src/lib.rs | 4 +- src/queue.rs | 15 ++- src/queue/cleanup.rs | 3 +- src/queue/process.rs | 3 +- src/repo.rs | 10 +- src/repo/postgres.rs | 234 +++++++++++++++++++++++++++++++++++++++---- src/store.rs | 8 ++ 10 files changed, 350 insertions(+), 37 deletions(-) create mode 100644 src/future.rs diff --git a/src/error.rs b/src/error.rs index de2d9a3..f4ff1ed 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use actix_web::{http::StatusCode, HttpResponse, ResponseError}; use color_eyre::Report; @@ -5,6 +7,8 @@ use crate::error_code::ErrorCode; pub(crate) struct Error { inner: color_eyre::Report, + debug: Arc, + display: Arc, } impl Error { @@ -21,17 +25,21 @@ impl Error { .map(|e| e.error_code()) .unwrap_or(ErrorCode::UNKNOWN_ERROR) } + + pub(crate) fn is_disconnected(&self) -> bool { + self.kind().map(|e| e.is_disconnected()).unwrap_or(false) + } } impl std::fmt::Debug for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Debug::fmt(&self.inner, f) + f.write_str(&self.debug) } } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.inner, f) + f.write_str(&self.display) } } @@ -46,8 +54,14 @@ where UploadError: From, { fn from(error: T) -> Self { + let inner = Report::from(UploadError::from(error)); + let debug = Arc::from(format!("{inner:?}")); + let display = Arc::from(format!("{inner}")); + Error { - inner: Report::from(UploadError::from(error)), + inner, + debug, + display, } } } @@ -172,6 +186,14 @@ impl UploadError { Self::Timeout(_) => ErrorCode::STREAM_TOO_SLOW, } } + + const fn is_disconnected(&self) -> bool { + match self { + Self::Repo(e) => e.is_disconnected(), + Self::Store(s) => s.is_disconnected(), + _ => false, + } + } } impl From for UploadError { diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..aea858d --- /dev/null +++ b/src/future.rs @@ -0,0 +1,75 @@ +use std::{ + future::Future, + time::{Duration, Instant}, +}; + +pub(crate) type LocalBoxFuture<'a, T> = std::pin::Pin + 'a>>; + +pub(crate) trait WithTimeout: Future { + fn with_timeout(self, duration: Duration) -> actix_rt::time::Timeout + where + Self: Sized, + { + actix_rt::time::timeout(duration, self) + } +} + +pub(crate) trait WithMetrics: Future { + fn with_metrics(self, name: &'static str) -> MetricsFuture + where + Self: Sized, + { + MetricsFuture { + future: self, + metrics: Metrics { + name, + start: Instant::now(), + complete: false, + }, + } + } +} + +impl WithMetrics for F where F: Future {} +impl WithTimeout for F where F: Future {} + +pin_project_lite::pin_project! { + pub(crate) struct MetricsFuture { + #[pin] + future: F, + + metrics: Metrics, + } +} + +struct Metrics { + name: &'static str, + start: Instant, + complete: bool, +} + +impl Future for MetricsFuture +where + F: Future, +{ + type Output = F::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + + let out = std::task::ready!(this.future.poll(cx)); + + this.metrics.complete = true; + + std::task::Poll::Ready(out) + } +} + +impl Drop for Metrics { + fn drop(&mut self) { + metrics::histogram!(self.name, self.start.elapsed().as_secs_f64(), "complete" => self.complete.to_string()); + } +} diff --git a/src/init_tracing.rs b/src/init_tracing.rs index 62bfe74..5750d8c 100644 --- a/src/init_tracing.rs +++ b/src/init_tracing.rs @@ -8,9 +8,7 @@ use opentelemetry_otlp::WithExportConfig; use tracing::subscriber::set_global_default; use tracing_error::ErrorLayer; use tracing_log::LogTracer; -use tracing_subscriber::{ - fmt::format::FmtSpan, layer::SubscriberExt, registry::LookupSpan, Layer, Registry, -}; +use tracing_subscriber::{layer::SubscriberExt, registry::LookupSpan, Layer, Registry}; pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> { color_eyre::install()?; @@ -19,8 +17,7 @@ pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> { opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - let format_layer = - tracing_subscriber::fmt::layer().with_span_events(FmtSpan::NEW | FmtSpan::CLOSE); + let format_layer = tracing_subscriber::fmt::layer(); match tracing.logging.format { LogFormat::Compact => with_format(format_layer.compact(), tracing), diff --git a/src/lib.rs b/src/lib.rs index cbb3835..ad18041 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod exiftool; mod ffmpeg; mod file; mod formats; +mod future; mod generate; mod ingest; mod init_tracing; @@ -53,8 +54,8 @@ use std::{ time::{Duration, SystemTime}, }; use tokio::sync::Semaphore; +use tracing::Instrument; use tracing_actix_web::TracingLogger; -use tracing_futures::Instrument; use self::{ backgrounded::Backgrounded, @@ -1550,6 +1551,7 @@ async fn identifier( }))) } +#[tracing::instrument(skip(repo, store))] async fn healthz( repo: web::Data, store: web::Data, diff --git a/src/queue.rs b/src/queue.rs index a7d3f25..a3cbe15 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -3,14 +3,13 @@ use crate::{ config::Configuration, error::{Error, UploadError}, formats::InputProcessableFormat, + future::LocalBoxFuture, repo::{Alias, DeleteToken, FullRepo, Hash, JobId, UploadId}, serde_str::Serde, store::Store, }; use std::{ - future::Future, path::PathBuf, - pin::Pin, sync::Arc, time::{Duration, Instant}, }; @@ -179,8 +178,6 @@ pub(crate) async fn process_images( .await } -type LocalBoxFuture<'a, T> = Pin + 'a>>; - async fn process_jobs( repo: &Arc, store: &S, @@ -205,6 +202,11 @@ async fn process_jobs( if let Err(e) = res { tracing::warn!("Error processing jobs: {}", format!("{e}")); tracing::warn!("{}", format!("{e:?}")); + + if e.is_disconnected() { + actix_rt::time::sleep(Duration::from_secs(3)).await; + } + continue; } @@ -323,6 +325,11 @@ async fn process_image_jobs( if let Err(e) = res { tracing::warn!("Error processing jobs: {}", format!("{e}")); tracing::warn!("{}", format!("{e:?}")); + + if e.is_disconnected() { + actix_rt::time::sleep(Duration::from_secs(3)).await; + } + continue; } diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index aa31814..ea3ff75 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use crate::{ config::Configuration, error::{Error, UploadError}, - queue::{Cleanup, LocalBoxFuture}, + future::LocalBoxFuture, + queue::Cleanup, repo::{Alias, ArcRepo, DeleteToken, Hash}, serde_str::Serde, store::Store, diff --git a/src/queue/process.rs b/src/queue/process.rs index 13e8b78..14b0fce 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -3,8 +3,9 @@ use crate::{ config::Configuration, error::{Error, UploadError}, formats::InputProcessableFormat, + future::LocalBoxFuture, ingest::Session, - queue::{LocalBoxFuture, Process}, + queue::Process, repo::{Alias, ArcRepo, UploadId, UploadResult}, serde_str::Serde, store::Store, diff --git a/src/repo.rs b/src/repo.rs index dd32c19..e75fb5d 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -8,6 +8,7 @@ use crate::{ config, details::Details, error_code::{ErrorCode, OwnedErrorCode}, + future::LocalBoxFuture, stream::LocalBoxStream, }; use base64::Engine; @@ -85,6 +86,13 @@ impl RepoError { Self::Canceled => ErrorCode::PANIC, } } + + pub(crate) const fn is_disconnected(&self) -> bool { + match self { + Self::PostgresError(e) => e.is_disconnected(), + _ => false, + } + } } #[async_trait::async_trait(?Send)] @@ -564,8 +572,6 @@ impl HashPage { } } -type LocalBoxFuture<'a, T> = std::pin::Pin + 'a>>; - type PageFuture = LocalBoxFuture<'static, Result>; pub(crate) struct HashStream { diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 659eb6a..51b18a5 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -12,6 +12,7 @@ use std::{ }; use dashmap::DashMap; +use deadpool::managed::Hook; use diesel::prelude::*; use diesel_async::{ pooled_connection::{ @@ -29,6 +30,7 @@ use uuid::Uuid; use crate::{ details::Details, error_code::{ErrorCode, OwnedErrorCode}, + future::{LocalBoxFuture, WithMetrics, WithTimeout}, serde_str::Serde, stream::LocalBoxStream, }; @@ -108,6 +110,9 @@ pub(crate) enum PostgresError { #[error("Error deserializing upload result")] DeserializeUploadResult(#[source] serde_json::Error), + + #[error("Timed out waiting for postgres")] + DbTimeout, } impl PostgresError { @@ -117,11 +122,26 @@ impl PostgresError { | Self::Diesel(_) | Self::SerializeDetails(_) | Self::SerializeUploadResult(_) - | Self::Hex(_) => ErrorCode::POSTGRES_ERROR, + | Self::Hex(_) + | Self::DbTimeout => ErrorCode::POSTGRES_ERROR, Self::DeserializeDetails(_) => ErrorCode::EXTRACT_DETAILS, Self::DeserializeUploadResult(_) => ErrorCode::EXTRACT_UPLOAD_RESULT, } } + + pub(super) const fn is_disconnected(&self) -> bool { + match self { + Self::Pool( + PoolError::Closed + | PoolError::Backend(diesel_async::pooled_connection::PoolError::ConnectionError(_)), + ) + | Self::Diesel(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::ClosedConnection, + _, + )) => true, + _ => false, + } + } } impl PostgresRepo { @@ -140,6 +160,10 @@ impl PostgresRepo { handle.abort(); let _ = handle.await; + let parallelism = std::thread::available_parallelism() + .map(|u| u.into()) + .unwrap_or(1_usize); + let (tx, rx) = flume::bounded(10); let mut config = ManagerConfig::default(); @@ -149,7 +173,21 @@ impl PostgresRepo { postgres_url, config, ); + let pool = Pool::builder(mgr) + .runtime(deadpool::Runtime::Tokio1) + .wait_timeout(Some(Duration::from_secs(1))) + .create_timeout(Some(Duration::from_secs(2))) + .recycle_timeout(Some(Duration::from_secs(2))) + .post_create(Hook::sync_fn(|_, _| { + metrics::increment_counter!("pict-rs.postgres.pool.connection.create"); + Ok(()) + })) + .post_recycle(Hook::sync_fn(|_, _| { + metrics::increment_counter!("pict-rs.postgres.pool.connection.recycle"); + Ok(()) + })) + .max_size(parallelism * 8) .build() .map_err(ConnectPostgresError::BuildPool)?; @@ -160,7 +198,7 @@ impl PostgresRepo { upload_notifications: DashMap::new(), }); - let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone())); + let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone(), parallelism * 8)); let notifications = Arc::new(handle); @@ -195,7 +233,7 @@ impl GetConnectionMetricsGuard { impl Drop for GetConnectionMetricsGuard { fn drop(&mut self) { - metrics::increment_counter!("pict-rs.postgres.pool.get.end", "completed" => (!self.armed).to_string()); + metrics::increment_counter!("pict-rs.postgres.pool.get", "completed" => (!self.armed).to_string()); metrics::histogram!("pict-rs.postgres.pool.get.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string()); } } @@ -204,9 +242,12 @@ impl Inner { #[tracing::instrument(level = "TRACE", skip(self))] async fn get_connection(&self) -> Result, PostgresError> { let guard = GetConnectionMetricsGuard::guard(); - let res = self.pool.get().await.map_err(PostgresError::Pool); + + let obj = self.pool.get().await.map_err(PostgresError::Pool)?; + guard.disarm(); - res + + Ok(obj) } fn interest(self: &Arc, upload_id: UploadId) -> UploadInterest { @@ -301,14 +342,14 @@ type BoxFuture<'a, T> = std::pin::Pin + type ConfigFn = Box BoxFuture<'_, ConnectionResult> + Send + Sync + 'static>; -async fn delegate_notifications(receiver: flume::Receiver, inner: Arc) { - let parallelism = std::thread::available_parallelism() - .map(|u| u.into()) - .unwrap_or(1_usize); - +async fn delegate_notifications( + receiver: flume::Receiver, + inner: Arc, + capacity: usize, +) { let mut job_notifier_state = JobNotifierState { inner: &inner, - capacity: parallelism * 8, + capacity, jobs: BTreeSet::new(), jobs_ordered: VecDeque::new(), }; @@ -409,7 +450,10 @@ impl HashRepo for PostgresRepo { let count = hashes .count() .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.count") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(count.try_into().expect("non-negative count")) @@ -424,8 +468,11 @@ impl HashRepo for PostgresRepo { let timestamp = hashes .select(created_at) .filter(hash.eq(&input_hash)) - .first(&mut conn) + .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.bound") .await + .map_err(|_| PostgresError::DbTimeout)? .map(time::PrimitiveDateTime::assume_utc) .optional() .map_err(PostgresError::Diesel)?; @@ -452,8 +499,11 @@ impl HashRepo for PostgresRepo { .select((created_at, hash)) .filter(created_at.lt(timestamp)) .order(created_at.desc()) - .first::<(time::PrimitiveDateTime, Hash)>(&mut conn) + .get_result::<(time::PrimitiveDateTime, Hash)>(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.ordered-hash") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .map(|tup| OrderedHash { @@ -488,8 +538,11 @@ impl HashRepo for PostgresRepo { .order(created_at.desc()) .then_order_by(hash.desc()) .limit(limit as i64 + 1) - .load::(&mut conn) + .get_results::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.next-hashes") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; let prev = hashes @@ -500,7 +553,10 @@ impl HashRepo for PostgresRepo { .then_order_by(hash) .limit(limit as i64) .get_results::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.prev-hashes") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)? .pop(); @@ -511,8 +567,11 @@ impl HashRepo for PostgresRepo { .order(created_at.desc()) .then_order_by(hash.desc()) .limit(limit as i64 + 1) - .load::(&mut conn) + .get_results::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.first-hashes") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; (page, None) @@ -548,7 +607,10 @@ impl HashRepo for PostgresRepo { created_at.eq(×tamp), )) .execute(&mut conn) - .await; + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.create-hash") + .await + .map_err(|_| PostgresError::DbTimeout)?; match res { Ok(_) => Ok(Ok(())), @@ -574,7 +636,10 @@ impl HashRepo for PostgresRepo { .filter(hash.eq(&input_hash)) .set(identifier.eq(input_identifier.as_ref())) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.update-identifier") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -590,7 +655,10 @@ impl HashRepo for PostgresRepo { .select(identifier) .filter(hash.eq(&input_hash)) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.identifier") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -615,7 +683,10 @@ impl HashRepo for PostgresRepo { identifier.eq(input_identifier.as_ref()), )) .execute(&mut conn) - .await; + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variants.relate-variant-identifier") + .await + .map_err(|_| PostgresError::DbTimeout)?; match res { Ok(_) => Ok(Ok(())), @@ -642,7 +713,10 @@ impl HashRepo for PostgresRepo { .filter(hash.eq(&input_hash)) .filter(variant.eq(&input_variant)) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variants.identifier") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .map(Arc::from); @@ -660,7 +734,10 @@ impl HashRepo for PostgresRepo { .select((variant, identifier)) .filter(hash.eq(&input_hash)) .get_results::<(String, String)>(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variants.for-hash") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)? .into_iter() .map(|(s, i)| (s, Arc::from(i))) @@ -683,7 +760,10 @@ impl HashRepo for PostgresRepo { .filter(hash.eq(&input_hash)) .filter(variant.eq(&input_variant)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variants.remove") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -703,7 +783,10 @@ impl HashRepo for PostgresRepo { .filter(hash.eq(&input_hash)) .set(motion_identifier.eq(input_identifier.as_ref())) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.relate-motion-identifier") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -719,7 +802,10 @@ impl HashRepo for PostgresRepo { .select(motion_identifier) .filter(hash.eq(&input_hash)) .get_result::>(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.hashes.motion-identifier") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .flatten() @@ -737,11 +823,13 @@ impl HashRepo for PostgresRepo { diesel::delete(schema::variants::dsl::variants) .filter(schema::variants::dsl::hash.eq(&input_hash)) .execute(conn) + .with_metrics("pict-rs.postgres.variants.cleanup") .await?; diesel::delete(schema::hashes::dsl::hashes) .filter(schema::hashes::dsl::hash.eq(&input_hash)) .execute(conn) + .with_metrics("pict-rs.postgres.hashes.cleanup") .await }) }) @@ -772,7 +860,10 @@ impl AliasRepo for PostgresRepo { token.eq(delete_token), )) .execute(&mut conn) - .await; + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.aliases.create") + .await + .map_err(|_| PostgresError::DbTimeout)?; match res { Ok(_) => Ok(Ok(())), @@ -794,7 +885,10 @@ impl AliasRepo for PostgresRepo { .select(token) .filter(alias.eq(input_alias)) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.aliases.delete-token") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -811,7 +905,10 @@ impl AliasRepo for PostgresRepo { .select(hash) .filter(alias.eq(input_alias)) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.aliases.hash") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -828,7 +925,10 @@ impl AliasRepo for PostgresRepo { .select(alias) .filter(hash.eq(&input_hash)) .get_results(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.aliases.for-hash") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(vec) @@ -843,7 +943,10 @@ impl AliasRepo for PostgresRepo { diesel::delete(aliases) .filter(alias.eq(input_alias)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.aliases.cleanup") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -866,7 +969,10 @@ impl SettingsRepo for PostgresRepo { .do_update() .set(value.eq(&input_value)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.settings.set") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -882,7 +988,10 @@ impl SettingsRepo for PostgresRepo { .select(value) .filter(key.eq(input_key)) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.settings.get") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .map(hex::decode) @@ -902,7 +1011,10 @@ impl SettingsRepo for PostgresRepo { diesel::delete(settings) .filter(key.eq(input_key)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.settings.remove") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -927,7 +1039,10 @@ impl DetailsRepo for PostgresRepo { diesel::insert_into(details) .values((identifier.eq(input_identifier.as_ref()), json.eq(&value))) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.details.relate") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -943,7 +1058,10 @@ impl DetailsRepo for PostgresRepo { .select(json) .filter(identifier.eq(input_identifier.as_ref())) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.details.get") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .map(serde_json::from_value) @@ -963,7 +1081,10 @@ impl DetailsRepo for PostgresRepo { diesel::delete(details) .filter(identifier.eq(input_identifier.as_ref())) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.details.cleanup") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -988,7 +1109,10 @@ impl QueueRepo for PostgresRepo { .values((queue.eq(queue_name), job.eq(job_json))) .returning(id) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.push") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; guard.disarm(); @@ -1018,7 +1142,10 @@ impl QueueRepo for PostgresRepo { diesel::sql_query("LISTEN queue_status_channel;") .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.listen") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; let timestamp = to_primitive(time::OffsetDateTime::now_utc()); @@ -1030,7 +1157,10 @@ impl QueueRepo for PostgresRepo { status.eq(JobStatus::New), )) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.requeue") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; if count > 0 { @@ -1045,7 +1175,10 @@ impl QueueRepo for PostgresRepo { .order(queue_time) .limit(1) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.select") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -1063,7 +1196,10 @@ impl QueueRepo for PostgresRepo { )) .returning((id, job)) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.claim") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -1110,7 +1246,10 @@ impl QueueRepo for PostgresRepo { ) .set(heartbeat.eq(timestamp)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.heartbeat") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1134,7 +1273,10 @@ impl QueueRepo for PostgresRepo { .and(worker.eq(worker_id)), ) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.queue.complete") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1152,7 +1294,10 @@ impl StoreMigrationRepo for PostgresRepo { let count = store_migrations .count() .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.store-migration.count") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(count > 0) @@ -1176,7 +1321,10 @@ impl StoreMigrationRepo for PostgresRepo { .on_conflict((old_identifier, new_identifier)) .do_nothing() .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.store-migration.mark-migrated") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1192,7 +1340,10 @@ impl StoreMigrationRepo for PostgresRepo { store_migrations.filter(old_identifier.eq(input_old_identifier.as_ref())), )) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.store-migration.is-migrated") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(b) @@ -1206,7 +1357,10 @@ impl StoreMigrationRepo for PostgresRepo { diesel::delete(store_migrations) .execute(&mut conn) + .with_timeout(Duration::from_secs(20)) + .with_metrics("pict-rs.postgres.store-migration.clear") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1224,7 +1378,10 @@ impl ProxyRepo for PostgresRepo { diesel::insert_into(proxies) .values((url.eq(input_url.as_str()), alias.eq(&input_alias))) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.proxy.relate-url") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1240,7 +1397,10 @@ impl ProxyRepo for PostgresRepo { .select(alias) .filter(url.eq(input_url.as_str())) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.proxy.related") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -1256,7 +1416,10 @@ impl ProxyRepo for PostgresRepo { diesel::delete(proxies) .filter(alias.eq(&input_alias)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.proxy.remove-relation") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1281,7 +1444,10 @@ impl AliasAccessRepo for PostgresRepo { .filter(alias.eq(&input_alias)) .set(accessed.eq(timestamp)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.alias-access.set-accessed") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1300,7 +1466,10 @@ impl AliasAccessRepo for PostgresRepo { .select(accessed) .filter(alias.eq(&input_alias)) .get_result::(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.alias-access.accessed-at") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .map(time::PrimitiveDateTime::assume_utc); @@ -1330,7 +1499,10 @@ impl AliasAccessRepo for PostgresRepo { .order(accessed.desc()) .limit(100) .get_results(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.alias-access.older-aliases") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(vec) @@ -1364,7 +1536,10 @@ impl VariantAccessRepo for PostgresRepo { .filter(hash.eq(&input_hash).and(variant.eq(&input_variant))) .set(accessed.eq(timestamp)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variant-access.set-accessed") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1384,7 +1559,10 @@ impl VariantAccessRepo for PostgresRepo { .select(accessed) .filter(hash.eq(&input_hash).and(variant.eq(&input_variant))) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variant-access.accessed-at") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? .map(time::PrimitiveDateTime::assume_utc); @@ -1414,7 +1592,10 @@ impl VariantAccessRepo for PostgresRepo { .order(accessed.desc()) .limit(100) .get_results(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.variant-access.older-variants") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(vec) @@ -1477,7 +1658,10 @@ impl UploadRepo for PostgresRepo { .default_values() .returning(id) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.uploads.create") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(UploadId { id: uuid }) @@ -1497,14 +1681,20 @@ impl UploadRepo for PostgresRepo { diesel::sql_query("LISTEN upload_completion_channel;") .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.uploads.listen") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; let nested_opt = uploads .select(result) .filter(id.eq(upload_id.id)) .get_result(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.uploads.wait") .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)?; @@ -1543,7 +1733,10 @@ impl UploadRepo for PostgresRepo { diesel::delete(uploads) .filter(id.eq(upload_id.id)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.uploads.claim") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1567,7 +1760,10 @@ impl UploadRepo for PostgresRepo { .filter(id.eq(upload_id.id)) .set(result.eq(upload_result)) .execute(&mut conn) + .with_timeout(Duration::from_secs(5)) + .with_metrics("pict-rs.postgres.uploads.complete") .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; Ok(()) @@ -1587,8 +1783,6 @@ impl FullRepo for PostgresRepo { } } -type LocalBoxFuture<'a, T> = std::pin::Pin + 'a>>; - type NextFuture = LocalBoxFuture<'static, Result, RepoError>>; struct PageStream { diff --git a/src/store.rs b/src/store.rs index 3293237..b6f4d1e 100644 --- a/src/store.rs +++ b/src/store.rs @@ -39,9 +39,17 @@ impl StoreError { Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND, } } + pub(crate) const fn is_not_found(&self) -> bool { matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_)) } + + pub(crate) const fn is_disconnected(&self) -> bool { + match self { + Self::Repo(e) => e.is_disconnected(), + _ => false, + } + } } impl From for StoreError {