Add internal endpoint to trigger upgrade preparation job
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
asonix 2023-12-05 16:58:52 -06:00
parent 388cb4019f
commit 668b68a23f
4 changed files with 137 additions and 40 deletions

View file

@ -510,6 +510,15 @@ A secure API key can be generated by any password generator.
$ cp -r exports/2023-07-08T22:26:21.194126713Z sled-repo
```
4. Starting pict-rs
- `POST /internal/prepare_upgrade?force={force}` Spawn a background task that will attempt to prepare the database
for the 0.5 upgrade. This process will attempt to generate metadata for all original uploads if
needed.
This endpoint can be hit repeatedly to check the progress of the preparations.
Optionally, the `force` query parameter can be passed with a value of `true` in order to make
pict-rs spawn another task if the current one seems stuck.
Additionally, all endpoints support setting deadlines, after which the request will cease
processing. To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an

View file

@ -114,14 +114,22 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
return Err(UploadError::MissingAlias.into());
};
let details = repo.details(&identifier).await?;
ensure_details_identifier(repo, store, &identifier, details_hint(alias)).await
}
async fn ensure_details_identifier<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,
identifier: &S::Identifier,
hint: Option<ValidInputType>,
) -> Result<Details, Error> {
let details = repo.details(identifier).await?;
if let Some(details) = details {
tracing::debug!("details exist");
Ok(details)
} else {
tracing::debug!("generating new details from {:?}", identifier);
let hint = details_hint(alias);
let new_details = Details::from_store(
store.clone(),
identifier.clone(),
@ -130,7 +138,7 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
)
.await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
repo.relate_details(identifier, &new_details).await?;
tracing::debug!("stored");
Ok(new_details)
}
@ -693,25 +701,9 @@ async fn process<R: FullRepo, S: Store + 'static>(
.await?;
if let Some(identifier) = identifier_opt {
let details = repo.details(&identifier).await?;
let hint = Some(ValidInputType::from_format(format));
let details = if let Some(details) = details {
tracing::debug!("details exist");
details
} else {
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(
(**store).clone(),
identifier.clone(),
Some(ValidInputType::from_format(format)),
CONFIG.media.process_timeout,
)
.await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
new_details
};
let details = ensure_details_identifier(&repo, &store, &identifier, hint).await?;
return ranged_file_resp(&store, identifier, range, details, not_found).await;
}
@ -790,25 +782,9 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
.await?;
if let Some(identifier) = identifier_opt {
let details = repo.details(&identifier).await?;
let hint = Some(ValidInputType::from_format(format));
let details = if let Some(details) = details {
tracing::debug!("details exist");
details
} else {
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(
(**store).clone(),
identifier.clone(),
Some(ValidInputType::from_format(format)),
CONFIG.media.process_timeout,
)
.await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
new_details
};
let details = ensure_details_identifier(&repo, &store, &identifier, hint).await?;
return ranged_file_head_resp(&store, identifier, range, details).await;
}
@ -1043,6 +1019,50 @@ fn srv_head(
builder
}
#[derive(serde::Serialize)]
struct UpgradeResponse {
complete: bool,
progress: u64,
total: u64,
}
#[derive(Debug, serde::Deserialize)]
struct UpgradeQuery {
force: bool,
}
#[tracing::instrument(name = "Prepare for 0.5 upgrade", skip(repo))]
async fn prepare_upgrade<R: FullRepo>(
repo: web::Data<R>,
query: Option<web::Query<UpgradeQuery>>,
) -> Result<HttpResponse, Error> {
let total = repo.size().await?;
let progress = if let Some(progress) = repo.get("upgrade-preparations-progress").await? {
progress
.as_ref()
.try_into()
.map(u64::from_be_bytes)
.unwrap_or(0)
} else {
0
};
let complete = repo.get("upgrade-preparations-complete").await?.is_some();
let started = repo.get("upgrade-preparations-started").await?.is_some();
if !started || query.is_some_and(|q| q.force) {
queue::queue_prepare_upgrade(&repo).await?;
}
Ok(HttpResponse::Ok().json(UpgradeResponse {
complete,
progress,
total,
}))
}
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))]
async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
queue::cleanup_all_variants(&repo).await?;
@ -1242,6 +1262,9 @@ fn configure_endpoints<
.service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
.service(web::resource("/identifier").route(web::get().to(identifier::<R, S>)))
.service(web::resource("/set_not_found").route(web::post().to(set_not_found::<R>)))
.service(
web::resource("/prepare_upgrade").route(web::post().to(prepare_upgrade::<R>)),
)
.configure(extra_config),
);
}

View file

@ -76,6 +76,7 @@ enum Process {
process_path: PathBuf,
process_args: Vec<String>,
},
PrepareUpgrade,
}
pub(crate) async fn cleanup_alias<R: QueueRepo>(
@ -124,6 +125,12 @@ pub(crate) async fn cleanup_all_variants<R: QueueRepo>(repo: &R) -> Result<(), E
Ok(())
}
pub(crate) async fn queue_prepare_upgrade<R: QueueRepo>(repo: &R) -> Result<(), Error> {
let job = serde_json::to_vec(&Process::PrepareUpgrade)?;
repo.push(PROCESS_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn queue_ingest<R: QueueRepo>(
repo: &R,
identifier: Vec<u8>,

View file

@ -7,7 +7,7 @@ use crate::{
serde_str::Serde,
store::{Identifier, Store},
};
use futures_util::TryStreamExt;
use futures_util::{StreamExt, TryStreamExt};
use reqwest_middleware::ClientWithMiddleware;
use std::path::PathBuf;
use url::Url;
@ -62,6 +62,7 @@ where
)
.await?
}
Process::PrepareUpgrade => prepare_upgrade(repo, store).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}"));
@ -225,3 +226,60 @@ async fn generate<R: FullRepo, S: Store + 'static>(
Ok(())
}
#[tracing::instrument(skip_all)]
async fn prepare_upgrade<R: FullRepo + 'static, S: Store + 'static>(
repo: &R,
store: &S,
) -> Result<(), Error> {
repo.set("upgrade-preparations-started", b"1".to_vec().into())
.await?;
let mut hashes = std::pin::pin!(repo.hashes().await);
let mut count: u64 = 0;
while let Some(res) = hashes.next().await {
match res {
Ok(hash) => {
let repo = repo.clone();
let store = store.clone();
let res = actix_rt::spawn(async move {
if let Some(identifier) = repo.identifier(hash).await? {
crate::ensure_details_identifier(&repo, &store, &identifier, None).await?;
}
repo.set(
"upgrade-preparations-progress",
Vec::from(count.to_be_bytes()).into(),
)
.await?;
Ok(()) as Result<(), Error>
})
.await;
match res {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::warn!("Failed to ensure details for a hash: {e}");
}
Err(_) => {
tracing::warn!("Panic while ensuring details for a hash");
}
}
}
Err(e) => {
tracing::warn!("Skipping upgrade check for a hash: {e}");
}
}
count += 1;
}
repo.set("upgrade-preparations-complete", b"1".to_vec().into())
.await?;
Ok(())
}