Enable naming cpupools

This commit is contained in:
asonix 2023-11-24 20:21:07 -06:00
parent fb2881fa84
commit db07b9b855
2 changed files with 23 additions and 9 deletions

View file

@ -7,4 +7,5 @@ edition = "2021"
[dependencies]
flume = "0.11.0"
metrics = "0.21.1"
tracing = "0.1.40"

View file

@ -6,6 +6,7 @@ use std::{
#[derive(Debug)]
pub struct Config {
name: String,
buffer_size: NonZeroUsize,
min_threads: NonZeroU16,
max_threads: NonZeroU16,
@ -14,12 +15,18 @@ pub struct Config {
impl Config {
pub fn new() -> Self {
Config {
name: String::from("cpupool"),
buffer_size: 8usize.try_into().expect("valid nonzero usize"),
min_threads: 1u16.try_into().expect("valid nonzero u16"),
max_threads: 2u16.try_into().expect("valid nonzero u16"),
}
}
pub fn name(mut self, name: String) -> Self {
self.name = name;
self
}
pub fn buffer_size(mut self, buffer_size: NonZeroUsize) -> Self {
self.buffer_size = buffer_size;
self
@ -160,7 +167,7 @@ impl CpuPool {
.thread_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let thread = spawn(thread_id, self.state.receiver.clone());
let thread = spawn(&self.state.name, thread_id, self.state.receiver.clone());
self.state.threads.lock().unwrap().push(thread);
}
@ -204,6 +211,7 @@ impl Default for CpuPool {
type SendFn = Box<dyn FnOnce() + Send>;
struct CpuPoolState {
name: String,
min_threads: NonZeroU16,
max_threads: NonZeroU16,
current_threads: AtomicU64,
@ -216,6 +224,7 @@ struct CpuPoolState {
impl CpuPoolState {
fn new(
Config {
name,
buffer_size,
min_threads,
max_threads,
@ -225,14 +234,18 @@ impl CpuPoolState {
let start_threads = u64::from(u16::from(min_threads));
let threads = ThreadVec::new(start_threads, usize::from(u16::from(max_threads)), |i| {
spawn(i, receiver.clone())
});
let threads = ThreadVec::new(
&name,
start_threads,
usize::from(u16::from(max_threads)),
|name, i| spawn(name, i, receiver.clone()),
);
let current_threads = AtomicU64::new(start_threads);
let thread_id = AtomicU64::new(start_threads);
CpuPoolState {
name,
min_threads,
max_threads,
current_threads,
@ -253,14 +266,14 @@ struct ThreadVec {
}
impl ThreadVec {
fn new<F>(start_threads: u64, max_threads: usize, spawn: F) -> Self
fn new<F>(name: &str, start_threads: u64, max_threads: usize, spawn: F) -> Self
where
F: Fn(u64) -> Thread,
F: Fn(&str, u64) -> Thread,
{
let mut threads = Vec::with_capacity(max_threads);
for i in 0..start_threads {
threads.push((spawn)(i));
threads.push((spawn)(name, i));
}
Self { threads }
@ -321,7 +334,7 @@ impl Drop for SendOnDrop {
}
}
fn spawn(i: u64, receiver: flume::Receiver<SendFn>) -> Thread {
fn spawn(name: &str, i: u64, receiver: flume::Receiver<SendFn>) -> Thread {
let (signal, signal_rx) = flume::bounded(1);
let (closed_tx, closed) = flume::bounded(1);
@ -329,7 +342,7 @@ fn spawn(i: u64, receiver: flume::Receiver<SendFn>) -> Thread {
let closed_tx = SendOnDrop { sender: closed_tx };
let handle = std::thread::Builder::new()
.name(format!("cpupool-{i}"))
.name(format!("{name}-{i}"))
.spawn(move || run(receiver, signal_rx, closed_tx))
.expect("Failed to spawn new thread");