jobs-postgres: spawn with task names

This commit is contained in:
asonix 2024-01-10 21:14:39 -06:00
parent f7f6f901f8
commit 72f4fc1cdc

View file

@ -56,16 +56,38 @@ struct DropHandle<T> {
handle: JoinHandle<T>, handle: JoinHandle<T>,
} }
fn spawn<F: Future + Send + 'static>(name: &str, future: F) -> DropHandle<F::Output> fn spawn<F: Future + Send + 'static>(
name: &str,
future: F,
) -> std::io::Result<DropHandle<F::Output>>
where where
F::Output: Send, F::Output: Send,
{ {
DropHandle { Ok(DropHandle {
handle: tokio::task::Builder::new() handle: spawn_detach(name, future)?,
.name(name) })
.spawn(future) }
.expect("Spawned task"),
} #[cfg(tokio_unstable)]
fn spawn_detach<F: Future + Send + 'static>(
name: &str,
future: F,
) -> std::io::Result<JoinHandle<F::Output>>
where
F::Output: Send,
{
tokio::task::Builder::new().name(name).spawn(future)
}
#[cfg(not(tokio_unstable))]
fn spawn_detach<F: Future + Send + 'static>(
name: &str,
future: F,
) -> std::io::Result<JoinHandle<F::Output>>
where
F::Output: Send,
{
Ok(tokio::spawn(future))
} }
#[derive(Debug)] #[derive(Debug)]
@ -78,6 +100,9 @@ pub enum ConnectPostgresError {
/// Error constructing the connection pool /// Error constructing the connection pool
BuildPool(BuildError), BuildPool(BuildError),
/// Error spawning tokio task
SpawnTask(std::io::Error),
} }
#[derive(Debug)] #[derive(Debug)]
@ -441,14 +466,23 @@ impl background_jobs_core::Storage for Storage {
} }
impl Storage { impl Storage {
pub async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> { pub async fn connect(
postgres_url: Url,
migration_table: Option<&str>,
) -> Result<Self, ConnectPostgresError> {
let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls) let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls)
.await .await
.map_err(ConnectPostgresError::ConnectForMigration)?; .map_err(ConnectPostgresError::ConnectForMigration)?;
let handle = spawn("postgres-migrations", conn); let handle = spawn("postgres-migrations", conn)?;
embedded::migrations::runner() let mut runner = embedded::migrations::runner();
if let Some(table_name) = migration_table {
runner.set_migration_table_name(table_name);
}
runner
.run_async(&mut client) .run_async(&mut client)
.await .await
.map_err(ConnectPostgresError::Migration)?; .map_err(ConnectPostgresError::Migration)?;
@ -495,7 +529,7 @@ impl Storage {
let handle = spawn( let handle = spawn(
"postgres-delegate-notifications", "postgres-delegate-notifications",
delegate_notifications(rx, inner.clone(), parallelism * 8), delegate_notifications(rx, inner.clone(), parallelism * 8),
); )?;
let drop_handle = Arc::new(handle); let drop_handle = Arc::new(handle);
@ -556,7 +590,7 @@ impl<'a> JobNotifierState<'a> {
.or_insert_with(|| Arc::new(Notify::const_new())) .or_insert_with(|| Arc::new(Notify::const_new()))
.notify_one(); .notify_one();
metrics::counter!("pict-rs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1); metrics::counter!("background-jobs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1);
} }
} }
@ -574,7 +608,7 @@ async fn delegate_notifications(
while let Ok(notification) = receiver.recv_async().await { while let Ok(notification) = receiver.recv_async().await {
tracing::trace!("delegate_notifications: looping"); tracing::trace!("delegate_notifications: looping");
metrics::counter!("pict-rs.postgres.notification").increment(1); metrics::counter!("background-jobs.postgres.notification").increment(1);
match notification.channel() { match notification.channel() {
"queue_status_channel" => { "queue_status_channel" => {
@ -623,7 +657,7 @@ fn spawn_db_notification_task(
sender: flume::Sender<Notification>, sender: flume::Sender<Notification>,
mut conn: Connection<Socket, NoTlsStream>, mut conn: Connection<Socket, NoTlsStream>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
tokio::task::Builder::new().name("postgres-notifications").spawn(async move { spawn_detach("postgres-notifications", async move {
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await { while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
tracing::trace!("db_notification_task: looping"); tracing::trace!("db_notification_task: looping");
@ -699,6 +733,12 @@ impl From<BuildError> for ConnectPostgresError {
} }
} }
impl From<std::io::Error> for ConnectPostgresError {
fn from(value: std::io::Error) -> Self {
Self::SpawnTask(value)
}
}
impl std::fmt::Display for ConnectPostgresError { impl std::fmt::Display for ConnectPostgresError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
@ -707,6 +747,7 @@ impl std::fmt::Display for ConnectPostgresError {
write!(f, "Failed to connect to postgres for migrations") write!(f, "Failed to connect to postgres for migrations")
} }
Self::Migration(_) => write!(f, "Failed to run migrations"), Self::Migration(_) => write!(f, "Failed to run migrations"),
Self::SpawnTask(_) => write!(f, "Failed to spawn task"),
} }
} }
} }
@ -717,6 +758,7 @@ impl std::error::Error for ConnectPostgresError {
Self::BuildPool(e) => Some(e), Self::BuildPool(e) => Some(e),
Self::ConnectForMigration(e) => Some(e), Self::ConnectForMigration(e) => Some(e),
Self::Migration(e) => Some(e), Self::Migration(e) => Some(e),
Self::SpawnTask(e) => Some(e),
} }
} }
} }