diff --git a/Cargo.toml b/Cargo.toml index 6a79363..43b4c78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,7 @@ flume = "0.10.14" oneshot = "0.1.5" rand = "0.8.5" redb = { version = "1.0.4", features = ["logging"] } +tracing = "0.1.37" + +[dev-dependencies] +tracing-subscriber = { version = "0.3.17", features = ["env-filter", "fmt"] } diff --git a/examples/run.rs b/examples/run.rs index bdee70c..501c6a8 100644 --- a/examples/run.rs +++ b/examples/run.rs @@ -1,9 +1,29 @@ use std::time::Instant; use rand::{thread_rng, Rng}; +use tracing::{metadata::LevelFilter, subscriber::set_global_default}; +use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Layer, Registry}; use vectordb::Vector; +fn init_tracing() -> Result<(), Box> { + let format_layer = tracing_subscriber::fmt::layer() + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ); + + let subscriber = Registry::default().with(format_layer); + + set_global_default(subscriber)?; + + Ok(()) +} + fn main() -> Result<(), Box> { + init_tracing()?; + let db = vectordb::VectorDb::open("./vectordb", 2)?; let total_chunks = 32; @@ -14,14 +34,12 @@ fn main() -> Result<(), Box> { .map(|_| thread_rng().gen::<[f32; 7]>().into()) .collect::>(); - println!("Inserting vectors"); + tracing::info!("Inserting vectors"); let now = Instant::now(); db.insert_vectors_blocking(vectors)?; - println!("Inserting vectors - Took {:?}", now.elapsed()); + tracing::info!("Inserting vectors - Took {:?}", now.elapsed()); } - println!(); - let existing_vector: Vector<7> = thread_rng().gen::<[f32; 7]>().into(); db.insert_vector_blocking(existing_vector.clone())?; @@ -31,75 +49,97 @@ fn main() -> Result<(), Box> { .chain((0..5).map(|_| thread_rng().gen::<[f32; 7]>().into())) .collect::>(); - for v in &test_vecs { - println!("Finding similarities"); - let now = Instant::now(); + let span = tracing::info_span!("Similarities"); + let guard = span.enter(); + for (i, v) in test_vecs.iter().enumerate() { + let span = tracing::info_span!("test", id = %i); + let guard = span.enter(); + let similarities = db.find_similarities_blocking(v.clone(), None, 5)?; - println!("Finding similarities - Took {:?}", now.elapsed()); for id in similarities { let similar = db.get_vector_blocking(id)?.expect("Vector exists"); - println!( + tracing::info!( "similar to {}: {}", id, v.squared_euclidean_distance(&similar) ); } + + drop(guard); + drop(span); } + drop(guard); + drop(span); - println!(); + let span = tracing::info_span!("Furthest similarities"); + let guard = span.enter(); + for (i, v) in test_vecs.iter().enumerate() { + let span = tracing::info_span!("test", id = %i); + let guard = span.enter(); - for v in &test_vecs { - println!("Finding furthest similarities"); - let now = Instant::now(); let similarities = db.find_furthest_similarities_blocking(v.clone(), None, 5)?; - println!("Finding furthest similarities - Took {:?}", now.elapsed()); for id in similarities { let similar = db.get_vector_blocking(id)?.expect("Vector exists"); - println!( + tracing::info!( "similar to {}: {}", id, v.squared_euclidean_distance(&similar) ); } + + drop(guard); + drop(span); } + drop(guard); + drop(span); - println!(); + let span = tracing::info_span!("Dissimilarities"); + let guard = span.enter(); + for (i, v) in test_vecs.iter().enumerate() { + let span = tracing::info_span!("test", id = %i); + let guard = span.enter(); - for v in &test_vecs { - println!("Finding dissimilarities"); - let now = Instant::now(); let similarities = db.find_dissimilarities_blocking(v.clone(), None, 5)?; - println!("Finding dissimilarities - Took {:?}", now.elapsed()); for id in similarities { let similar = db.get_vector_blocking(id)?.expect("Vector exists"); - println!( + tracing::info!( "dissimilar to {}: {}", id, v.squared_euclidean_distance(&similar) ); } + + drop(guard); + drop(span); } + drop(guard); + drop(span); - println!(); + let span = tracing::info_span!("Closest dissimilarities"); + let guard = span.enter(); + for (i, v) in test_vecs.iter().enumerate() { + let span = tracing::info_span!("test", id = %i); + let guard = span.enter(); - for v in &test_vecs { - println!("Finding closest dissimilarities"); - let now = Instant::now(); let similarities = db.find_closest_dissimilarities_blocking(v.clone(), None, 5)?; - println!("Finding closest dissimilarities - Took {:?}", now.elapsed()); for id in similarities { let similar = db.get_vector_blocking(id)?.expect("Vector exists"); - println!( + tracing::info!( "dissimilar to {}: {}", id, v.squared_euclidean_distance(&similar) ); } + + drop(guard); + drop(span); } + drop(guard); + drop(span); Ok(()) } diff --git a/src/hypertree.rs b/src/hypertree.rs index 323076c..3297cbc 100644 --- a/src/hypertree.rs +++ b/src/hypertree.rs @@ -18,6 +18,7 @@ pub(super) struct HypertreeMeta { struct MaintenanceMessage { responder: oneshot::Sender>, + follows_from: tracing::Span, } struct FindSimilaritiesCommand { @@ -26,11 +27,13 @@ struct FindSimilaritiesCommand { limit: usize, similarity_style: SimilarityStyle, responder: oneshot::Sender>, + parent: tracing::Span, } struct AddToIndexCommand { vectors: Arc<[(InternalVectorId, Vector)]>, responder: oneshot::Sender>, + parent: tracing::Span, } impl HypertreeMeta { @@ -57,7 +60,10 @@ impl HypertreeMeta { if self .maintenance - .send_blocking(MaintenanceMessage { responder }) + .send_blocking(MaintenanceMessage { + responder, + follows_from: tracing::Span::current(), + }) .is_err() { return Err(TreeError::Closed); @@ -74,7 +80,11 @@ impl HypertreeMeta { if self .add_to_index - .send_blocking(AddToIndexCommand { vectors, responder }) + .send_blocking(AddToIndexCommand { + vectors, + responder, + parent: tracing::Span::current(), + }) .is_err() { return Err(TreeError::Closed); @@ -100,6 +110,7 @@ impl HypertreeMeta { limit, similarity_style, responder, + parent: tracing::Span::current(), }) .is_err() { @@ -115,7 +126,16 @@ fn maintenance_runner( rx: flume::Receiver>, stopper: flume::Receiver<()>, ) { - crate::with_stopper(rx, stopper, |MaintenanceMessage { responder }| { + crate::with_stopper(rx, stopper, |command| { + let MaintenanceMessage { + responder, + follows_from, + } = command; + + let span = tracing::debug_span!("MaintenanceMessage"); + span.follows_from(follows_from); + let guard = span.enter(); + let res = std::panic::catch_unwind(move || { if hypertree.rebuild_hypertree()? { hypertree.cleanup()?; @@ -126,13 +146,15 @@ fn maintenance_runner( match res { Ok(res) => { if responder.send(res).is_err() { - eprintln!("Requester disconnected"); + tracing::warn!("Requester disconnected"); } } Err(e) => { - eprintln!("operation panicked {e:?}"); + tracing::error!("operation panicked {e:?}"); } } + + drop(guard); }) } @@ -148,8 +170,12 @@ fn similarities_runner( limit, similarity_style, responder, + parent, } = command; + let span = tracing::debug_span!(parent: parent, "FindSimilarities"); + let guard = span.enter(); + let res = std::panic::catch_unwind(move || { hypertree.find_similarities(&vector, threshold, limit, similarity_style) }); @@ -157,13 +183,15 @@ fn similarities_runner( match res { Ok(res) => { if responder.send(res).is_err() { - eprintln!("Requester disconnected"); + tracing::warn!("Requester disconnected"); } } Err(e) => { - eprintln!("operation panicked: {e:?}"); + tracing::error!("operation panicked: {e:?}"); } } + + drop(guard); }) } @@ -172,18 +200,29 @@ fn add_to_index_runner( rx: flume::Receiver>, stopper: flume::Receiver<()>, ) { - crate::with_stopper(rx, stopper, |AddToIndexCommand { vectors, responder }| { + crate::with_stopper(rx, stopper, |command| { + let AddToIndexCommand { + vectors, + responder, + parent, + } = command; + + let span = tracing::debug_span!(parent: parent, "AddToIndex"); + let guard = span.enter(); + let res = std::panic::catch_unwind(move || hypertree.add_many_to_index(&vectors)); match res { Ok(res) => { if responder.send(res).is_err() { - eprintln!("Requester disconnected"); + tracing::warn!("Requester disconnected"); } } Err(e) => { - eprintln!("operation panicked: {e:?}"); + tracing::error!("operation panicked: {e:?}"); } } + + drop(guard); }) } diff --git a/src/hypertree/db.rs b/src/hypertree/db.rs index 4ef9311..8c85b61 100644 --- a/src/hypertree/db.rs +++ b/src/hypertree/db.rs @@ -81,7 +81,7 @@ struct HypertreeBytesRange<'a, const N: usize> { struct SetOnDrop<'a>(&'a AtomicBool); -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub(crate) enum SimilarityStyle { Similar, FurthestSimilar, @@ -130,6 +130,7 @@ const fn hyperplane_byte_length(n: usize) -> usize { } #[allow(clippy::too_many_arguments)] +#[tracing::instrument(level = "trace", skip_all, fields(vector_id = ?insert_vector_id, vector = ?insert_vector))] fn do_add_to_index<'db, 'txn, const N: usize>( ids_table: &mut Table<'db, 'txn, &'static str, u128>, internal_vector_table: &mut Table<'db, 'txn, InternalVectorId, VectorBytes>, @@ -163,7 +164,7 @@ fn do_add_to_index<'db, 'txn, const N: usize>( let Some((hyperplane_list, bucket_id)) = bucket_opt else { // TODO: maybe should error? - todo!("Didn't find bucket") + unreachable!("Didn't find bucket") }; bucket_multimap.insert(bucket_id, insert_vector_id)?; @@ -445,7 +446,7 @@ where }; let size = bucket.count(); - println!("candidates from bucket {}, size {}", bucket_id, size); + tracing::debug!("candidates from bucket {}, size {}", bucket_id, size); let bucket = match bucket_multimap.get(bucket_id) { Ok(bucket) => bucket, @@ -639,6 +640,7 @@ impl HypertreeRepo { }) } + #[tracing::instrument(level = "trace", skip(self))] pub(super) fn find_similarities( &self, query_vector: &Vector, @@ -665,6 +667,7 @@ impl HypertreeRepo { ) } + #[tracing::instrument(level = "trace", skip_all)] pub(super) fn add_many_to_index( &self, vectors: &[(InternalVectorId, Vector)], @@ -720,6 +723,7 @@ impl HypertreeRepo { Ok(()) } + #[tracing::instrument(level = "trace", skip(self))] fn force_rebuild_hypertree(&self) -> Result<(), TreeError> { let start = std::time::Instant::now(); @@ -736,17 +740,17 @@ impl HypertreeRepo { let vector_len = internal_vector_table.len()?; if vector_len == 0 { - println!("Not rebuilding - no vectors"); + tracing::trace!("Not rebuilding - no vectors"); return Ok(()); } if vector_len < u64::try_from(self.max_bucket_size).expect("bucket size is reasonable") / 2 { - println!("Not rebuilding - small vector count"); + tracing::trace!("Not rebuilding - small vector count"); return Ok(()); } - println!("Rebuilding hypertree: {vector_len} vectors"); + tracing::debug!("Rebuilding hypertree: {vector_len} vectors"); rebuild_table.insert(REBUILD_KEY, vector_len)?; @@ -782,7 +786,7 @@ impl HypertreeRepo { drop(hyperplane_table); drop(bucket_multimap); - println!( + tracing::debug!( "Rebuilding hypertree: {vector_len} vectors - took {:?}", start.elapsed() ); @@ -792,6 +796,7 @@ impl HypertreeRepo { Ok(()) } + #[tracing::instrument(level = "trace", skip(self))] pub(super) fn rebuild_hypertree(&self) -> Result { if self .rebuilding @@ -826,6 +831,7 @@ impl HypertreeRepo { Ok(true) } + #[tracing::instrument(level = "trace", skip(self))] pub(crate) fn cleanup(&self) -> Result<(), TreeError> { let cleanup_guard = SetOnDrop(&self.cleanup); self.cleanup diff --git a/src/lib.rs b/src/lib.rs index 02b24a9..7ffe5d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,16 +35,19 @@ enum ReaderCommand { limit: usize, similarity_style: SimilarityStyle, responder: oneshot::Sender, TreeError>>, + parent: tracing::Span, }, GetVectors { vector_ids: Vec, responder: oneshot::Sender>, TreeError>>, + parent: tracing::Span, }, } struct InsertVectorsCommand { vectors: Vec>, responder: oneshot::Sender, TreeError>>, + parent: tracing::Span, } #[derive(Debug)] @@ -113,6 +116,7 @@ const fn vector_table_definition( } impl VectorDb { + #[tracing::instrument(level = "debug", skip(db_directory), fields(directory = ?db_directory.as_ref()))] pub fn open>( db_directory: P, target_hypertree_count: usize, @@ -131,18 +135,21 @@ impl VectorDb { }) } + #[tracing::instrument(level = "debug", skip(self))] pub fn insert_vector_blocking(&self, vector: Vector) -> Result { let vec = self.inner.insert_vectors_blocking(vec![vector])?; Ok(vec[0]) } + #[tracing::instrument(level = "debug", skip(self))] pub async fn insert_vector(&self, vector: Vector) -> Result { let vec = self.inner.insert_vectors(vec![vector]).await?; Ok(vec[0]) } + #[tracing::instrument(level = "debug", skip(self, vectors))] pub fn insert_vectors_blocking( &self, vectors: Vec>, @@ -150,6 +157,7 @@ impl VectorDb { self.inner.insert_vectors_blocking(vectors) } + #[tracing::instrument(level = "debug", skip(self, vectors))] pub async fn insert_vectors( &self, vectors: Vec>, @@ -157,6 +165,7 @@ impl VectorDb { self.inner.insert_vectors(vectors).await } + #[tracing::instrument(level = "debug", skip(self))] pub fn find_similarities_blocking( &self, vector: Vector, @@ -167,6 +176,7 @@ impl VectorDb { .find_similarities_blocking(vector, threshold, limit, SimilarityStyle::Similar) } + #[tracing::instrument(level = "debug", skip(self))] pub async fn find_similarities( &self, vector: Vector, @@ -178,6 +188,7 @@ impl VectorDb { .await } + #[tracing::instrument(level = "debug", skip(self))] pub fn find_furthest_similarities_blocking( &self, vector: Vector, @@ -192,6 +203,7 @@ impl VectorDb { ) } + #[tracing::instrument(level = "debug", skip(self))] pub async fn find_furthest_similarities( &self, vector: Vector, @@ -203,6 +215,7 @@ impl VectorDb { .await } + #[tracing::instrument(level = "debug", skip(self))] pub fn find_dissimilarities_blocking( &self, vector: Vector, @@ -213,6 +226,7 @@ impl VectorDb { .find_similarities_blocking(vector, threshold, limit, SimilarityStyle::Dissimilar) } + #[tracing::instrument(level = "debug", skip(self))] pub async fn find_dissimilarities( &self, vector: Vector, @@ -224,6 +238,7 @@ impl VectorDb { .await } + #[tracing::instrument(level = "debug", skip(self))] pub fn find_closest_dissimilarities_blocking( &self, vector: Vector, @@ -238,6 +253,7 @@ impl VectorDb { ) } + #[tracing::instrument(level = "debug", skip(self))] pub async fn find_closest_dissimilarities( &self, vector: Vector, @@ -249,18 +265,21 @@ impl VectorDb { .await } + #[tracing::instrument(level = "debug", skip(self))] pub async fn get_vector(&self, vector_id: VectorId) -> Result>, TreeError> { let mut hm = self.inner.get_vectors(vec![vector_id]).await?; Ok(hm.remove(&vector_id)) } + #[tracing::instrument(level = "debug", skip(self))] pub fn get_vector_blocking(&self, vector_id: VectorId) -> Result>, TreeError> { let mut hm = self.inner.get_vectors_blocking(vec![vector_id])?; Ok(hm.remove(&vector_id)) } + #[tracing::instrument(level = "debug", skip(self, vector_ids))] pub async fn get_vectors( &self, vector_ids: Vec, @@ -268,6 +287,7 @@ impl VectorDb { self.inner.get_vectors(vector_ids).await } + #[tracing::instrument(level = "debug", skip(self, vector_ids))] pub fn get_vectors_blocking( &self, vector_ids: Vec, @@ -295,20 +315,21 @@ impl VectorMeta { let insert_vector = Pool::build(std::sync::Arc::clone(&state), reaper, insert_vectors_runner); - let maintenance = Thread::spawn(move |stopper| { - let mut index = 0; + let maintenance = + Thread::build(String::from("vectordb-maintenance")).spawn(move |stopper| { + let mut index = 0; - with_stopper(maintenance_receiver, stopper, |()| { - if let Err(e) = state.hypertrees[index].perform_maintenance() { - eprintln!("Error peforming maintenance: {e:?}"); - } + with_stopper(maintenance_receiver, stopper, |()| { + if let Err(e) = state.hypertrees[index].perform_maintenance() { + tracing::warn!("Error peforming maintenance: {e:?}"); + } - index += 1; - if index >= state.hypertrees.len() { - index = 0; - } - }) - }); + index += 1; + if index >= state.hypertrees.len() { + index = 0; + } + }) + }); Self { reader, @@ -322,7 +343,11 @@ impl VectorMeta { if self .insert_vector - .send_async(InsertVectorsCommand { vectors, responder }) + .send_async(InsertVectorsCommand { + vectors, + responder, + parent: tracing::Span::current(), + }) .await .is_err() { @@ -337,7 +362,11 @@ impl VectorMeta { if self .insert_vector - .send_blocking(InsertVectorsCommand { vectors, responder }) + .send_blocking(InsertVectorsCommand { + vectors, + responder, + parent: tracing::Span::current(), + }) .is_err() { return Err(TreeError::Closed); @@ -357,6 +386,7 @@ impl VectorMeta { .send_async(ReaderCommand::GetVectors { vector_ids, responder, + parent: tracing::Span::current(), }) .await .is_err() @@ -378,6 +408,7 @@ impl VectorMeta { .send_blocking(ReaderCommand::GetVectors { vector_ids, responder, + parent: tracing::Span::current(), }) .is_err() { @@ -404,6 +435,7 @@ impl VectorMeta { limit, similarity_style, responder, + parent: tracing::Span::current(), }) .await .is_err() @@ -431,6 +463,7 @@ impl VectorMeta { limit, similarity_style, responder, + parent: tracing::Span::current(), }) .is_err() { @@ -596,7 +629,11 @@ fn reader_runner( limit, similarity_style, responder, + parent, } => { + let span = tracing::debug_span!(parent: parent, "FindSimilarities"); + let guard = span.enter(); + let res = std::panic::catch_unwind(|| { repo.find_similarities(vector, threshold, limit, similarity_style) }); @@ -604,30 +641,38 @@ fn reader_runner( match res { Ok(res) => { if responder.send(res).is_err() { - eprintln!("Requester disconnected"); + tracing::warn!("Requester disconnected"); } } Err(e) => { - eprintln!("operation panicked: {e:?}"); + tracing::error!("operation panicked: {e:?}"); } } + + drop(guard); } ReaderCommand::GetVectors { vector_ids, responder, + parent, } => { + let span = tracing::debug_span!(parent: parent, "GetVectors"); + let guard = span.enter(); + let res = std::panic::catch_unwind(|| repo.get_vectors(&vector_ids)); match res { Ok(res) => { if responder.send(res).is_err() { - eprintln!("Requester disconnected"); + tracing::warn!("Requester disconnected"); } } Err(e) => { - eprintln!("operation panicked: {e:?}"); + tracing::error!("operation panicked: {e:?}"); } } + + drop(guard); } }) } @@ -637,24 +682,31 @@ fn insert_vectors_runner( rx: flume::Receiver>, stopper: flume::Receiver<()>, ) { - with_stopper( - rx, - stopper, - |InsertVectorsCommand { vectors, responder }| { - let res = std::panic::catch_unwind(|| repo.insert_many_vectors(vectors)); + with_stopper(rx, stopper, |command| { + let InsertVectorsCommand { + vectors, + responder, + parent, + } = command; - match res { - Ok(res) => { - if responder.send(res).is_err() { - eprintln!("Requester disconnected"); - } - } - Err(e) => { - eprintln!("operation panicked: {e:?}"); + let span = tracing::debug_span!(parent: parent, "InsertVectors"); + let guard = span.enter(); + + let res = std::panic::catch_unwind(|| repo.insert_many_vectors(vectors)); + + match res { + Ok(res) => { + if responder.send(res).is_err() { + tracing::warn!("Requester disconnected"); } } - }, - ) + Err(e) => { + tracing::error!("operation panicked: {e:?}"); + } + } + + drop(guard); + }) } fn with_stopper( diff --git a/src/pool.rs b/src/pool.rs index c18b22d..cef3c9f 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -32,7 +32,7 @@ where let handler_rx = rx.clone(); let thread_state = Arc::clone(&state); - let thread = Thread::spawn(move |stopper| { + let thread = Thread::build(String::from("vectordb-pool")).spawn(move |stopper| { (func)(&thread_state, handler_rx, stopper); }); @@ -94,7 +94,7 @@ where let handler_rx = self.rx.clone(); let func = self.func; - let thread = Thread::spawn(move |stopper| { + let thread = Thread::build(String::from("vectordb-pool")).spawn(move |stopper| { (func)(&state, handler_rx, stopper); }); diff --git a/src/reaper.rs b/src/reaper.rs index ef6fdd6..fe511da 100644 --- a/src/reaper.rs +++ b/src/reaper.rs @@ -12,13 +12,15 @@ impl Reaper { pub(super) fn new() -> Self { let (sender, rx) = flume::unbounded(); - let thread = Arc::new(Thread::spawn(move |stopper| { - crate::with_stopper(rx, stopper, |thread: Thread| { - if let Err(e) = thread.shutdown() { - eprintln!("Source thread panicked: {e:?}"); - } - }) - })); + let thread = Arc::new(Thread::build(String::from("vectordb-reaper")).spawn( + move |stopper| { + crate::with_stopper(rx, stopper, |thread: Thread| { + if let Err(e) = thread.shutdown() { + tracing::error!("Source thread panicked: {e:?}"); + } + }) + }, + )); Self { _thread: thread, diff --git a/src/thread.rs b/src/thread.rs index 269d298..8c24e8a 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -6,15 +6,30 @@ pub(super) struct Thread { stopper: flume::Sender<()>, } -impl Thread { - pub(super) fn spawn(func: F) -> Self +pub(super) struct ThreadBuilder { + name: String, +} + +impl ThreadBuilder { + pub(super) fn spawn(self, func: F) -> Thread where F: FnOnce(flume::Receiver<()>) + Send + 'static, { let (stopper, rx) = flume::bounded(1); - let handle = AssertUnwindSafe(Some(std::thread::spawn(move || (func)(rx)))); + let handle = AssertUnwindSafe(Some( + std::thread::Builder::new() + .name(self.name) + .spawn(move || (func)(rx)) + .expect("Spawned thread"), + )); - Self { handle, stopper } + Thread { handle, stopper } + } +} + +impl Thread { + pub(super) fn build(name: String) -> ThreadBuilder { + ThreadBuilder { name } } pub(super) fn shutdown(mut self) -> Result<(), Box> {