From 8540e93469ebda44cef78bbe3b6d7df533aff9cd Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 25 Nov 2023 20:14:18 -0600 Subject: [PATCH] Use async-cpupool --- Cargo.lock | 4 +- Cargo.toml | 3 +- src/main.rs | 11 ++-- src/spawner.rs | 142 ++++++++----------------------------------------- 4 files changed, 33 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 837c342..1531253 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1746,9 +1746,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "lru" -version = "0.11.1" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +checksum = "2994eeba8ed550fd9b47a0b38f0242bc3344e496483c6180b69139cc2fa5d1d7" dependencies = [ "hashbrown 0.14.2", ] diff --git a/Cargo.toml b/Cargo.toml index ac798f8..cdd5737 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ console-subscriber = { version = "0.2", optional = true } dashmap = "5.1.0" dotenv = "0.15.0" flume = "0.11.0" -lru = "0.11.0" +lru = "0.12.0" metrics = "0.21.0" metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [ "http-listener", @@ -49,6 +49,7 @@ opentelemetry = "0.21" opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } opentelemetry-otlp = "0.14" pin-project-lite = "0.2.9" +# pinned to metrics-util quanta = "0.11.0" rand = "0.8" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]} diff --git a/src/main.rs b/src/main.rs index 427ca29..085aca3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -311,11 +311,11 @@ async fn do_server_main( } }; - let verify_spawner = Spawner::build("verify-cpu", verify_threads)?; - let sign_spawner = Spawner::build("sign-cpu", signature_threads)?; + let verify_spawner = Spawner::build("verify-cpu", verify_threads.try_into()?); + let sign_spawner = Spawner::build("sign-cpu", signature_threads.try_into()?); let key_id = config.generate_url(UrlKind::MainKey).to_string(); - let state = State::build(db.clone(), key_id, sign_spawner, client).await?; + let state = State::build(db.clone(), key_id, sign_spawner.clone(), client).await?; if let Some((token, admin_handle)) = config.telegram_info() { tracing::warn!("Creating telegram handler"); @@ -325,6 +325,8 @@ async fn do_server_main( let keys = config.open_keys()?; let bind_address = config.bind_address(); + let sign_spawner2 = sign_spawner.clone(); + let verify_spawner2 = verify_spawner.clone(); let server = HttpServer::new(move || { let job_server = create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); @@ -410,6 +412,9 @@ async fn do_server_main( server.bind(bind_address)?.run().await?; } + sign_spawner2.close().await; + verify_spawner2.close().await; + tracing::warn!("Server closed"); Ok(()) diff --git a/src/spawner.rs b/src/spawner.rs index 3b611d7..c9f8d1b 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -1,107 +1,31 @@ +use async_cpupool::CpuPool; use http_signature_normalization_actix::{Canceled, Spawn}; -use std::{ - panic::AssertUnwindSafe, - sync::Arc, - thread::JoinHandle, - time::{Duration, Instant}, -}; +use std::time::Duration; -fn spawner_thread( - receiver: flume::Receiver>, - name: &'static str, - id: usize, -) { - let guard = MetricsGuard::guard(name, id); - - while let Ok(f) = receiver.recv() { - let start = Instant::now(); - metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string()); - let res = std::panic::catch_unwind(AssertUnwindSafe(f)); - metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string()); - metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string()); - - if let Err(e) = res { - tracing::warn!("{name} fn panicked: {e:?}"); - } - } - - guard.disarm(); -} - -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct Spawner { - name: &'static str, - sender: Option>>, - threads: Option>>>, -} - -struct MetricsGuard { - name: &'static str, - id: usize, - start: Instant, - armed: bool, -} - -impl MetricsGuard { - fn guard(name: &'static str, id: usize) -> Self { - metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string()); - - Self { - name, - id, - start: Instant::now(), - armed: true, - } - } - - fn disarm(mut self) { - self.armed = false; - } -} - -impl Drop for MetricsGuard { - fn drop(&mut self) { - metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); - metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); - tracing::warn!("Stopping {} - {}", self.name, self.id); - } + pool: CpuPool, } impl Spawner { - pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result { - let (sender, receiver) = flume::bounded(8); + pub(crate) fn build(name: &'static str, threads: u16) -> Self { + let pool = CpuPool::configure() + .name(name) + .max_threads(threads) + .build() + .expect("valid configuration"); - tracing::warn!("Launching {threads} {name}s"); + Spawner { pool } + } - let threads = (0..threads) - .map(|i| { - let receiver = receiver.clone(); - std::thread::Builder::new() - .name(format!("{name}-{i}")) - .spawn(move || { - spawner_thread(receiver, name, i); - }) - }) - .collect::, _>>()?; - - Ok(Spawner { - name, - sender: Some(sender), - threads: Some(Arc::new(threads)), - }) + pub(crate) async fn close(self) { + self.pool.close().await; } } -impl Drop for Spawner { - fn drop(&mut self) { - self.sender.take(); - - if let Some(threads) = self.threads.take().and_then(Arc::into_inner) { - tracing::warn!("Joining {}s", self.name); - for thread in threads { - let _ = thread.join(); - } - } +impl std::fmt::Debug for Spawner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Spawner").finish() } } @@ -144,22 +68,9 @@ impl Spawn for Spawner { Func: FnOnce() -> Out + Send + 'static, Out: Send + 'static, { - let sender = self.sender.as_ref().expect("Sender exists").clone(); + let pool = self.pool.clone(); - Box::pin(async move { - let (tx, rx) = flume::bounded(1); - - let _ = sender - .send_async(Box::new(move || { - if tx.try_send((func)()).is_err() { - tracing::warn!("Requestor hung up"); - metrics::increment_counter!("relay.spawner.disconnected"); - } - })) - .await; - - timer(rx.recv_async()).await.map_err(|_| Canceled) - }) + Box::pin(async move { timer(pool.spawn(func)).await.map_err(|_| Canceled) }) } } @@ -171,21 +82,10 @@ impl http_signature_normalization_reqwest::Spawn for Spawner { Func: FnOnce() -> Out + Send + 'static, Out: Send + 'static, { - let sender = self.sender.as_ref().expect("Sender exists").clone(); + let pool = self.pool.clone(); Box::pin(async move { - let (tx, rx) = flume::bounded(1); - - let _ = sender - .send_async(Box::new(move || { - if tx.try_send((func)()).is_err() { - tracing::warn!("Requestor hung up"); - metrics::increment_counter!("relay.spawner.disconnected"); - } - })) - .await; - - timer(rx.recv_async()) + timer(pool.spawn(func)) .await .map_err(|_| http_signature_normalization_reqwest::Canceled) })