diff --git a/jobs-postgres/src/lib.rs b/jobs-postgres/src/lib.rs index 1cc13c0..b6c0d41 100644 --- a/jobs-postgres/src/lib.rs +++ b/jobs-postgres/src/lib.rs @@ -56,16 +56,38 @@ struct DropHandle { handle: JoinHandle, } -fn spawn(name: &str, future: F) -> DropHandle +fn spawn( + name: &str, + future: F, +) -> std::io::Result> where F::Output: Send, { - DropHandle { - handle: tokio::task::Builder::new() - .name(name) - .spawn(future) - .expect("Spawned task"), - } + Ok(DropHandle { + handle: spawn_detach(name, future)?, + }) +} + +#[cfg(tokio_unstable)] +fn spawn_detach( + name: &str, + future: F, +) -> std::io::Result> +where + F::Output: Send, +{ + tokio::task::Builder::new().name(name).spawn(future) +} + +#[cfg(not(tokio_unstable))] +fn spawn_detach( + name: &str, + future: F, +) -> std::io::Result> +where + F::Output: Send, +{ + Ok(tokio::spawn(future)) } #[derive(Debug)] @@ -78,6 +100,9 @@ pub enum ConnectPostgresError { /// Error constructing the connection pool BuildPool(BuildError), + + /// Error spawning tokio task + SpawnTask(std::io::Error), } #[derive(Debug)] @@ -441,14 +466,23 @@ impl background_jobs_core::Storage for Storage { } impl Storage { - pub async fn connect(postgres_url: Url) -> Result { + pub async fn connect( + postgres_url: Url, + migration_table: Option<&str>, + ) -> Result { let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls) .await .map_err(ConnectPostgresError::ConnectForMigration)?; - let handle = spawn("postgres-migrations", conn); + let handle = spawn("postgres-migrations", conn)?; - embedded::migrations::runner() + let mut runner = embedded::migrations::runner(); + + if let Some(table_name) = migration_table { + runner.set_migration_table_name(table_name); + } + + runner .run_async(&mut client) .await .map_err(ConnectPostgresError::Migration)?; @@ -495,7 +529,7 @@ impl Storage { let handle = spawn( "postgres-delegate-notifications", delegate_notifications(rx, inner.clone(), parallelism * 8), - ); + )?; let drop_handle = Arc::new(handle); @@ -556,7 +590,7 @@ impl<'a> JobNotifierState<'a> { .or_insert_with(|| Arc::new(Notify::const_new())) .notify_one(); - metrics::counter!("pict-rs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1); + metrics::counter!("background-jobs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1); } } @@ -574,7 +608,7 @@ async fn delegate_notifications( while let Ok(notification) = receiver.recv_async().await { tracing::trace!("delegate_notifications: looping"); - metrics::counter!("pict-rs.postgres.notification").increment(1); + metrics::counter!("background-jobs.postgres.notification").increment(1); match notification.channel() { "queue_status_channel" => { @@ -623,7 +657,7 @@ fn spawn_db_notification_task( sender: flume::Sender, mut conn: Connection, ) -> std::io::Result<()> { - tokio::task::Builder::new().name("postgres-notifications").spawn(async move { + spawn_detach("postgres-notifications", async move { while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { tracing::trace!("db_notification_task: looping"); @@ -699,6 +733,12 @@ impl From for ConnectPostgresError { } } +impl From for ConnectPostgresError { + fn from(value: std::io::Error) -> Self { + Self::SpawnTask(value) + } +} + impl std::fmt::Display for ConnectPostgresError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -707,6 +747,7 @@ impl std::fmt::Display for ConnectPostgresError { write!(f, "Failed to connect to postgres for migrations") } Self::Migration(_) => write!(f, "Failed to run migrations"), + Self::SpawnTask(_) => write!(f, "Failed to spawn task"), } } } @@ -717,6 +758,7 @@ impl std::error::Error for ConnectPostgresError { Self::BuildPool(e) => Some(e), Self::ConnectForMigration(e) => Some(e), Self::Migration(e) => Some(e), + Self::SpawnTask(e) => Some(e), } } }