Switch from deadpool to bb8

This commit is contained in:
asonix 2024-03-10 15:59:08 -05:00
parent 4976fcb2eb
commit dff588aafd
5 changed files with 131 additions and 80 deletions

46
Cargo.lock generated
View file

@ -444,6 +444,19 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bb8"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df7c2093d15d6a1d33b1f972e1c5ea3177748742b97a5f392aa83a65262c6780"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"parking_lot 0.12.1",
"tokio",
]
[[package]] [[package]]
name = "bcder" name = "bcder"
version = "0.7.4" version = "0.7.4"
@ -754,28 +767,6 @@ dependencies = [
"parking_lot_core 0.9.9", "parking_lot_core 0.9.9",
] ]
[[package]]
name = "deadpool"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
[[package]] [[package]]
name = "der" name = "der"
version = "0.7.8" version = "0.7.8"
@ -831,7 +822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be" checksum = "acada1517534c92d3f382217b485db8a8638f111b0e3f2a2a8e26165050f77be"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"deadpool", "bb8",
"diesel", "diesel",
"futures-util", "futures-util",
"scoped-futures", "scoped-futures",
@ -1049,6 +1040,7 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro", "futures-macro",
@ -1841,13 +1833,13 @@ dependencies = [
"async-trait", "async-trait",
"barrel", "barrel",
"base64", "base64",
"bb8",
"blurhash-update", "blurhash-update",
"clap", "clap",
"color-eyre", "color-eyre",
"config", "config",
"console-subscriber", "console-subscriber",
"dashmap", "dashmap",
"deadpool",
"diesel", "diesel",
"diesel-async", "diesel-async",
"diesel-derive-enum", "diesel-derive-enum",
@ -2313,12 +2305,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.8" version = "0.17.8"

View file

@ -22,15 +22,15 @@ actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0
async-trait = "0.1.51" async-trait = "0.1.51"
barrel = { version = "0.7.0", features = ["pg"] } barrel = { version = "0.7.0", features = ["pg"] }
base64 = "0.21.0" base64 = "0.21.0"
bb8 = "0.8.3"
blurhash-update = "0.1.0" blurhash-update = "0.1.0"
clap = { version = "4.0.2", features = ["derive"] } clap = { version = "4.0.2", features = ["derive"] }
color-eyre = "0.6" color-eyre = "0.6"
config = { version = "0.14.0", default-features = false, features = ["json", "ron", "toml", "yaml"] } config = { version = "0.14.0", default-features = false, features = ["json", "ron", "toml", "yaml"] }
console-subscriber = "0.2" console-subscriber = "0.2"
dashmap = "5.1.0" dashmap = "5.1.0"
deadpool = { version = "0.9.5", features = ["rt_tokio_1"] }
diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] } diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.4.1", features = ["postgres", "deadpool"] } diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
flume = "0.11.0" flume = "0.11.0"
futures-core = "0.3" futures-core = "0.3"

View file

@ -346,7 +346,7 @@ where
loop { loop {
tracing::trace!("job_loop: looping"); tracing::trace!("job_loop: looping");
crate::sync::cooperate().await; crate::sync::cooperate().with_poll_timer("cooperate").await;
async { async {
let (job_id, job) = state let (job_id, job) = state
@ -370,6 +370,7 @@ where
state state
.repo .repo
.complete_job(queue, worker_id, job_id, job_result(&res)) .complete_job(queue, worker_id, job_id, job_result(&res))
.with_poll_timer("cleanup-job-complete")
.await?; .await?;
res?; res?;

View file

@ -12,12 +12,13 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use bb8::CustomizeConnection;
use dashmap::DashMap; use dashmap::DashMap;
use diesel::prelude::*; use diesel::prelude::*;
use diesel_async::{ use diesel_async::{
pooled_connection::{ pooled_connection::{
deadpool::{BuildError, Hook, Object, Pool, PoolError}, bb8::{Pool, PooledConnection, RunError},
AsyncDieselConnectionManager, ManagerConfig, AsyncDieselConnectionManager, ManagerConfig, PoolError,
}, },
AsyncConnection, AsyncPgConnection, RunQueryDsl, AsyncConnection, AsyncPgConnection, RunQueryDsl,
}; };
@ -32,7 +33,7 @@ use uuid::Uuid;
use crate::{ use crate::{
details::Details, details::Details,
error_code::{ErrorCode, OwnedErrorCode}, error_code::{ErrorCode, OwnedErrorCode},
future::{WithMetrics, WithTimeout}, future::{WithMetrics, WithPollTimer, WithTimeout},
serde_str::Serde, serde_str::Serde,
stream::LocalBoxStream, stream::LocalBoxStream,
sync::DropHandle, sync::DropHandle,
@ -92,13 +93,13 @@ pub(crate) enum ConnectPostgresError {
Migration(#[source] Box<refinery::Error>), Migration(#[source] Box<refinery::Error>),
#[error("Failed to build postgres connection pool")] #[error("Failed to build postgres connection pool")]
BuildPool(#[source] BuildError), BuildPool(#[source] PoolError),
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum PostgresError { pub(crate) enum PostgresError {
#[error("Error in db pool")] #[error("Error in db pool")]
Pool(#[source] PoolError), Pool(#[source] RunError),
#[error("Error in database")] #[error("Error in database")]
Diesel(#[source] diesel::result::Error), Diesel(#[source] diesel::result::Error),
@ -154,15 +155,11 @@ impl PostgresError {
pub(super) const fn is_disconnected(&self) -> bool { pub(super) const fn is_disconnected(&self) -> bool {
matches!( matches!(
self, self,
Self::Pool( Self::Pool(RunError::User(PoolError::ConnectionError(_)))
PoolError::Closed | Self::Diesel(diesel::result::Error::DatabaseError(
| PoolError::Backend( diesel::result::DatabaseErrorKind::ClosedConnection,
diesel_async::pooled_connection::PoolError::ConnectionError(_) _,
), ))
) | Self::Diesel(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::ClosedConnection,
_,
))
) )
} }
} }
@ -233,11 +230,37 @@ async fn connect_for_migrations(
Ok(tup) Ok(tup)
} }
fn build_pool( #[derive(Debug)]
struct OnConnect;
impl<C, E> CustomizeConnection<C, E> for OnConnect
where
C: Send + 'static,
E: 'static,
{
fn on_acquire<'life0, 'life1, 'async_trait>(
&'life0 self,
_connection: &'life1 mut C,
) -> core::pin::Pin<
Box<dyn core::future::Future<Output = Result<(), E>> + core::marker::Send + 'async_trait>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
Box::pin(async {
metrics::counter!(crate::init_metrics::POSTGRES_POOL_CONNECTION_CREATE).increment(1);
Ok(())
})
}
}
async fn build_pool(
postgres_url: &Url, postgres_url: &Url,
tx: flume::Sender<Notification>, tx: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>, connector: Option<MakeRustlsConnect>,
max_size: usize, max_size: u32,
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> { ) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
let mut config = ManagerConfig::default(); let mut config = ManagerConfig::default();
config.custom_setup = build_handler(tx, connector); config.custom_setup = build_handler(tx, connector);
@ -247,21 +270,12 @@ fn build_pool(
config, config,
); );
let pool = Pool::builder(mgr) let pool = Pool::builder()
.runtime(deadpool::Runtime::Tokio1)
.wait_timeout(Some(Duration::from_secs(10)))
.create_timeout(Some(Duration::from_secs(2)))
.recycle_timeout(Some(Duration::from_secs(2)))
.post_create(Hook::sync_fn(|_, _| {
metrics::counter!(crate::init_metrics::POSTGRES_POOL_CONNECTION_CREATE).increment(1);
Ok(())
}))
.post_recycle(Hook::sync_fn(|_, _| {
metrics::counter!(crate::init_metrics::POSTGRES_POOL_CONNECTION_RECYCLE).increment(1);
Ok(())
}))
.max_size(max_size) .max_size(max_size)
.build() .connection_timeout(Duration::from_secs(10))
.connection_customizer(Box::new(OnConnect))
.build(mgr)
.await
.map_err(ConnectPostgresError::BuildPool)?; .map_err(ConnectPostgresError::BuildPool)?;
Ok(pool) Ok(pool)
@ -300,20 +314,26 @@ impl PostgresRepo {
let (tx, rx) = flume::bounded(10); let (tx, rx) = flume::bounded(10);
let pool = build_pool(
&postgres_url,
tx.clone(),
connector.clone(),
parallelism as u32 * 8,
)
.await?;
let notifier_pool =
build_pool(&postgres_url, tx, connector, parallelism.min(4) as u32).await?;
let inner = Arc::new(Inner { let inner = Arc::new(Inner {
health_count: AtomicU64::new(0), health_count: AtomicU64::new(0),
pool: build_pool( pool,
&postgres_url, notifier_pool,
tx.clone(),
connector.clone(),
parallelism * 8,
)?,
notifier_pool: build_pool(&postgres_url, tx, connector, parallelism.min(4))?,
queue_notifications: DashMap::new(), queue_notifications: DashMap::new(),
upload_notifications: DashMap::new(), upload_notifications: DashMap::new(),
}); });
let handle = crate::sync::abort_on_drop(crate::sync::spawn( let handle = crate::sync::abort_on_drop(crate::sync::spawn_sendable(
"postgres-delegate-notifications", "postgres-delegate-notifications",
delegate_notifications(rx, inner.clone(), parallelism * 8), delegate_notifications(rx, inner.clone(), parallelism * 8),
)); ));
@ -326,12 +346,22 @@ impl PostgresRepo {
}) })
} }
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> { async fn get_connection(
self.inner.get_connection().await &self,
) -> Result<PooledConnection<'_, AsyncPgConnection>, PostgresError> {
self.inner
.get_connection()
.with_poll_timer("postgres-get-connection")
.await
} }
async fn get_notifier_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> { async fn get_notifier_connection(
self.inner.get_notifier_connection().await &self,
) -> Result<PooledConnection<'_, AsyncPgConnection>, PostgresError> {
self.inner
.get_notifier_connection()
.with_poll_timer("postgres-get-notifier-connection")
.await
} }
} }
@ -363,7 +393,9 @@ impl Drop for GetConnectionMetricsGuard {
impl Inner { impl Inner {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> { async fn get_connection(
&self,
) -> Result<PooledConnection<'_, AsyncPgConnection>, PostgresError> {
let guard = GetConnectionMetricsGuard::guard(); let guard = GetConnectionMetricsGuard::guard();
let obj = self.pool.get().await.map_err(PostgresError::Pool)?; let obj = self.pool.get().await.map_err(PostgresError::Pool)?;
@ -374,7 +406,9 @@ impl Inner {
} }
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
async fn get_notifier_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> { async fn get_notifier_connection(
&self,
) -> Result<PooledConnection<'_, AsyncPgConnection>, PostgresError> {
let guard = GetConnectionMetricsGuard::guard(); let guard = GetConnectionMetricsGuard::guard();
let obj = self let obj = self
@ -566,10 +600,13 @@ fn spawn_db_notification_task<S>(
sender: flume::Sender<Notification>, sender: flume::Sender<Notification>,
mut conn: Connection<Socket, S>, mut conn: Connection<Socket, S>,
) where ) where
S: tokio_postgres::tls::TlsStream + Unpin + 'static, S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static,
{ {
crate::sync::spawn("postgres-notifications", async move { crate::sync::spawn_sendable("postgres-notifications", async move {
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx))
.with_poll_timer("poll-message")
.await
{
tracing::trace!("db_notification_task: looping"); tracing::trace!("db_notification_task: looping");
match res { match res {
@ -1384,6 +1421,7 @@ impl QueueRepo for PostgresRepo {
.execute(&mut notifier_conn) .execute(&mut notifier_conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_LISTEN) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_LISTEN)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.with_poll_timer("pop-listen")
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
@ -1404,6 +1442,7 @@ impl QueueRepo for PostgresRepo {
.execute(&mut conn) .execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_REQUEUE) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_REQUEUE)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.with_poll_timer("pop-reset-jobs")
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
@ -1440,6 +1479,7 @@ impl QueueRepo for PostgresRepo {
.get_result(&mut conn) .get_result(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_CLAIM) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_CLAIM)
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.with_poll_timer("pop-claim-job")
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.optional() .optional()
@ -1457,6 +1497,7 @@ impl QueueRepo for PostgresRepo {
match notifier match notifier
.notified() .notified()
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.with_poll_timer("pop-wait-notify")
.await .await
{ {
Ok(()) => tracing::debug!("Notified"), Ok(()) => tracing::debug!("Notified"),

View file

@ -108,6 +108,29 @@ where
handle handle
} }
#[track_caller]
pub(crate) fn spawn_sendable<F>(name: &'static str, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let future = future.with_poll_timer(name);
let span = tracing::trace_span!(parent: None, "spawn task");
let guard = span.enter();
#[cfg(tokio_unstable)]
let handle = tokio::task::Builder::new()
.name(name)
.spawn(future)
.expect("Failed to spawn");
#[cfg(not(tokio_unstable))]
let handle = tokio::task::spawn_local(future);
drop(guard);
handle
}
#[track_caller] #[track_caller]
pub(crate) fn spawn_blocking<F, Out>(name: &str, function: F) -> tokio::task::JoinHandle<Out> pub(crate) fn spawn_blocking<F, Out>(name: &str, function: F) -> tokio::task::JoinHandle<Out>
where where