diff --git a/defaults.toml b/defaults.toml index 639764c..91606e0 100644 --- a/defaults.toml +++ b/defaults.toml @@ -21,13 +21,7 @@ max_height = 10000 max_area = 40000000 max_file_size = 40 enable_silent_video = true -filters = [ - 'blur', - 'crop', - 'identity', - 'resize', - 'thumbnail', -] +filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail'] skip_validate_imports = false cache_duration = 168 diff --git a/dev.toml b/dev.toml index 04e9397..7a642a0 100644 --- a/dev.toml +++ b/dev.toml @@ -21,14 +21,9 @@ max_height = 10000 max_area = 40000000 max_file_size = 40 enable_silent_video = true -filters = [ - 'blur', - 'crop', - 'identity', - 'resize', - 'thumbnail', -] +filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail'] skip_validate_imports = false +cache_duration = 168 [repo] type = 'sled' diff --git a/src/ingest.rs b/src/ingest.rs index 613bac3..0bd0a2f 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -115,6 +115,7 @@ where S: Store, { pub(crate) fn disarm(&mut self) { + let _ = self.hash.take(); let _ = self.alias.take(); let _ = self.identifier.take(); } diff --git a/src/repo.rs b/src/repo.rs index 4cac295..2510715 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -56,6 +56,7 @@ pub(crate) trait FullRepo: + Clone + Debug { + #[tracing::instrument] async fn identifier_from_alias( &self, alias: &Alias, @@ -64,11 +65,13 @@ pub(crate) trait FullRepo: self.identifier(hash).await } + #[tracing::instrument] async fn aliases_from_alias(&self, alias: &Alias) -> Result, Error> { let hash = self.hash(alias).await?; self.aliases(hash).await } + #[tracing::instrument] async fn still_identifier_from_alias( &self, alias: &Alias, @@ -83,11 +86,13 @@ pub(crate) trait FullRepo: } } + #[tracing::instrument] async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> { let hash = self.hash(alias).await?; CachedRepo::create(self, hash).await } + #[tracing::instrument] async fn check_cached(&self, alias: &Alias) -> Result<(), Error> { let hash = self.hash(alias).await?; let hashes = CachedRepo::update(self, hash).await?; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 3269a7d..5034625 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -132,13 +132,15 @@ impl From for UploadResult { } } -#[derive(serde::Serialize, serde::Deserialize)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] struct Bucket { + // each Vec represents a unique image hash inner: HashSet>, } #[async_trait::async_trait(?Send)] impl CachedRepo for SledRepo { + #[tracing::instrument(skip(hash))] async fn create(&self, hash: Self::Bytes) -> Result<(), Error> { let now = DateTime::now(); let bytes = serde_json::to_vec(&now)?; @@ -149,20 +151,26 @@ impl CachedRepo for SledRepo { let mut old = cache_inverse.get(bytes.clone())?; loop { - let new: Option> = if let Some(old) = old.as_ref() { - let mut bucket = serde_cbor::from_slice::(old)?; - bucket.inner.insert(hash.as_ref().to_vec()); - let vec = serde_cbor::to_vec(&bucket)?; - Some(vec) + let mut bucket = if let Some(old) = old.as_ref() { + // unsure of whether to bail on deserialize error or fail with empty bucket + if let Ok(bucket) = serde_cbor::from_slice::(old) { + bucket + } else { + Bucket { + inner: HashSet::new(), + } + } } else { - let mut bucket = Bucket { + Bucket { inner: HashSet::new(), - }; - bucket.inner.insert(hash.as_ref().to_vec()); - let vec = serde_cbor::to_vec(&bucket)?; - Some(vec) + } }; + bucket.inner.insert(hash.as_ref().to_vec()); + tracing::info!("Inserting new {:?}", bucket); + let bucket_bytes = serde_cbor::to_vec(&bucket)?; + let new = Some(bucket_bytes); + let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?; if let Err(CompareAndSwapError { current, .. }) = res { @@ -177,26 +185,67 @@ impl CachedRepo for SledRepo { Ok(()) } + #[tracing::instrument(skip(hash))] async fn update(&self, hash: Self::Bytes) -> Result, Error> { let now = DateTime::now(); - let bytes = serde_json::to_vec(&now)?; + let now_bytes = serde_json::to_vec(&now)?; let to_clean = now.min_cache_date(); let to_clean_bytes = serde_json::to_vec(&to_clean)?; let cache_inverse = self.cache_inverse.clone(); let hashes = b!(self.cache, { - let prev_value = cache - .fetch_and_update(hash.clone(), |prev_value| prev_value.map(|_| bytes.clone()))?; + let previous_datetime_opt = cache + .fetch_and_update(hash.clone(), |previous_datetime_opt| { + previous_datetime_opt.map(|_| now_bytes.clone()) + })?; - if let Some(prev_value) = prev_value { - let mut old = cache_inverse.get(prev_value.clone())?; + if let Some(previous_datetime_bytes) = previous_datetime_opt { + // Insert cached media into new date bucket + let mut old = cache_inverse.get(now_bytes.clone())?; + loop { + let mut bucket = if let Some(bucket_bytes) = old.as_ref() { + if let Ok(bucket) = serde_cbor::from_slice::(bucket_bytes) { + bucket + } else { + Bucket { + inner: HashSet::new(), + } + } + } else { + Bucket { + inner: HashSet::new(), + } + }; + + bucket.inner.insert(hash.as_ref().to_vec()); + tracing::info!("Inserting new {:?}", bucket); + let bucket_bytes = serde_cbor::to_vec(&bucket)?; + let new = Some(bucket_bytes); + + if let Err(CompareAndSwapError { current, .. }) = + cache_inverse.compare_and_swap(now_bytes.clone(), old, new)? + { + old = current; + } else { + break; + } + } + + // Remove cached media from old date bucket + let mut old = cache_inverse.get(previous_datetime_bytes.clone())?; loop { let new = if let Some(bucket_bytes) = old.as_ref() { if let Ok(mut bucket) = serde_cbor::from_slice::(bucket_bytes) { bucket.inner.remove(hash.as_ref()); - let bucket_bytes = serde_cbor::to_vec(&bucket)?; - Some(bucket_bytes) + if bucket.inner.is_empty() { + tracing::info!("Removed old {:?}", bucket); + None + } else { + tracing::info!("Inserting old {:?}", bucket); + let bucket_bytes = serde_cbor::to_vec(&bucket)?; + Some(bucket_bytes) + } } else { None } @@ -205,7 +254,7 @@ impl CachedRepo for SledRepo { }; if let Err(CompareAndSwapError { current, .. }) = - cache_inverse.compare_and_swap(prev_value.clone(), old, new)? + cache_inverse.compare_and_swap(previous_datetime_bytes.clone(), old, new)? { old = current; } else { @@ -219,17 +268,34 @@ impl CachedRepo for SledRepo { for (date_bytes, bucket_bytes) in cache_inverse.range(..to_clean_bytes).filter_map(Result::ok) { + if let Ok(datetime) = serde_json::from_slice::(&date_bytes) { + tracing::info!("Checking {}", datetime); + } else { + tracing::warn!("Invalid date bytes"); + } if let Ok(bucket) = serde_cbor::from_slice::(&bucket_bytes) { + tracing::info!("Read for deletion: {:?}", bucket); for hash in bucket.inner { // Best effort cleanup let _ = cache.remove(&hash); hashes.push(hash.into()); } + } else { + tracing::warn!("Invalid bucket"); } cache_inverse.remove(date_bytes)?; } + #[cfg(debug)] + for date_bytes in cache_inverse.range(to_clean_bytes..).filter_map(Result::ok) { + if let Ok(datetime) = serde_json::from_slice::(&date_bytes) { + tracing::info!("Not cleaning for {}", datetime); + } else { + tracing::warn!("Invalid date bytes"); + } + } + Ok(hashes) as Result<_, Error> });