Allow naming spawner threads

This commit is contained in:
asonix 2023-07-27 10:53:01 -05:00
parent 5de244b848
commit f24685e700
2 changed files with 26 additions and 17 deletions

View file

@ -259,8 +259,8 @@ async fn do_server_main(
let keys = config.open_keys()?; let keys = config.open_keys()?;
let spawner = Spawner::build(config.signature_threads())?; let spawner = Spawner::build("signature-thread", config.signature_threads())?;
let verify_spawner = Spawner::build((config.signature_threads() / 2).max(1))?; let verify_spawner = Spawner::build("verify-thread", (config.signature_threads() / 8).max(1))?;
let bind_address = config.bind_address(); let bind_address = config.bind_address();
let server = HttpServer::new(move || { let server = HttpServer::new(move || {

View file

@ -6,18 +6,22 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
fn signature_thread(receiver: flume::Receiver<Box<dyn FnOnce() + Send>>, id: usize) { fn spawner_thread(
let guard = MetricsGuard::guard(id); receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
name: &'static str,
id: usize,
) {
let guard = MetricsGuard::guard(name, id);
while let Ok(f) = receiver.recv() { while let Ok(f) = receiver.recv() {
let start = Instant::now(); let start = Instant::now();
metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string()); metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string());
let res = std::panic::catch_unwind(AssertUnwindSafe(f)); let res = std::panic::catch_unwind(AssertUnwindSafe(f));
metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string()); metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string());
metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string()); metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string());
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Signature fn panicked: {e:?}"); tracing::warn!("{name} fn panicked: {e:?}");
} }
} }
@ -26,21 +30,24 @@ fn signature_thread(receiver: flume::Receiver<Box<dyn FnOnce() + Send>>, id: usi
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct Spawner { pub(crate) struct Spawner {
name: &'static str,
sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>, sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>,
threads: Option<Arc<Vec<JoinHandle<()>>>>, threads: Option<Arc<Vec<JoinHandle<()>>>>,
} }
struct MetricsGuard { struct MetricsGuard {
name: &'static str,
id: usize, id: usize,
start: Instant, start: Instant,
armed: bool, armed: bool,
} }
impl MetricsGuard { impl MetricsGuard {
fn guard(id: usize) -> Self { fn guard(name: &'static str, id: usize) -> Self {
metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string()); metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string());
Self { Self {
name,
id, id,
start: Instant::now(), start: Instant::now(),
armed: true, armed: true,
@ -54,30 +61,31 @@ impl MetricsGuard {
impl Drop for MetricsGuard { impl Drop for MetricsGuard {
fn drop(&mut self) { fn drop(&mut self) {
metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string()); metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
tracing::warn!("Stopping signature thread"); tracing::warn!("Stopping {} - {}", self.name, self.id);
} }
} }
impl Spawner { impl Spawner {
pub(crate) fn build(threads: usize) -> std::io::Result<Self> { pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result<Self> {
let (sender, receiver) = flume::bounded(8); let (sender, receiver) = flume::bounded(8);
tracing::warn!("Launching {threads} signature threads"); tracing::warn!("Launching {threads} {name}s");
let threads = (0..threads) let threads = (0..threads)
.map(|i| { .map(|i| {
let receiver = receiver.clone(); let receiver = receiver.clone();
std::thread::Builder::new() std::thread::Builder::new()
.name(format!("signature-thread-{i}")) .name(format!("{name}-{i}"))
.spawn(move || { .spawn(move || {
signature_thread(receiver, i); spawner_thread(receiver, name, i);
}) })
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
Ok(Spawner { Ok(Spawner {
name,
sender: Some(sender), sender: Some(sender),
threads: Some(Arc::new(threads)), threads: Some(Arc::new(threads)),
}) })
@ -89,6 +97,7 @@ impl Drop for Spawner {
self.sender.take(); self.sender.take();
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) { if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
tracing::warn!("Joining {}s", self.name);
for thread in threads { for thread in threads {
let _ = thread.join(); let _ = thread.join();
} }