Do backgrounded uploads
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Aode (lion) 2022-05-27 18:27:44 -05:00
parent 9ee5e8084b
commit 72b4508c46
9 changed files with 594 additions and 257 deletions

488
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs-aggregator"
description = "A simple image aggregation service for pict-rs"
version = "0.1.34"
version = "0.2.0-alpha.1"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -18,7 +18,7 @@ default = []
actix-rt = "2.6.0"
actix-web = { version = "4.0.0", default-features = false }
awc = { version = "3.0.0", default-features = false }
bcrypt = "0.12"
bcrypt = "0.13"
console-subscriber = "0.1"
mime = "0.3"
minify-html = "0.8.0"
@ -42,7 +42,7 @@ tracing-subscriber = { version = "0.3", features = [
"fmt",
] }
url = { version = "2.2", features = ["serde"] }
uuid = { version = "0.8.1", features = ["serde", "v4"] }
uuid = { version = "1", features = ["serde", "v4"] }
[dependencies.tracing-actix-web]

View file

@ -1,4 +1,6 @@
use crate::pict::{Extension, Images};
use std::time::Duration;
use crate::pict::{Extension, Images, Upload, Uploads};
use actix_web::{
body::BodyStream, http::StatusCode, web, HttpRequest, HttpResponse, ResponseError,
};
@ -26,6 +28,47 @@ impl Connection {
Connection { upstream, client }
}
#[tracing::instrument(skip_all)]
pub(crate) async fn claim(&self, upload: Upload) -> Result<Images, UploadError> {
let mut attempts = 0;
const CLAIM_ATTEMPT_LIMIT: usize = 10;
loop {
match self.client.get(self.claim_url(&upload)).send().await {
Ok(mut res) => {
match res.status() {
StatusCode::OK => {
return res.json::<Images>().await.map_err(|_| UploadError::Json);
}
StatusCode::UNPROCESSABLE_ENTITY => {
let images =
res.json::<Images>().await.map_err(|_| UploadError::Json)?;
tracing::warn!("{}", images.msg());
return Err(UploadError::Status);
}
StatusCode::NO_CONTENT => {
// continue
}
_ => {
return Err(UploadError::Status);
}
}
}
Err(_) => {
attempts += 1;
if attempts > CLAIM_ATTEMPT_LIMIT {
return Err(UploadError::Status);
}
tokio::time::sleep(Duration::from_secs(1)).await;
// continue
}
}
}
}
pub(crate) async fn thumbnail(
&self,
size: u16,
@ -53,7 +96,7 @@ impl Connection {
&self,
req: &HttpRequest,
body: web::Payload,
) -> Result<Images, UploadError> {
) -> Result<Uploads, UploadError> {
let client_request = self.client.request_from(self.upload_url(), req.head());
let mut client_request = if let Some(addr) = req.head().peer_addr {
@ -69,9 +112,9 @@ impl Connection {
.await
.map_err(|_| UploadError::Request)?;
let images = res.json::<Images>().await.map_err(|_| UploadError::Json)?;
let uploads = res.json::<Uploads>().await.map_err(|_| UploadError::Json)?;
Ok(images)
Ok(uploads)
}
pub(crate) async fn delete(&self, file: &str, token: &str) -> Result<(), UploadError> {
@ -89,9 +132,17 @@ impl Connection {
Ok(())
}
fn claim_url(&self, upload: &Upload) -> String {
let mut url = self.upstream.clone();
url.set_path("/image/backgrounded/claim");
url.set_query(Some(&format!("upload_id={}", upload.id())));
url.to_string()
}
fn upload_url(&self) -> String {
let mut url = self.upstream.clone();
url.set_path("/image");
url.set_path("/image/backgrounded");
url.to_string()
}

View file

@ -241,23 +241,29 @@ impl State {
self.scoped(&format!("static/{}", file))
}
fn thumbnail_path(&self, entry: &Entry, size: u16, extension: pict::Extension) -> String {
fn thumbnail_path(&self, filename: &str, size: u16, extension: pict::Extension) -> String {
self.scoped(&format!(
"image/thumbnail.{}?src={}&size={}",
extension, entry.filename, size
extension, filename, size
))
}
fn srcset(&self, entry: &Entry, extension: pict::Extension) -> String {
connection::VALID_SIZES
.iter()
.map(|size| format!("{} {}w", self.thumbnail_path(entry, *size, extension), size,))
.collect::<Vec<String>>()
.join(", ")
fn srcset(&self, filename: &str, extension: pict::Extension) -> String {
let mut sizes = Vec::new();
for size in connection::VALID_SIZES {
sizes.push(format!(
"{} {}w",
self.thumbnail_path(filename, *size, extension),
size,
))
}
sizes.join(", ")
}
fn image_path(&self, entry: &Entry) -> String {
self.scoped(&format!("image/full/{}", entry.filename))
fn image_path(&self, filename: &str) -> String {
self.scoped(&format!("image/full/{}", filename))
}
}
@ -463,12 +469,25 @@ pub struct Collection {
description: String,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub enum EntryKind {
Pending {
upload_id: String,
},
Ready {
filename: String,
delete_token: String,
},
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Entry {
title: String,
description: String,
filename: String,
delete_token: String,
#[serde(flatten)]
file_info: EntryKind,
}
#[derive(Clone, Debug, serde::Deserialize)]
@ -527,7 +546,7 @@ impl CollectionPath {
}
}
#[derive(Debug, serde::Deserialize)]
#[derive(Clone, Debug, serde::Deserialize)]
struct EntryPath {
collection: Uuid,
entry: Uuid,
@ -540,6 +559,36 @@ struct MoveEntryPath {
direction: Direction,
}
impl Entry {
pub(crate) fn filename(&self) -> Option<&str> {
if let EntryKind::Ready { filename, .. } = &self.file_info {
Some(&filename)
} else {
None
}
}
pub(crate) fn file_parts(&self) -> Option<(&str, &str)> {
if let EntryKind::Ready {
filename,
delete_token,
} = &self.file_info
{
Some((&filename, &delete_token))
} else {
None
}
}
pub(crate) fn upload_id(&self) -> Option<&str> {
if let EntryKind::Pending { upload_id } = &self.file_info {
Some(&upload_id)
} else {
None
}
}
}
impl EntryPath {
fn key(&self) -> String {
format!("{}/entry/{}", self.collection, self.entry)
@ -572,13 +621,13 @@ async fn upload(
conn: web::Data<Connection>,
state: web::Data<State>,
) -> Result<HttpResponse, StateError> {
let images = conn.upload(&req, pl).await.stateful(&state)?;
let uploads = conn.upload(&req, pl).await.stateful(&state)?;
if images.is_err() {
return Err(ErrorKind::UploadString(images.message().to_owned())).stateful(&state);
if uploads.is_err() {
return Err(ErrorKind::UploadString(uploads.message().to_owned())).stateful(&state);
}
let image = images
let upload = uploads
.files()
.next()
.ok_or_else(|| ErrorKind::UploadString("Missing file".to_owned()))
@ -587,8 +636,9 @@ async fn upload(
let entry = Entry {
title: String::new(),
description: String::new(),
filename: image.file().to_owned(),
delete_token: image.delete_token().to_owned(),
file_info: EntryKind::Pending {
upload_id: upload.id().to_owned(),
},
};
let entry_path = EntryPath {
@ -596,6 +646,45 @@ async fn upload(
entry: Uuid::new_v4(),
};
let entry_path2 = entry_path.clone();
let state2 = state.clone();
let upload = upload.clone();
actix_rt::spawn(async move {
match conn.claim(upload).await {
Ok(images) => {
if let Some(image) = images.files().next() {
let res = store::GetEntry {
entry_path: &entry_path2,
}
.exec(&state2.store)
.await;
if let Ok(mut entry) = res {
entry.file_info = EntryKind::Ready {
filename: image.file().to_owned(),
delete_token: image.delete_token().to_owned(),
};
let _ = store::UpdateEntry {
entry_path: &entry_path2,
entry: &entry,
}
.exec(&state2.store)
.await;
}
}
}
Err(e) => {
tracing::warn!("{}", e);
let _ = store::DeleteEntry {
entry_path: &entry_path2,
}
.exec(&state2.store)
.await;
}
}
});
store::CreateEntry {
entry_path: &entry_path,
entry: &entry,
@ -840,9 +929,13 @@ async fn delete_entry(
.stateful(&state);
}
conn.delete(&entry.filename, &entry.delete_token)
.await
.stateful(&state)?;
if let EntryKind::Ready {
filename,
delete_token,
} = &entry.file_info
{
conn.delete(filename, delete_token).await.stateful(&state)?;
}
store::DeleteEntry {
entry_path: &entry_path,
@ -937,11 +1030,16 @@ async fn delete_collection(
.iter()
.map(|(_, entry)| {
let (tx, rx) = tokio::sync::oneshot::channel();
let entry: Entry = entry.clone();
let conn = conn.clone();
actix_rt::spawn(async move {
let _ = tx.send(conn.delete(&entry.filename, &entry.delete_token).await);
});
if let EntryKind::Ready {
filename,
delete_token,
} = entry.file_info.clone()
{
let conn = conn.clone();
actix_rt::spawn(async move {
let _ = tx.send(conn.delete(&filename, &delete_token).await);
});
}
rx
})
.collect::<Vec<_>>();

View file

@ -39,11 +39,7 @@ pub(crate) struct Images {
}
impl Images {
pub(crate) fn is_err(&self) -> bool {
self.files.is_none()
}
pub(crate) fn message(&self) -> &str {
pub(crate) fn msg(&self) -> &str {
&self.msg
}
@ -51,3 +47,34 @@ impl Images {
self.files.iter().flat_map(|v| v.iter())
}
}
#[derive(Clone, serde::Deserialize)]
pub(crate) struct Upload {
upload_id: String,
}
impl Upload {
pub(crate) fn id(&self) -> &str {
&self.upload_id
}
}
#[derive(serde::Deserialize)]
pub(crate) struct Uploads {
msg: String,
uploads: Option<Vec<Upload>>,
}
impl Uploads {
pub(crate) fn is_err(&self) -> bool {
self.uploads.is_none()
}
pub(crate) fn message(&self) -> &str {
&self.msg
}
pub(crate) fn files(&self) -> impl Iterator<Item = &Upload> {
self.uploads.iter().flat_map(|v| v.iter())
}
}

View file

@ -40,6 +40,10 @@ pub(crate) struct MoveEntry<'a> {
pub(crate) move_entry_path: &'a MoveEntryPath,
}
pub(crate) struct GetEntry<'a> {
pub(crate) entry_path: &'a EntryPath,
}
pub(crate) struct UpdateEntry<'a> {
pub(crate) entry_path: &'a EntryPath,
pub(crate) entry: &'a Entry,
@ -79,6 +83,12 @@ impl<'a> MoveEntry<'a> {
}
}
impl<'a> GetEntry<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<Entry, Error> {
store.get_entry(self).await
}
}
impl<'a> UpdateEntry<'a> {
pub(crate) async fn exec(self, store: &Store) -> Result<(), Error> {
store.update_entry(self).await
@ -225,6 +235,22 @@ impl Store {
Ok(())
}
async fn get_entry(&self, config: GetEntry<'_>) -> Result<Entry, Error> {
let entry_key = config.entry_path.key();
let tree = self.tree.clone();
let entry = web::block(move || tree.get(entry_key.as_bytes())).await??;
if let Some(entry) = entry {
let entry = serde_json::from_slice(&entry)?;
Ok(entry)
} else {
Err(Error::Missing)
}
}
async fn update_entry(&self, config: UpdateEntry<'_>) -> Result<(), Error> {
let entry_key = config.entry_path.key();
let entry_value = serde_json::to_string(&config.entry)?;
@ -364,6 +390,9 @@ pub(crate) enum Error {
#[error("Panic in blocking operation")]
Blocking,
#[error("Requested entry is missing")]
Missing,
}
impl From<actix_web::error::BlockingError> for Error {

View file

@ -54,8 +54,13 @@ statics::file_upload_js};
<form method="POST" action="@state.update_entry_path(collection_id, *id, token)">
@:text_input("title", Some("Image Title"), Some(&entry.title))
@:text_area("description", Some("Image Description"), Some(&entry.description))
<input type="hidden" name="filename" value="@entry.filename" />
<input type="hidden" name="delete_token" value="@entry.delete_token" />
@if let Some(upload_id) = entry.upload_id() {
<input type="hidden" name="upload_id" value="@upload_id" />
}
@if let Some((filename, delete_token)) = entry.file_parts() {
<input type="hidden" name="filename" value="@filename" />
<input type="hidden" name="delete_token" value="@delete_token" />
}
<div class="button-group button-space">
@:button("Update Image", ButtonKind::Submit)
@:button_link("Delete Image", &state.delete_entry_path(collection_id, *id, token, false),

View file

@ -2,20 +2,14 @@
@(entry: &Entry, state: &State)
@if let Some(filename) = entry.filename() {
<div class="image-box">
<picture>
<source
type="image/webp"
srcset="@state.srcset(entry, Extension::Webp)"
/>
<source
type="image/jpeg"
srcset="@state.srcset(entry, Extension::Jpg)"
/>
<img
src="@state.image_path(entry)"
title="@entry.title"
alt="@entry.description"
/>
</picture>
<picture>
<source type="image/webp" srcset="@state.srcset(filename, Extension::Webp)" />
<source type="image/jpeg" srcset="@state.srcset(filename, Extension::Jpg)" />
<img src="@state.image_path(filename)" title="@entry.title" alt="@entry.description" />
</picture>
</div>
} else {
<span>Pending</span>
}

View file

@ -5,30 +5,31 @@
@(id: Uuid, collection: &Collection, entries: &[(Uuid, Entry)], state: &State)
@:layout(state, &collection.title, Some(&collection.description), {
<meta property="og:url" content="@state.public_collection_path(id)" />
@for (_, entry) in entries {
<meta property="og:image" content="@state.image_path(entry)" />
}
<meta property="og:url" content="@state.public_collection_path(id)" />
@for (_, entry) in entries {
@if let Some(filename) = entry.filename() {
<meta property="og:image" content="@state.image_path(filename)" />
}
}
}, {
<section>
<article>
<div class="content-group">
<h3>@collection.title</h3>
</div>
<div class="content-group">
<p class="subtitle">@collection.description</p>
</div>
</article>
<ul>
@for (_, entry) in entries {
<li class="content-group even">
<article>
@:image(entry, state)
</article>
</li>
}
</ul>
<article>
<div class="content-group">
<h3>@collection.title</h3>
</div>
<div class="content-group">
<p class="subtitle">@collection.description</p>
</div>
</article>
<ul>
@for (_, entry) in entries {
<li class="content-group even">
<article>
@:image(entry, state)
</article>
</li>
}
</ul>
</section>
@:return_home(state)
})