diff --git a/Cargo.lock b/Cargo.lock index 6ec3678..470c0f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,6 +444,19 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bb8" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df7c2093d15d6a1d33b1f972e1c5ea3177748742b97a5f392aa83a65262c6780" +dependencies = [ + "async-trait", + "futures-channel", + "futures-util", + "parking_lot 0.12.1", + "tokio", +] + [[package]] name = "bcder" version = "0.7.4" @@ -754,28 +767,6 @@ dependencies = [ "parking_lot_core 0.9.9", ] -[[package]] -name = "deadpool" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" -dependencies = [ - "async-trait", - "deadpool-runtime", - "num_cpus", - "retain_mut", - "tokio", -] - -[[package]] -name = "deadpool-runtime" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" -dependencies = [ - "tokio", -] - [[package]] name = "der" version = "0.7.8" @@ -831,7 +822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be" dependencies = [ "async-trait", - "deadpool", + "bb8", "diesel", "futures-util", "scoped-futures", @@ -1049,6 +1040,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1841,13 +1833,13 @@ dependencies = [ "async-trait", "barrel", "base64", + "bb8", "blurhash-update", "clap", "color-eyre", "config", "console-subscriber", "dashmap", - "deadpool", "diesel", "diesel-async", "diesel-derive-enum", @@ -2313,12 +2305,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "retain_mut" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" - [[package]] name = "ring" version = "0.17.8" diff --git a/Cargo.toml b/Cargo.toml index 6826620..3d9fadb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,15 +22,15 @@ actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0 async-trait = "0.1.51" barrel = { version = "0.7.0", features = ["pg"] } base64 = "0.21.0" +bb8 = "0.8.3" blurhash-update = "0.1.0" clap = { version = "4.0.2", features = ["derive"] } color-eyre = "0.6" config = { version = "0.14.0", default-features = false, features = ["json", "ron", "toml", "yaml"] } console-subscriber = "0.2" dashmap = "5.1.0" -deadpool = { version = "0.9.5", features = ["rt_tokio_1"] } diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] } -diesel-async = { version = "0.4.1", features = ["postgres", "deadpool"] } +diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } flume = "0.11.0" futures-core = "0.3" diff --git a/src/queue.rs b/src/queue.rs index 0556348..42da976 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -346,7 +346,7 @@ where loop { tracing::trace!("job_loop: looping"); - crate::sync::cooperate().await; + crate::sync::cooperate().with_poll_timer("cooperate").await; async { let (job_id, job) = state @@ -370,6 +370,7 @@ where state .repo .complete_job(queue, worker_id, job_id, job_result(&res)) + .with_poll_timer("cleanup-job-complete") .await?; res?; diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index a7dce4a..b66ab99 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -12,12 +12,13 @@ use std::{ time::{Duration, Instant}, }; +use bb8::CustomizeConnection; use dashmap::DashMap; use diesel::prelude::*; use diesel_async::{ pooled_connection::{ - deadpool::{BuildError, Hook, Object, Pool, PoolError}, - AsyncDieselConnectionManager, ManagerConfig, + bb8::{Pool, PooledConnection, RunError}, + AsyncDieselConnectionManager, ManagerConfig, PoolError, }, AsyncConnection, AsyncPgConnection, RunQueryDsl, }; @@ -32,7 +33,7 @@ use uuid::Uuid; use crate::{ details::Details, error_code::{ErrorCode, OwnedErrorCode}, - future::{WithMetrics, WithTimeout}, + future::{WithMetrics, WithPollTimer, WithTimeout}, serde_str::Serde, stream::LocalBoxStream, sync::DropHandle, @@ -92,13 +93,13 @@ pub(crate) enum ConnectPostgresError { Migration(#[source] Box), #[error("Failed to build postgres connection pool")] - BuildPool(#[source] BuildError), + BuildPool(#[source] PoolError), } #[derive(Debug, thiserror::Error)] pub(crate) enum PostgresError { #[error("Error in db pool")] - Pool(#[source] PoolError), + Pool(#[source] RunError), #[error("Error in database")] Diesel(#[source] diesel::result::Error), @@ -154,15 +155,11 @@ impl PostgresError { pub(super) const fn is_disconnected(&self) -> bool { matches!( self, - Self::Pool( - PoolError::Closed - | PoolError::Backend( - diesel_async::pooled_connection::PoolError::ConnectionError(_) - ), - ) | Self::Diesel(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::ClosedConnection, - _, - )) + Self::Pool(RunError::User(PoolError::ConnectionError(_))) + | Self::Diesel(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::ClosedConnection, + _, + )) ) } } @@ -233,11 +230,37 @@ async fn connect_for_migrations( Ok(tup) } -fn build_pool( +#[derive(Debug)] +struct OnConnect; + +impl CustomizeConnection for OnConnect +where + C: Send + 'static, + E: 'static, +{ + fn on_acquire<'life0, 'life1, 'async_trait>( + &'life0 self, + _connection: &'life1 mut C, + ) -> core::pin::Pin< + Box> + core::marker::Send + 'async_trait>, + > + where + 'life0: 'async_trait, + 'life1: 'async_trait, + Self: 'async_trait, + { + Box::pin(async { + metrics::counter!(crate::init_metrics::POSTGRES_POOL_CONNECTION_CREATE).increment(1); + Ok(()) + }) + } +} + +async fn build_pool( postgres_url: &Url, tx: flume::Sender, connector: Option, - max_size: usize, + max_size: u32, ) -> Result, ConnectPostgresError> { let mut config = ManagerConfig::default(); config.custom_setup = build_handler(tx, connector); @@ -247,21 +270,12 @@ fn build_pool( config, ); - let pool = Pool::builder(mgr) - .runtime(deadpool::Runtime::Tokio1) - .wait_timeout(Some(Duration::from_secs(10))) - .create_timeout(Some(Duration::from_secs(2))) - .recycle_timeout(Some(Duration::from_secs(2))) - .post_create(Hook::sync_fn(|_, _| { - metrics::counter!(crate::init_metrics::POSTGRES_POOL_CONNECTION_CREATE).increment(1); - Ok(()) - })) - .post_recycle(Hook::sync_fn(|_, _| { - metrics::counter!(crate::init_metrics::POSTGRES_POOL_CONNECTION_RECYCLE).increment(1); - Ok(()) - })) + let pool = Pool::builder() .max_size(max_size) - .build() + .connection_timeout(Duration::from_secs(10)) + .connection_customizer(Box::new(OnConnect)) + .build(mgr) + .await .map_err(ConnectPostgresError::BuildPool)?; Ok(pool) @@ -300,20 +314,26 @@ impl PostgresRepo { let (tx, rx) = flume::bounded(10); + let pool = build_pool( + &postgres_url, + tx.clone(), + connector.clone(), + parallelism as u32 * 8, + ) + .await?; + + let notifier_pool = + build_pool(&postgres_url, tx, connector, parallelism.min(4) as u32).await?; + let inner = Arc::new(Inner { health_count: AtomicU64::new(0), - pool: build_pool( - &postgres_url, - tx.clone(), - connector.clone(), - parallelism * 8, - )?, - notifier_pool: build_pool(&postgres_url, tx, connector, parallelism.min(4))?, + pool, + notifier_pool, queue_notifications: DashMap::new(), upload_notifications: DashMap::new(), }); - let handle = crate::sync::abort_on_drop(crate::sync::spawn( + let handle = crate::sync::abort_on_drop(crate::sync::spawn_sendable( "postgres-delegate-notifications", delegate_notifications(rx, inner.clone(), parallelism * 8), )); @@ -326,12 +346,22 @@ impl PostgresRepo { }) } - async fn get_connection(&self) -> Result, PostgresError> { - self.inner.get_connection().await + async fn get_connection( + &self, + ) -> Result, PostgresError> { + self.inner + .get_connection() + .with_poll_timer("postgres-get-connection") + .await } - async fn get_notifier_connection(&self) -> Result, PostgresError> { - self.inner.get_notifier_connection().await + async fn get_notifier_connection( + &self, + ) -> Result, PostgresError> { + self.inner + .get_notifier_connection() + .with_poll_timer("postgres-get-notifier-connection") + .await } } @@ -363,7 +393,9 @@ impl Drop for GetConnectionMetricsGuard { impl Inner { #[tracing::instrument(level = "trace", skip(self))] - async fn get_connection(&self) -> Result, PostgresError> { + async fn get_connection( + &self, + ) -> Result, PostgresError> { let guard = GetConnectionMetricsGuard::guard(); let obj = self.pool.get().await.map_err(PostgresError::Pool)?; @@ -374,7 +406,9 @@ impl Inner { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_notifier_connection(&self) -> Result, PostgresError> { + async fn get_notifier_connection( + &self, + ) -> Result, PostgresError> { let guard = GetConnectionMetricsGuard::guard(); let obj = self @@ -566,10 +600,13 @@ fn spawn_db_notification_task( sender: flume::Sender, mut conn: Connection, ) where - S: tokio_postgres::tls::TlsStream + Unpin + 'static, + S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static, { - crate::sync::spawn("postgres-notifications", async move { - while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { + crate::sync::spawn_sendable("postgres-notifications", async move { + while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)) + .with_poll_timer("poll-message") + .await + { tracing::trace!("db_notification_task: looping"); match res { @@ -1384,6 +1421,7 @@ impl QueueRepo for PostgresRepo { .execute(&mut notifier_conn) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_LISTEN) .with_timeout(Duration::from_secs(5)) + .with_poll_timer("pop-listen") .await .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; @@ -1404,6 +1442,7 @@ impl QueueRepo for PostgresRepo { .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_REQUEUE) .with_timeout(Duration::from_secs(5)) + .with_poll_timer("pop-reset-jobs") .await .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; @@ -1440,6 +1479,7 @@ impl QueueRepo for PostgresRepo { .get_result(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_CLAIM) .with_timeout(Duration::from_secs(5)) + .with_poll_timer("pop-claim-job") .await .map_err(|_| PostgresError::DbTimeout)? .optional() @@ -1457,6 +1497,7 @@ impl QueueRepo for PostgresRepo { match notifier .notified() .with_timeout(Duration::from_secs(5)) + .with_poll_timer("pop-wait-notify") .await { Ok(()) => tracing::debug!("Notified"), diff --git a/src/sync.rs b/src/sync.rs index d94d460..590c153 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -108,6 +108,29 @@ where handle } +#[track_caller] +pub(crate) fn spawn_sendable(name: &'static str, future: F) -> tokio::task::JoinHandle +where + F: std::future::Future + Send + 'static, + F::Output: Send + 'static, +{ + let future = future.with_poll_timer(name); + + let span = tracing::trace_span!(parent: None, "spawn task"); + let guard = span.enter(); + + #[cfg(tokio_unstable)] + let handle = tokio::task::Builder::new() + .name(name) + .spawn(future) + .expect("Failed to spawn"); + #[cfg(not(tokio_unstable))] + let handle = tokio::task::spawn_local(future); + + drop(guard); + handle +} + #[track_caller] pub(crate) fn spawn_blocking(name: &str, function: F) -> tokio::task::JoinHandle where