From f1c5a563538073ff8f04a6a2c5e0e737783fcb42 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 12 Dec 2023 16:54:41 -0600 Subject: [PATCH] Port prune_missing from 0.4.x --- README.md | 9 ++++++ src/lib.rs | 45 +++++++++++++++++++++++++++ src/queue.rs | 7 +++++ src/queue/cleanup.rs | 72 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 132 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 875f36a..a721676 100644 --- a/README.md +++ b/README.md @@ -602,7 +602,16 @@ A secure API key can be generated by any password generator. - `?timestamp={timestamp}` this fetches results older than the specified timestamp for easily searching into the data. the `timestamp` should be formatted according to RFC3339 - `?limit={limit}` specifies how many results to return per page +- `POST /internal/prune_missing?force={force}` Spawn a background task that will check every hash in + the database for an associated media file, deleting any record that is missing its media. + WARNING: This operation is very destructive. Please take backups before invoking it. + + This endpoint can be hit repeatedly to check the progress of the preparations. The returned + `progress` value represents how many records have been marked for pruning. + + Optionally, the `force` query parameter can be passed with a value of `true` in order to make + pict-rs spawn another task if the current one seems stuck. Additionally, all endpoints support setting deadlines, after which the request will cease processing. 
To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an diff --git a/src/lib.rs b/src/lib.rs index 8f8d6a2..b5be2ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1445,6 +1445,50 @@ fn srv_head( builder } +#[derive(serde::Serialize)] +struct PruneResponse { + complete: bool, + progress: u64, + total: u64, +} + +#[derive(Debug, serde::Deserialize)] +struct PruneQuery { + force: bool, +} + +#[tracing::instrument(name = "Prune missing identifiers", skip(repo))] +async fn prune_missing( + repo: web::Data<ArcRepo>, + query: Option<web::Query<PruneQuery>>, +) -> Result<HttpResponse, Error> { + let total = repo.size().await?; + + let progress = if let Some(progress) = repo.get("prune-missing-queued").await? { + progress + .as_ref() + .try_into() + .map(u64::from_be_bytes) + .unwrap_or(0) + } else { + 0 + }; + + let complete = repo.get("prune-missing-complete").await?.is_some(); + + let started = repo.get("prune-missing-started").await?.is_some(); + + if !started || query.is_some_and(|q| q.force) { + queue::prune_missing(&repo).await?; + } + + Ok(HttpResponse::Ok().json(PruneResponse { + complete, + progress, + total, + })) +} + #[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))] async fn clean_variants( repo: web::Data<ArcRepo>, @@ -1729,6 +1773,7 @@ fn configure_endpoints( .service(web::resource("/identifier").route(web::get().to(identifier::<S>))) .service(web::resource("/set_not_found").route(web::post().to(set_not_found))) .service(web::resource("/hashes").route(web::get().to(page))) + .service(web::resource("/prune_missing").route(web::post().to(prune_missing))) .configure(extra_config), ); } diff --git a/src/queue.rs b/src/queue.rs index 1db5778..0e26199 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -43,6 +43,7 @@ enum Cleanup { AllVariants, OutdatedVariants, OutdatedProxies, + Prune, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -118,6 +119,12 @@ pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> { Ok(()) } +pub(crate) async fn 
prune_missing(repo: &ArcRepo) -> Result<(), Error> { + let job = serde_json::to_value(Cleanup::Prune).map_err(UploadError::PushJob)?; + repo.push(CLEANUP_QUEUE, job).await?; + Ok(()) +} + pub(crate) async fn queue_ingest( repo: &ArcRepo, identifier: &Arc<str>, diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 41d4c5d..85e7cac 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use streem::IntoStreamer; +use tracing::{Instrument, Span}; use crate::{ config::Configuration, @@ -19,7 +20,7 @@ pub(super) fn perform<'a, S>( job: serde_json::Value, ) -> LocalBoxFuture<'a, Result<(), Error>> where - S: Store, + S: Store + 'static, { Box::pin(async move { match serde_json::from_value(job) { @@ -43,6 +44,7 @@ where Cleanup::AllVariants => all_variants(repo).await?, Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?, Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?, + Cleanup::Prune => prune(repo, store).await?, }, Err(e) => { tracing::warn!("Invalid job: {}", format!("{e}")); @@ -212,3 +214,71 @@ async fn hash_variant( Ok(()) } + +#[tracing::instrument(skip_all)] +async fn prune<S>(repo: &ArcRepo, store: &S) -> Result<(), Error> +where + S: Store + 'static, +{ + repo.set("prune-missing-started", b"1".to_vec().into()) + .await?; + + let hash_stream = std::pin::pin!(repo.hashes()); + let mut hash_stream = hash_stream.into_streamer(); + + let mut count: u64 = 0; + + while let Some(hash) = hash_stream.try_next().await? { + let repo = repo.clone(); + let store = store.clone(); + + let current_span = Span::current(); + + let span = tracing::info_span!(parent: current_span, "error-boundary"); + + let res = crate::sync::spawn( + "prune-missing", + async move { + let mut count = count; + + if let Some(ident) = repo.identifier(hash.clone()).await? 
{ + match store.len(&ident).await { + Err(e) if e.is_not_found() => { + super::cleanup_hash(&repo, hash).await?; + + count += 1; + + repo.set( + "prune-missing-queued", + Vec::from(count.to_be_bytes()).into(), + ) + .await?; + } + _ => (), + } + } + + Ok(count) as Result<u64, Error> + } + .instrument(span), + ) + .await; + + match res { + Ok(Ok(updated)) => count = updated, + Ok(Err(e)) => { + tracing::warn!("Prune missing identifier failed - {e:?}"); + } + Err(_) => { + tracing::warn!("Prune missing identifier panicked."); + } + } + + count += 1; + } + + repo.set("prune-missing-complete", b"1".to_vec().into()) + .await?; + + Ok(()) +}