2023-11-26 02:14:18 +00:00
|
|
|
use async_cpupool::CpuPool;
|
2023-07-27 15:19:20 +00:00
|
|
|
use http_signature_normalization_actix::{Canceled, Spawn};
|
2023-11-26 02:14:18 +00:00
|
|
|
use std::time::Duration;
|
2023-07-27 15:19:20 +00:00
|
|
|
|
2023-11-26 02:14:18 +00:00
|
|
|
#[derive(Clone)]
|
2023-07-27 15:19:20 +00:00
|
|
|
pub(crate) struct Spawner {
|
2023-11-26 02:14:18 +00:00
|
|
|
pool: CpuPool,
|
2023-07-27 15:19:20 +00:00
|
|
|
}
|
|
|
|
|
2023-11-26 02:14:18 +00:00
|
|
|
impl Spawner {
|
2023-11-26 03:17:59 +00:00
|
|
|
pub(crate) fn build(name: &'static str, threads: u16) -> anyhow::Result<Self> {
|
2023-11-26 02:14:18 +00:00
|
|
|
let pool = CpuPool::configure()
|
|
|
|
.name(name)
|
|
|
|
.max_threads(threads)
|
2023-11-26 03:17:59 +00:00
|
|
|
.build()?;
|
2023-11-26 02:14:18 +00:00
|
|
|
|
2023-11-26 03:17:59 +00:00
|
|
|
Ok(Spawner { pool })
|
2023-07-27 15:19:20 +00:00
|
|
|
}
|
|
|
|
|
2023-11-26 02:14:18 +00:00
|
|
|
pub(crate) async fn close(self) {
|
|
|
|
self.pool.close().await;
|
2023-07-27 15:19:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-11-26 02:14:18 +00:00
|
|
|
impl std::fmt::Debug for Spawner {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
f.debug_struct("Spawner").finish()
|
2023-07-27 15:19:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn timer<Fut>(fut: Fut) -> Fut::Output
|
|
|
|
where
|
|
|
|
Fut: std::future::Future,
|
|
|
|
{
|
|
|
|
let id = uuid::Uuid::new_v4();
|
|
|
|
|
|
|
|
metrics::increment_counter!("relay.spawner.wait-timer.start");
|
|
|
|
|
|
|
|
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
|
|
|
|
|
|
|
|
// pass the first tick (instant)
|
|
|
|
interval.tick().await;
|
|
|
|
|
|
|
|
let mut fut = std::pin::pin!(fut);
|
|
|
|
|
|
|
|
let mut counter = 0;
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
out = &mut fut => {
|
|
|
|
metrics::increment_counter!("relay.spawner.wait-timer.end");
|
|
|
|
return out;
|
|
|
|
}
|
|
|
|
_ = interval.tick() => {
|
|
|
|
counter += 1;
|
|
|
|
metrics::increment_counter!("relay.spawner.wait-timer.pending");
|
|
|
|
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Spawn for Spawner {
|
|
|
|
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>;
|
|
|
|
|
|
|
|
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
|
|
|
where
|
|
|
|
Func: FnOnce() -> Out + Send + 'static,
|
|
|
|
Out: Send + 'static,
|
|
|
|
{
|
2023-11-26 02:14:18 +00:00
|
|
|
let pool = self.pool.clone();
|
2023-07-27 15:19:20 +00:00
|
|
|
|
2023-11-26 02:14:18 +00:00
|
|
|
Box::pin(async move { timer(pool.spawn(func)).await.map_err(|_| Canceled) })
|
2023-07-27 15:19:20 +00:00
|
|
|
}
|
|
|
|
}
|
2023-08-17 22:09:35 +00:00
|
|
|
|
|
|
|
impl http_signature_normalization_reqwest::Spawn for Spawner {
|
|
|
|
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, http_signature_normalization_reqwest::Canceled>> + Send>> where T: Send;
|
|
|
|
|
|
|
|
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
|
|
|
where
|
|
|
|
Func: FnOnce() -> Out + Send + 'static,
|
|
|
|
Out: Send + 'static,
|
|
|
|
{
|
2023-11-26 02:14:18 +00:00
|
|
|
let pool = self.pool.clone();
|
2023-08-17 22:09:35 +00:00
|
|
|
|
|
|
|
Box::pin(async move {
|
2023-11-26 02:14:18 +00:00
|
|
|
timer(pool.spawn(func))
|
2023-08-17 22:09:35 +00:00
|
|
|
.await
|
|
|
|
.map_err(|_| http_signature_normalization_reqwest::Canceled)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|