diff --git a/src/collector.rs b/src/collector.rs
index 6e79bcb..382610c 100644
--- a/src/collector.rs
+++ b/src/collector.rs
@@ -15,6 +15,10 @@ const MINUTES: u64 = 60 * SECONDS;
 const HOURS: u64 = 60 * MINUTES;
 const DAYS: u64 = 24 * HOURS;
 
+pub(crate) fn recordable(len: usize) -> u32 {
+    u32::try_from(len).unwrap_or(u32::MAX)
+}
+
 type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
 
 #[derive(Clone)]
@@ -295,11 +299,20 @@ impl Inner {
                     .entry(labels)
                     .or_insert_with(Summary::with_defaults);
 
-                histogram.get_inner().clear_with(|samples| {
+                let h = histogram.get_inner().clear_with(|samples| {
                     for sample in samples {
                         entry.add(*sample);
                     }
-                })
+                });
+
+                let mut total_len = 0;
+                for dist_map in d.values() {
+                    total_len += dist_map.len();
+                }
+
+                metrics::gauge!("relay.collector.distributions.size").set(recordable(total_len));
+
+                h
             }
 
         let d = self.distributions.read().unwrap().clone();
@@ -358,6 +371,7 @@
     ) {
         let mut d = self.inner.descriptions.write().unwrap();
         d.entry(key.as_str().to_owned()).or_insert(description);
+        metrics::gauge!("relay.collector.descriptions.size").set(recordable(d.len()));
     }
 
     pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
diff --git a/src/data/last_online.rs b/src/data/last_online.rs
index 889d804..d6ac28b 100644
--- a/src/data/last_online.rs
+++ b/src/data/last_online.rs
@@ -9,10 +9,10 @@ pub(crate) struct LastOnline {
 }
 impl LastOnline {
     pub(crate) fn mark_seen(&self, iri: &IriStr) {
         if let Some(authority) = iri.authority_str() {
-            self.domains
-                .lock()
-                .unwrap()
-                .insert(authority.to_string(), OffsetDateTime::now_utc());
+            let mut guard = self.domains.lock().unwrap();
+            guard.insert(authority.to_string(), OffsetDateTime::now_utc());
+            metrics::gauge!("relay.last-online.size")
+                .set(crate::collector::recordable(guard.len()));
         }
     }
diff --git a/src/data/state.rs b/src/data/state.rs
index d3b699d..8b841a1 100644
--- a/src/data/state.rs
+++ b/src/data/state.rs
@@ -73,7 +73,9 @@ impl State {
     }
 
     pub(crate) fn cache(&self, object_id: IriString, actor_id: IriString) {
-        self.object_cache.write().unwrap().put(object_id, actor_id);
+        let mut guard = self.object_cache.write().unwrap();
+        guard.put(object_id, actor_id);
+        metrics::gauge!("relay.object-cache.size").set(crate::collector::recordable(guard.len()));
     }
 
     pub(crate) fn is_connected(&self, iri: &IriString) -> bool {
diff --git a/src/db.rs b/src/db.rs
index ba3b7fb..03c591c 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -7,7 +7,7 @@ use rsa::{
     pkcs8::{DecodePrivateKey, EncodePrivateKey},
     RsaPrivateKey,
 };
-use sled::{Batch, Tree};
+use sled::{transaction::TransactionError, Batch, Transactional, Tree};
 use std::{
     collections::{BTreeMap, HashMap},
     sync::{
@@ -283,10 +283,15 @@ impl Db {
     pub(crate) async fn check_health(&self) -> Result<(), Error> {
         let next = self.inner.healthz_counter.fetch_add(1, Ordering::Relaxed);
         self.unblock(move |inner| {
-            inner
-                .healthz
-                .insert("healthz", &next.to_be_bytes()[..])
-                .map_err(Error::from)
+            let res = inner
+                .healthz
+                .insert("healthz", &next.to_be_bytes()[..])
+                .map_err(Error::from);
+
+            metrics::gauge!("relay.db.healthz.size")
+                .set(crate::collector::recordable(inner.healthz.len()));
+
+            res
         })
         .await?;
         self.inner.healthz.flush_async().await?;
@@ -349,6 +354,9 @@ impl Db {
                 .actor_id_info
                 .insert(actor_id.as_str().as_bytes(), vec)?;
 
+            metrics::gauge!("relay.db.actor-id-info.size")
+                .set(crate::collector::recordable(inner.actor_id_info.len()));
+
             Ok(())
         })
         .await
@@ -383,6 +391,9 @@ impl Db {
                 .actor_id_instance
                 .insert(actor_id.as_str().as_bytes(), vec)?;
 
+            metrics::gauge!("relay.db.actor-id-instance.size")
+                .set(crate::collector::recordable(inner.actor_id_instance.len()));
+
             Ok(())
         })
         .await
@@ -417,6 +428,9 @@ impl Db {
                 .actor_id_contact
                 .insert(actor_id.as_str().as_bytes(), vec)?;
 
+            metrics::gauge!("relay.db.actor-id-contact.size")
+                .set(crate::collector::recordable(inner.actor_id_contact.len()));
+
             Ok(())
         })
         .await
@@ -447,6 +461,12 @@ impl Db {
             inner
                 .media_url_media_id
                 .insert(url.as_str().as_bytes(), id.as_bytes())?;
+
+            metrics::gauge!("relay.db.media-id-media-url.size")
+                .set(crate::collector::recordable(inner.media_id_media_url.len()));
+            metrics::gauge!("relay.db.media-url-media-id.size")
+                .set(crate::collector::recordable(inner.media_url_media_id.len()));
+
             Ok(())
         })
         .await
@@ -538,6 +558,14 @@ impl Db {
             inner
                 .actor_id_actor
                 .insert(actor.id.as_str().as_bytes(), vec)?;
+
+            metrics::gauge!("relay.db.public-key-actor-id.size").set(crate::collector::recordable(
+                inner.public_key_id_actor_id.len(),
+            ));
+
+            metrics::gauge!("relay.db.actor-id-actor.size").set(crate::collector::recordable(
+                inner.actor_id_actor.len(),
+            ));
             Ok(())
         })
         .await
@@ -550,6 +578,10 @@ impl Db {
             inner
                 .connected_actor_ids
                 .remove(actor_id.as_str().as_bytes())?;
+            metrics::gauge!("relay.db.connected-actor-ids.size").set(crate::collector::recordable(
+                inner.connected_actor_ids.len(),
+            ));
+
             Ok(())
         })
         .await
@@ -562,6 +594,10 @@ impl Db {
             inner
                 .connected_actor_ids
                 .insert(actor_id.as_str().as_bytes(), actor_id.as_str().as_bytes())?;
+            metrics::gauge!("relay.db.connected-actor-ids.size").set(crate::collector::recordable(
+                inner.connected_actor_ids.len(),
+            ));
+
             Ok(())
         })
         .await
@@ -569,30 +605,60 @@ impl Db {
 
     pub(crate) async fn add_blocks(&self, domains: Vec<String>) -> Result<(), Error> {
         self.unblock(move |inner| {
+            let mut connected_batch = Batch::default();
+            let mut blocked_batch = Batch::default();
+            let mut allowed_batch = Batch::default();
+
             for connected in inner.connected_by_domain(&domains) {
-                inner
-                    .connected_actor_ids
-                    .remove(connected.as_str().as_bytes())?;
+                connected_batch.remove(connected.as_str().as_bytes());
             }
 
             for authority in &domains {
-                inner
-                    .blocked_domains
-                    .insert(domain_key(authority), authority.as_bytes())?;
-                inner.allowed_domains.remove(domain_key(authority))?;
+                blocked_batch.insert(domain_key(authority), authority.as_bytes());
+                allowed_batch.remove(domain_key(authority));
             }
 
-            Ok(())
+            let res = (
+                &inner.connected_actor_ids,
+                &inner.blocked_domains,
+                &inner.allowed_domains,
+            )
+                .transaction(|(connected, blocked, allowed)| {
+                    connected.apply_batch(&connected_batch)?;
+                    blocked.apply_batch(&blocked_batch)?;
+                    allowed.apply_batch(&allowed_batch)?;
+                    Ok(())
+                });
+
+            metrics::gauge!("relay.db.connected-actor-ids.size").set(crate::collector::recordable(
+                inner.connected_actor_ids.len(),
+            ));
+            metrics::gauge!("relay.db.blocked-domains.size")
+                .set(crate::collector::recordable(inner.blocked_domains.len()));
+            metrics::gauge!("relay.db.allowed-domains.size")
+                .set(crate::collector::recordable(inner.allowed_domains.len()));
+
+            match res {
+                Ok(()) => Ok(()),
+                Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => Err(e.into()),
+            }
         })
         .await
     }
 
     pub(crate) async fn remove_blocks(&self, domains: Vec<String>) -> Result<(), Error> {
         self.unblock(move |inner| {
+            let mut blocked_batch = Batch::default();
+
             for authority in &domains {
-                inner.blocked_domains.remove(domain_key(authority))?;
+                blocked_batch.remove(domain_key(authority));
             }
 
+            inner.blocked_domains.apply_batch(blocked_batch)?;
+
+            metrics::gauge!("relay.db.blocked-domains.size")
+                .set(crate::collector::recordable(inner.blocked_domains.len()));
+
             Ok(())
         })
         .await
@@ -600,12 +666,17 @@ impl Db {
 
     pub(crate) async fn add_allows(&self, domains: Vec<String>) -> Result<(), Error> {
         self.unblock(move |inner| {
+            let mut allowed_batch = Batch::default();
+
             for authority in &domains {
-                inner
-                    .allowed_domains
-                    .insert(domain_key(authority), authority.as_bytes())?;
+                allowed_batch.insert(domain_key(authority), authority.as_bytes());
             }
 
+            inner.allowed_domains.apply_batch(allowed_batch)?;
+
+            metrics::gauge!("relay.db.allowed-domains.size")
+                .set(crate::collector::recordable(inner.allowed_domains.len()));
+
             Ok(())
         })
         .await
@@ -614,17 +685,30 @@ impl Db {
     pub(crate) async fn remove_allows(&self, domains: Vec<String>) -> Result<(), Error> {
         self.unblock(move |inner| {
             if inner.restricted_mode {
+                let mut connected_batch = Batch::default();
+
                 for connected in inner.connected_by_domain(&domains) {
-                    inner
-                        .connected_actor_ids
-                        .remove(connected.as_str().as_bytes())?;
+                    connected_batch.remove(connected.as_str().as_bytes());
                 }
+
+                inner.connected_actor_ids.apply_batch(connected_batch)?;
+
+                metrics::gauge!("relay.db.connected-actor-ids.size").set(
+                    crate::collector::recordable(inner.connected_actor_ids.len()),
+                );
             }
 
+            let mut allowed_batch = Batch::default();
+
             for authority in &domains {
-                inner.allowed_domains.remove(domain_key(authority))?;
+                allowed_batch.remove(domain_key(authority));
            }
 
+            inner.allowed_domains.apply_batch(allowed_batch)?;
+
+            metrics::gauge!("relay.db.allowed-domains.size")
+                .set(crate::collector::recordable(inner.allowed_domains.len()));
+
             Ok(())
         })
         .await
@@ -665,6 +749,10 @@ impl Db {
             inner
                 .settings
                 .insert("private-key".as_bytes(), pem_pkcs8.as_bytes())?;
+
+            metrics::gauge!("relay.db.settings.size")
+                .set(crate::collector::recordable(inner.settings.len()));
+
             Ok(())
         })
         .await
diff --git a/src/jobs.rs b/src/jobs.rs
index 7c19efe..9427399 100644
--- a/src/jobs.rs
+++ b/src/jobs.rs
@@ -40,7 +40,12 @@ fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
     object
 }
 
+pub(crate) fn build_storage() -> MetricsStorage<Storage<TokioTimer>> {
+    MetricsStorage::wrap(Storage::new(TokioTimer))
+}
+
 pub(crate) fn create_workers(
+    storage: MetricsStorage<Storage<TokioTimer>>,
     state: State,
     actors: ActorCache,
     media: MediaCache,
@@ -48,18 +53,15 @@ pub(crate) fn create_workers(
 ) -> std::io::Result<JobServer> {
     let deliver_concurrency = config.deliver_concurrency();
 
-    let queue_handle = WorkerConfig::new(
-        MetricsStorage::wrap(Storage::new(TokioTimer)),
-        move |queue_handle| {
-            JobState::new(
-                state.clone(),
-                actors.clone(),
-                JobServer::new(queue_handle),
-                media.clone(),
-                config.clone(),
-            )
-        },
-    )
+    let queue_handle = WorkerConfig::new(storage, move |queue_handle| {
+        JobState::new(
+            state.clone(),
+            actors.clone(),
+            JobServer::new(queue_handle),
+            media.clone(),
+            config.clone(),
+        )
+    })
     .register::<Deliver>()
     .register::<DeliverMany>()
     .register::<QueryNodeinfo>()
diff --git a/src/main.rs b/src/main.rs
index 3bd94ad..67c4379 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -321,10 +321,16 @@ async fn server_main(
     let sign_spawner2 = sign_spawner.clone();
     let verify_spawner2 = verify_spawner.clone();
     let config2 = config.clone();
+    let job_store = jobs::build_storage();
     let server = HttpServer::new(move || {
-        let job_server =
-            create_workers(state.clone(), actors.clone(), media.clone(), config.clone())
-                .expect("Failed to create job server");
+        let job_server = create_workers(
+            job_store.clone(),
+            state.clone(),
+            actors.clone(),
+            media.clone(),
+            config.clone(),
+        )
+        .expect("Failed to create job server");
 
         let app = App::new()
             .app_data(web::Data::new(db.clone()))