Port prune_missing from 0.4.x
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
asonix 2023-12-12 16:54:41 -06:00
parent 3ef4097ceb
commit f1c5a56353
4 changed files with 132 additions and 1 deletions

View file

@ -602,7 +602,16 @@ A secure API key can be generated by any password generator.
- `?timestamp={timestamp}` this fetches results older than the specified timestamp, which makes it
easy to search through the data. The `timestamp` should be formatted according to RFC3339
- `?limit={limit}` specifies how many results to return per page
- `POST /internal/prune_missing?force={force}` Spawn a background task that will check every hash in
the database for an associated media file, deleting any record that is missing its media.
WARNING: This operation is very destructive. Please take backups before invoking it.
This endpoint can be hit repeatedly to check the progress of the preparations. The returned
`progress` value represents how many records have been marked for pruning.
Optionally, the `force` query parameter can be passed with a value of `true` in order to make
pict-rs spawn another task if the current one seems stuck.
Additionally, all endpoints support setting deadlines, after which the request will cease
processing. To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an

View file

@ -1445,6 +1445,50 @@ fn srv_head(
builder
}
/// JSON body returned by the `prune_missing` endpoint.
#[derive(serde::Serialize)]
struct PruneResponse {
/// `true` when the "prune-missing-complete" marker is present in the repo.
complete: bool,
/// Value decoded from the "prune-missing-queued" setting (big-endian `u64`), or 0 when the
/// setting is absent or is not exactly eight bytes long.
progress: u64,
/// Value reported by `repo.size()` at the time of the request.
total: u64,
}
/// Query-string parameters accepted by the `prune_missing` endpoint.
#[derive(Debug, serde::Deserialize)]
struct PruneQuery {
/// When `true`, a prune job is queued even if the "prune-missing-started" marker is already set.
force: bool,
}
/// Report the state of the prune-missing job and queue it when needed.
///
/// The response is built from values read *before* any job is queued by this request:
/// `total` comes from `repo.size()`, `progress` from the "prune-missing-queued" setting and
/// `complete` from the presence of the "prune-missing-complete" setting.
///
/// A new job is queued when the "prune-missing-started" marker is absent, or when the request
/// carries a query whose `force` field is `true`.
///
/// # Errors
///
/// Propagates any error from the repo reads or from queueing the job.
#[tracing::instrument(name = "Prune missing identifiers", skip(repo))]
async fn prune_missing(
    repo: web::Data<ArcRepo>,
    query: Option<web::Query<PruneQuery>>,
) -> Result<HttpResponse, Error> {
    let total = repo.size().await?;

    // The counter is stored as eight big-endian bytes; anything else is treated as zero.
    let progress = match repo.get("prune-missing-queued").await? {
        Some(raw) => raw
            .as_ref()
            .try_into()
            .map(u64::from_be_bytes)
            .unwrap_or(0),
        None => 0,
    };

    let complete = repo.get("prune-missing-complete").await?.is_some();
    let already_started = repo.get("prune-missing-started").await?.is_some();

    let forced = query.is_some_and(|params| params.force);
    let should_queue = forced || !already_started;

    if should_queue {
        queue::prune_missing(&repo).await?;
    }

    let body = PruneResponse {
        complete,
        progress,
        total,
    };

    Ok(HttpResponse::Ok().json(body))
}
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))]
async fn clean_variants(
repo: web::Data<ArcRepo>,
@ -1729,6 +1773,7 @@ fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>(
.service(web::resource("/identifier").route(web::get().to(identifier::<S>)))
.service(web::resource("/set_not_found").route(web::post().to(set_not_found)))
.service(web::resource("/hashes").route(web::get().to(page)))
.service(web::resource("/prune_missing").route(web::post().to(prune_missing)))
.configure(extra_config),
);
}

View file

@ -43,6 +43,7 @@ enum Cleanup {
AllVariants,
OutdatedVariants,
OutdatedProxies,
Prune,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
@ -118,6 +119,12 @@ pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
Ok(())
}
/// Push a `Cleanup::Prune` job onto the cleanup queue.
///
/// # Errors
///
/// Returns `UploadError::PushJob` (converted into `Error`) when the job cannot be serialized,
/// and propagates any error from pushing onto the queue.
pub(crate) async fn prune_missing(repo: &ArcRepo) -> Result<(), Error> {
    let payload = match serde_json::to_value(Cleanup::Prune) {
        Ok(value) => value,
        Err(e) => return Err(UploadError::PushJob(e).into()),
    };

    repo.push(CLEANUP_QUEUE, payload).await?;

    Ok(())
}
pub(crate) async fn queue_ingest(
repo: &ArcRepo,
identifier: &Arc<str>,

View file

@ -1,6 +1,7 @@
use std::sync::Arc;
use streem::IntoStreamer;
use tracing::{Instrument, Span};
use crate::{
config::Configuration,
@ -19,7 +20,7 @@ pub(super) fn perform<'a, S>(
job: serde_json::Value,
) -> LocalBoxFuture<'a, Result<(), Error>>
where
S: Store,
S: Store + 'static,
{
Box::pin(async move {
match serde_json::from_value(job) {
@ -43,6 +44,7 @@ where
Cleanup::AllVariants => all_variants(repo).await?,
Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?,
Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?,
Cleanup::Prune => prune(repo, store).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}"));
@ -212,3 +214,71 @@ async fn hash_variant(
Ok(())
}
/// Walk every hash in the repo and clean up the records whose media is missing from the store.
///
/// State is published through three repo settings:
/// - "prune-missing-started" is written before the scan begins,
/// - "prune-missing-queued" holds the big-endian `u64` number of records cleaned up so far,
/// - "prune-missing-complete" is written once the hash stream is exhausted.
///
/// Each hash is checked inside its own spawned task, which acts as an error boundary: a failure
/// for one record is logged with a warning and the scan moves on to the next hash.
///
/// # Errors
///
/// Returns an error when writing the started/complete markers fails or when the hash stream
/// itself yields an error. Per-record failures are logged, not returned.
#[tracing::instrument(skip_all)]
async fn prune<S>(repo: &ArcRepo, store: &S) -> Result<(), Error>
where
    S: Store + 'static,
{
    repo.set("prune-missing-started", b"1".to_vec().into())
        .await?;

    let hash_stream = std::pin::pin!(repo.hashes());
    let mut hash_stream = hash_stream.into_streamer();

    // Number of records cleaned up so far. It is only advanced by a task that actually removed
    // a record, so the persisted "prune-missing-queued" value is exactly the pruned count.
    // NOTE(review): a forced re-run starts again from zero here but does not clear a
    // "prune-missing-complete" marker left by an earlier run — confirm whether that is intended.
    let mut count: u64 = 0;

    while let Some(hash) = hash_stream.try_next().await? {
        let repo = repo.clone();
        let store = store.clone();

        let current_span = Span::current();

        let span = tracing::info_span!(parent: current_span, "error-boundary");

        let res = crate::sync::spawn(
            "prune-missing",
            async move {
                // `count` is `Copy`, so the task works on its own copy and hands the updated
                // value back through its return value.
                let mut count = count;

                if let Some(ident) = repo.identifier(hash.clone()).await? {
                    match store.len(&ident).await {
                        // Only a "not found" error from the store means the media is gone.
                        Err(e) if e.is_not_found() => {
                            super::cleanup_hash(&repo, hash).await?;

                            count += 1;

                            repo.set(
                                "prune-missing-queued",
                                Vec::from(count.to_be_bytes()).into(),
                            )
                            .await?;
                        }
                        // Media present, or a different store error: leave the record alone.
                        _ => (),
                    }
                }

                Ok(count) as Result<u64, Error>
            }
            .instrument(span),
        )
        .await;

        match res {
            Ok(Ok(updated)) => count = updated,
            Ok(Err(e)) => {
                tracing::warn!("Prune missing identifier failed - {e:?}");
            }
            Err(_) => {
                tracing::warn!("Prune missing identifier panicked.");
            }
        }
    }

    repo.set("prune-missing-complete", b"1".to_vec().into())
        .await?;

    Ok(())
}