Remove dedicated hyperthread maintenance threads

This commit is contained in:
asonix 2023-07-05 18:16:34 -05:00
parent 97f5ef8fa8
commit 39b3fed29c

View file

@ -11,16 +11,11 @@ type SimilaritiesResult<const N: usize> =
#[derive(Debug)]
pub(super) struct HypertreeMeta<const N: usize> {
maintenance: Pool<db::HypertreeRepo<N>, MaintenanceMessage<N>>,
hypertree: Arc<db::HypertreeRepo<N>>,
add_to_index: Pool<db::HypertreeRepo<N>, AddToIndexCommand<N>>,
find_similarities: Pool<db::HypertreeRepo<N>, FindSimilaritiesCommand<N>>,
}
struct MaintenanceMessage<const N: usize> {
responder: oneshot::Sender<Result<(), TreeError>>,
follows_from: tracing::Span,
}
struct FindSimilaritiesCommand<const N: usize> {
vector: Arc<Vector<N>>,
threshold: Option<f32>,
@ -44,43 +39,30 @@ impl<const N: usize> HypertreeMeta<N> {
.map(usize::from)
.unwrap_or(1);
let maintenance = Pool::builder(Arc::clone(&hypertree), reaper.clone(), maintenance_runner)
.with_name(String::from("vectordb-hypertree-maintenance"))
.finish();
let add_to_index =
Pool::builder(Arc::clone(&hypertree), reaper.clone(), add_to_index_runner)
.with_name(String::from("vectordb-hypertree-index"))
.finish();
let find_similarities = Pool::builder(hypertree, reaper, similarities_runner)
let find_similarities = Pool::builder(Arc::clone(&hypertree), reaper, similarities_runner)
.with_lower_limit(2)
.with_upper_limit(16 * parallelism)
.with_name(String::from("vectordb-hypertree-similarities"))
.finish();
Self {
maintenance,
hypertree,
add_to_index,
find_similarities,
}
}
pub(super) fn perform_maintenance(&self) -> Result<(), TreeError> {
let (responder, rx) = oneshot::channel();
if self
.maintenance
.send_blocking(MaintenanceMessage {
responder,
follows_from: tracing::Span::current(),
})
.is_err()
{
return Err(TreeError::Closed);
if self.hypertree.rebuild_hypertree()? {
self.hypertree.cleanup()?;
}
rx.recv()?
Ok(())
}
pub(super) fn add_many_to_index(
@ -132,43 +114,6 @@ impl<const N: usize> HypertreeMeta<N> {
}
}
fn maintenance_runner<const N: usize>(
hypertree: &db::HypertreeRepo<N>,
rx: flume::Receiver<MaintenanceMessage<N>>,
stopper: flume::Receiver<()>,
) {
crate::with_stopper(rx, stopper, |command| {
let MaintenanceMessage {
responder,
follows_from,
} = command;
let span = tracing::debug_span!("MaintenanceMessage");
span.follows_from(follows_from);
let guard = span.enter();
let res = std::panic::catch_unwind(move || {
if hypertree.rebuild_hypertree()? {
hypertree.cleanup()?;
}
Ok(())
});
match res {
Ok(res) => {
if responder.send(res).is_err() {
tracing::warn!("Requester disconnected");
}
}
Err(e) => {
tracing::error!("operation panicked {e:?}");
}
}
drop(guard);
})
}
fn similarities_runner<const N: usize>(
hypertree: &db::HypertreeRepo<N>,
rx: flume::Receiver<FindSimilaritiesCommand<N>>,