Add PoolBuilder, set better thread names
This commit is contained in:
parent
756d989195
commit
2d5c5e9061
|
@ -40,13 +40,24 @@ impl<const N: usize> HypertreeMeta<N> {
|
|||
pub(super) fn build(hypertree: db::HypertreeRepo<N>, reaper: Reaper) -> Self {
|
||||
let hypertree = Arc::new(hypertree);
|
||||
|
||||
let maintenance = Pool::build(Arc::clone(&hypertree), reaper.clone(), maintenance_runner);
|
||||
let parallelism = std::thread::available_parallelism()
|
||||
.map(usize::from)
|
||||
.unwrap_or(1);
|
||||
|
||||
let add_to_index = Pool::build(Arc::clone(&hypertree), reaper.clone(), add_to_index_runner);
|
||||
let maintenance = Pool::builder(Arc::clone(&hypertree), reaper.clone(), maintenance_runner)
|
||||
.with_name(String::from("vectordb-hypertree-maintenance"))
|
||||
.finish();
|
||||
|
||||
let find_similarities = Pool::build(hypertree, reaper, similarities_runner)
|
||||
let add_to_index =
|
||||
Pool::builder(Arc::clone(&hypertree), reaper.clone(), add_to_index_runner)
|
||||
.with_name(String::from("vectordb-hypertree-index"))
|
||||
.finish();
|
||||
|
||||
let find_similarities = Pool::builder(hypertree, reaper, similarities_runner)
|
||||
.with_lower_limit(2)
|
||||
.with_upper_limit(16);
|
||||
.with_upper_limit(16 * parallelism)
|
||||
.with_name(String::from("vectordb-hypertree-similarities"))
|
||||
.finish();
|
||||
|
||||
Self {
|
||||
maintenance,
|
||||
|
|
10
src/lib.rs
10
src/lib.rs
|
@ -308,12 +308,16 @@ impl<const N: usize> VectorMeta<N> {
|
|||
.map(usize::from)
|
||||
.unwrap_or(1);
|
||||
|
||||
let reader = Pool::build(std::sync::Arc::clone(&state), reaper.clone(), reader_runner)
|
||||
let reader = Pool::builder(std::sync::Arc::clone(&state), reaper.clone(), reader_runner)
|
||||
.with_lower_limit(2)
|
||||
.with_upper_limit(16 * parallelism);
|
||||
.with_upper_limit(16 * parallelism)
|
||||
.with_name(String::from("vectordb-reader"))
|
||||
.finish();
|
||||
|
||||
let insert_vector =
|
||||
Pool::build(std::sync::Arc::clone(&state), reaper, insert_vectors_runner);
|
||||
Pool::builder(std::sync::Arc::clone(&state), reaper, insert_vectors_runner)
|
||||
.with_name(String::from("vectordb-inserter"))
|
||||
.finish();
|
||||
|
||||
let maintenance =
|
||||
Thread::build(String::from("vectordb-maintenance")).spawn(move |stopper| {
|
||||
|
|
87
src/pool.rs
87
src/pool.rs
|
@ -9,48 +9,70 @@ pub(super) struct Pool<State, Message> {
|
|||
threads: std::sync::Mutex<Vec<Thread>>,
|
||||
state: Arc<State>,
|
||||
reaper: Reaper,
|
||||
func: fn(&State, flume::Receiver<Message>, flume::Receiver<()>),
|
||||
tx: flume::Sender<Message>,
|
||||
rx: flume::Receiver<Message>,
|
||||
size: AtomicUsize,
|
||||
lower_limit: usize,
|
||||
upper_limit: usize,
|
||||
func: fn(&State, flume::Receiver<Message>, flume::Receiver<()>),
|
||||
}
|
||||
|
||||
impl<State, Message> Pool<State, Message>
|
||||
where
|
||||
State: Send + Sync + 'static,
|
||||
Message: Send + 'static,
|
||||
{
|
||||
pub(super) fn build(
|
||||
state: Arc<State>,
|
||||
reaper: Reaper,
|
||||
func: fn(&State, flume::Receiver<Message>, flume::Receiver<()>),
|
||||
) -> Self {
|
||||
pub(super) struct PoolBuilder<State, Message> {
|
||||
state: Arc<State>,
|
||||
reaper: Reaper,
|
||||
func: fn(&State, flume::Receiver<Message>, flume::Receiver<()>),
|
||||
lower_limit: usize,
|
||||
upper_limit: usize,
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
impl<State, Message> PoolBuilder<State, Message> {
|
||||
pub(super) fn finish(self) -> Pool<State, Message>
|
||||
where
|
||||
State: Send + Sync + 'static,
|
||||
Message: Send + 'static,
|
||||
{
|
||||
let (tx, rx) = flume::bounded(8);
|
||||
|
||||
let handler_rx = rx.clone();
|
||||
let thread_state = Arc::clone(&state);
|
||||
|
||||
let thread = Thread::build(String::from("vectordb-pool")).spawn(move |stopper| {
|
||||
(func)(&thread_state, handler_rx, stopper);
|
||||
});
|
||||
let func = self.func;
|
||||
|
||||
let size = AtomicUsize::new(1);
|
||||
let threads = (0..self.lower_limit)
|
||||
.map(|_| {
|
||||
let thread_state = Arc::clone(&self.state);
|
||||
let handler_rx = handler_rx.clone();
|
||||
let name = self
|
||||
.name
|
||||
.clone()
|
||||
.unwrap_or_else(|| String::from("vectordb-pool"));
|
||||
|
||||
Thread::build(name).spawn(move |stopper| {
|
||||
(func)(&thread_state, handler_rx, stopper);
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
let size = AtomicUsize::new(self.lower_limit);
|
||||
|
||||
Pool {
|
||||
threads: std::sync::Mutex::new(vec![thread]),
|
||||
state,
|
||||
reaper,
|
||||
threads: std::sync::Mutex::new(threads),
|
||||
state: self.state,
|
||||
reaper: self.reaper,
|
||||
func: self.func,
|
||||
tx,
|
||||
rx,
|
||||
size,
|
||||
lower_limit: 1,
|
||||
upper_limit: 1,
|
||||
func,
|
||||
lower_limit: self.lower_limit,
|
||||
upper_limit: self.upper_limit,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn with_name(mut self, name: String) -> Self {
|
||||
self.name = Some(name);
|
||||
self
|
||||
}
|
||||
|
||||
pub(super) fn with_lower_limit(mut self, lower_limit: usize) -> Self {
|
||||
self.lower_limit = lower_limit;
|
||||
self
|
||||
|
@ -60,6 +82,27 @@ where
|
|||
self.upper_limit = upper_limit;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<State, Message> Pool<State, Message>
|
||||
where
|
||||
State: Send + Sync + 'static,
|
||||
Message: Send + 'static,
|
||||
{
|
||||
pub(super) fn builder(
|
||||
state: Arc<State>,
|
||||
reaper: Reaper,
|
||||
func: fn(&State, flume::Receiver<Message>, flume::Receiver<()>),
|
||||
) -> PoolBuilder<State, Message> {
|
||||
PoolBuilder {
|
||||
state,
|
||||
reaper,
|
||||
func,
|
||||
lower_limit: 1,
|
||||
upper_limit: 1,
|
||||
name: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn send_blocking(&self, message: Message) -> Result<(), Message> {
|
||||
self.prepare_send();
|
||||
|
|
Loading…
Reference in a new issue