From 29f07743312fcfd88efe0911c75cf6f23b36f21d Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 2 Apr 2022 21:15:39 -0500 Subject: [PATCH] Add ability to claim uploads --- src/backgrounded.rs | 2 +- src/main.rs | 62 +++++++++++++++++++++++++++++++++++++++----- src/queue/process.rs | 2 ++ 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 58b3b04..e696161 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -61,7 +61,7 @@ where let identifier = store.save_async_read(&mut reader).await?; - self.identifier = Some(identifier.clone()); + self.identifier = Some(identifier); Ok(()) } diff --git a/src/main.rs b/src/main.rs index e096f62..66b9a3d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,6 +48,8 @@ mod stream; mod tmp_file; mod validate; +use crate::repo::UploadResult; + use self::{ backgrounded::Backgrounded, config::{Configuration, ImageFormat, Operation}, @@ -60,7 +62,7 @@ use self::{ middleware::{Deadline, Internal}, migrate::LatestDb, queue::queue_generate, - repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo}, + repo::{Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo, UploadId}, serde_str::Serde, store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, stream::{StreamLimit, StreamTimeout}, @@ -165,7 +167,7 @@ async fn upload_backgrounded( queue::queue_ingest(&**repo, identifier, upload_id, None, true).await?; files.push(serde_json::json!({ - "file": upload_id.to_string(), + "upload_id": upload_id.to_string(), })); } @@ -173,12 +175,51 @@ async fn upload_backgrounded( image.result.disarm(); } - Ok(HttpResponse::Created().json(&serde_json::json!({ + Ok(HttpResponse::Accepted().json(&serde_json::json!({ "msg": "ok", - "files": files + "uploads": files }))) } +#[derive(Debug, serde::Deserialize)] +struct ClaimQuery { + upload_id: Serde, +} + +/// Claim a backgrounded upload +#[instrument(name = "Waiting on upload", skip(repo))] +async fn claim_upload( + repo: web::Data, + query: web::Query, +) -> Result { + let upload_id = Serde::into_inner(query.into_inner().upload_id); + + match actix_rt::time::timeout(Duration::from_secs(10), repo.wait(upload_id)).await { + Ok(wait_res) => { + let upload_result = wait_res?; + repo.claim(upload_id).await?; + + match upload_result { + UploadResult::Success { alias, token } => { + Ok(HttpResponse::Ok().json(&serde_json::json!({ + "msg": "ok", + "files": [{ + "file": alias.to_string(), + "delete_token": token.to_string(), + }] + }))) + } + UploadResult::Failure { message } => Ok(HttpResponse::UnprocessableEntity().json( + &serde_json::json!({ + "msg": message, + }), + )), + } + } + Err(_) => Ok(HttpResponse::NoContent().finish()), + } +} + #[derive(Debug, serde::Deserialize)] struct UrlQuery { url: String, @@ -727,9 +768,16 @@ async fn launch( .route(web::post().to(upload::)), ) .service( - web::resource("/backgrounded") - .wrap(backgrounded_form.clone()) - .route(web::post().to(upload_backgrounded::)), + web::scope("/backgrounded") + .service( + web::resource("") + .guard(guard::Post()) + .wrap(backgrounded_form.clone()) + .route(web::post().to(upload_backgrounded::)), + ) + .service( + web::resource("/claim").route(web::get().to(claim_upload::)), + ), ) .service(web::resource("/download").route(web::get().to(download::))) .service( diff --git a/src/queue/process.rs b/src/queue/process.rs index d8be535..6b31a78 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -90,6 +90,8 @@ where let token = session.delete_token().await?; + store.remove(&unprocessed_identifier).await?; + Ok((session, token)) as Result<(Session, DeleteToken), Error> };