Instrument postgres calls

This commit is contained in:
asonix 2024-01-10 19:20:27 -06:00
parent 5e25b2f11d
commit ac6ad4bc2b
4 changed files with 111 additions and 17 deletions

View file

@ -32,8 +32,6 @@ completion-logging = [
]
error-logging = ["background-jobs-core/error-logging"]
[dependencies]
[dependencies.background-jobs-core]
version = "0.17.0"
path = "jobs-core"

View file

@ -5,6 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.24"
background-jobs-core = { version = "0.17.0-beta.1", path = "../jobs-core" }
@ -17,6 +18,7 @@ diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
flume = "0.11.0"
futures-core = "0.3.30"
metrics = "0.22.0"
pin-project-lite = "0.2.13"
refinery = { version = "0.8.11", features = ["postgres", "tokio-postgres"] }
serde_json = "1.0.111"
time = "0.3.31"

View file

@ -0,0 +1,74 @@
use std::{
future::Future,
time::{Duration, Instant},
};
pub(super) trait Timeout: Future {
fn timeout(self, duration: Duration) -> tokio::time::Timeout<Self>
where
Self: Sized,
{
tokio::time::timeout(duration, self)
}
}
pub(super) trait Metrics: Future {
fn metrics(self, name: &'static str) -> MetricsFuture<Self>
where
Self: Sized,
{
MetricsFuture {
future: self,
metrics: MetricsGuard {
name,
start: Instant::now(),
complete: false,
},
}
}
}
impl<F> Metrics for F where F: Future {}
impl<F> Timeout for F where F: Future {}
pin_project_lite::pin_project! {
pub(super) struct MetricsFuture<F> {
#[pin]
future: F,
metrics: MetricsGuard,
}
}
struct MetricsGuard {
name: &'static str,
start: Instant,
complete: bool,
}
impl<F> Future for MetricsFuture<F>
where
F: Future,
{
type Output = F::Output;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
let out = std::task::ready!(this.future.poll(cx));
this.metrics.complete = true;
std::task::Poll::Ready(out)
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::histogram!(self.name, "complete" => self.complete.to_string())
.record(self.start.elapsed().as_secs_f64());
}
}

View file

@ -1,6 +1,9 @@
mod embedded;
mod future;
mod schema;
use future::{Metrics as _, Timeout as _};
use std::{
collections::{BTreeSet, VecDeque},
error::Error,
@ -227,7 +230,10 @@ impl background_jobs_core::Storage for Storage {
.select(PostgresJob::as_select())
.filter(id.eq(job_id))
.get_result(&mut conn)
.metrics("background-jobs.postgres.info")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
};
@ -258,7 +264,10 @@ impl background_jobs_core::Storage for Storage {
diesel::sql_query("LISTEN queue_status_channel;")
.execute(&mut conn)
.metrics("background-jobs.postgres.listen")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
let count = {
@ -275,6 +284,7 @@ impl background_jobs_core::Storage for Storage {
runner_id.eq(Option::<Uuid>::None),
))
.execute(&mut conn)
.metrics("background-jobs.postgres.requeue")
.await
.map_err(PostgresError::Diesel)?
};
@ -316,7 +326,10 @@ impl background_jobs_core::Storage for Storage {
))
.returning(PostgresJob::as_returning())
.get_result(&mut conn)
.metrics("background-jobs.postgres.claim")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
};
@ -325,33 +338,31 @@ impl background_jobs_core::Storage for Storage {
return Ok(postgres_job.into());
}
let next_queue = {
let sleep_duration = {
use schema::job_queue::dsl::*;
job_queue
.filter(queue.eq(in_queue).and(status.eq(JobStatus::New)))
.select(diesel::dsl::sql::<Interval>("NOW() - next_queue"))
.get_result::<PgInterval>(&mut conn)
.metrics("background-jobs.postgres.next-queue")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
.map(|interval| {
if interval.microseconds < 0 {
Duration::from_micros(interval.microseconds.abs_diff(0))
} else {
Duration::from_secs(0)
}
})
.unwrap_or(Duration::from_secs(5))
};
let sleep_duration = next_queue
.map(|interval| {
if interval.microseconds < 0 {
Duration::from_micros(interval.microseconds.abs_diff(0))
} else {
Duration::from_secs(0)
}
})
.unwrap_or(Duration::from_secs(5));
drop(conn);
if tokio::time::timeout(sleep_duration, notifier.notified())
.await
.is_ok()
{
if notifier.notified().timeout(sleep_duration).await.is_ok() {
tracing::debug!("Notified");
} else {
tracing::debug!("Timed out");
@ -369,7 +380,10 @@ impl background_jobs_core::Storage for Storage {
.filter(id.eq(job_id))
.set((heartbeat.eq(diesel::dsl::now), runner_id.eq(in_runner_id)))
.execute(&mut conn)
.metrics("background-jobs.postgres.heartbeat")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
}
@ -386,7 +400,10 @@ impl background_jobs_core::Storage for Storage {
.filter(id.eq(return_job_info.id))
.returning(PostgresJob::as_returning())
.get_result(&mut conn)
.metrics("background-jobs.postgres.complete")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
};
@ -492,7 +509,10 @@ impl Storage {
postgres_job
.insert_into(job_queue)
.execute(&mut conn)
.metrics("background-jobs.postgres.insert")
.timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
}