Make uploads cancel-safe

This commit is contained in:
Aode (lion) 2021-09-11 19:53:26 -05:00
parent bf1a16d7d3
commit 68e4f1f4c8
3 changed files with 243 additions and 165 deletions

4
Cargo.lock generated
View file

@ -20,9 +20,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-form-data" name = "actix-form-data"
version = "0.6.0-beta.5" version = "0.6.0-beta.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6edb0d647fc6899a8dafc38c4c492f33f5cce500776b16c744f9be40c9fc70" checksum = "c6552c90f3283caa08a8114d49f82cb3eacd6038168bc0ffb199b9304f615be2"
dependencies = [ dependencies = [
"actix-multipart", "actix-multipart",
"actix-rt", "actix-rt",

View file

@ -45,7 +45,7 @@ use self::{
config::{Config, Format}, config::{Config, Format},
error::UploadError, error::UploadError,
middleware::{Deadline, Internal, Tracing}, middleware::{Deadline, Internal, Tracing},
upload_manager::{Details, UploadManager}, upload_manager::{Details, UploadManager, UploadManagerSession},
validate::{image_webp, video_mp4}, validate::{image_webp, video_mp4},
}; };
@ -252,7 +252,7 @@ fn to_ext(mime: mime::Mime) -> Result<&'static str, UploadError> {
/// Handle responding to succesful uploads /// Handle responding to succesful uploads
#[instrument(skip(value, manager))] #[instrument(skip(value, manager))]
async fn upload( async fn upload(
value: Value, value: Value<UploadManagerSession>,
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> { ) -> Result<HttpResponse, UploadError> {
let images = value let images = value
@ -262,15 +262,14 @@ async fn upload(
.ok_or(UploadError::NoFiles)?; .ok_or(UploadError::NoFiles)?;
let mut files = Vec::new(); let mut files = Vec::new();
for image in images.into_iter().filter_map(|i| i.file()) { let images = images
if let Some(alias) = image .into_iter()
.saved_as .filter_map(|i| i.file())
.as_ref() .collect::<Vec<_>>();
.and_then(|s| s.file_name()) for image in &images {
.and_then(|s| s.to_str()) if let Some(alias) = image.result.alias() {
{
info!("Uploaded {} as {:?}", image.filename, alias); info!("Uploaded {} as {:?}", image.filename, alias);
let delete_token = manager.delete_token(alias.to_owned()).await?; let delete_token = image.result.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?; let name = manager.from_alias(alias.to_owned()).await?;
let mut path = manager.image_dir(); let mut path = manager.image_dir();
@ -300,6 +299,9 @@ async fn upload(
} }
} }
for image in images {
image.result.succeed();
}
Ok(HttpResponse::Created().json(&serde_json::json!({ Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
"files": files "files": files
@ -329,9 +331,10 @@ async fn download(
let stream = Box::pin(once(fut)); let stream = Box::pin(once(fut));
let permit = PROCESS_SEMAPHORE.acquire().await?; let permit = PROCESS_SEMAPHORE.acquire().await?;
let alias = manager.upload(stream).await?; let session = manager.session().upload(stream).await?;
let alias = session.alias().unwrap().to_owned();
drop(permit); drop(permit);
let delete_token = manager.delete_token(alias.clone()).await?; let delete_token = session.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?; let name = manager.from_alias(alias.to_owned()).await?;
let mut path = manager.image_dir(); let mut path = manager.image_dir();
@ -349,6 +352,7 @@ async fn download(
new_details new_details
}; };
session.succeed();
Ok(HttpResponse::Created().json(&serde_json::json!({ Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
"files": [{ "files": [{
@ -802,11 +806,7 @@ async fn main() -> Result<(), anyhow::Error> {
let permit = PROCESS_SEMAPHORE.acquire().await?; let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager.upload(stream).await.map(|alias| { let res = manager.session().upload(stream).await;
let mut path = PathBuf::new();
path.push(alias);
Some(path)
});
drop(permit); drop(permit);
drop(entered); drop(entered);
@ -836,13 +836,9 @@ async fn main() -> Result<(), anyhow::Error> {
let permit = PROCESS_SEMAPHORE.acquire().await?; let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager let res = manager
.session()
.import(filename, content_type, validate_imports, stream) .import(filename, content_type, validate_imports, stream)
.await .await;
.map(|alias| {
let mut path = PathBuf::new();
path.push(alias);
Some(path)
});
drop(permit); drop(permit);
drop(entered); drop(entered);

View file

@ -33,6 +33,55 @@ pub struct UploadManager {
inner: Arc<UploadManagerInner>, inner: Arc<UploadManagerInner>,
} }
pub struct UploadManagerSession {
manager: UploadManager,
alias: Option<String>,
finished: bool,
}
impl UploadManagerSession {
pub(crate) fn succeed(mut self) {
self.finished = true;
}
pub(crate) fn alias(&self) -> Option<&str> {
self.alias.as_deref()
}
}
impl Drop for UploadManagerSession {
fn drop(&mut self) {
if self.finished {
return;
}
if let Some(alias) = self.alias.take() {
let manager = self.manager.clone();
let span = Span::current();
actix_rt::spawn(async move {
let entered = span.entered();
// undo alias -> hash mapping
debug!("Remove alias -> hash mapping");
if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) {
// undo alias -> id mapping
debug!("Remove alias -> id mapping");
let key = alias_id_key(&alias);
if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) {
// undo hash/id -> alias mapping
debug!("Remove hash/id -> alias mapping");
let id = String::from_utf8_lossy(&id);
let key = alias_key(&hash, &id);
let _ = manager.inner.main_tree.remove(&key);
}
let _ = manager.check_delete_files(hash).await;
}
drop(entered);
});
}
}
}
pub struct Hasher<I, D> { pub struct Hasher<I, D> {
inner: I, inner: I,
hasher: D, hasher: D,
@ -446,6 +495,10 @@ impl UploadManager {
}) })
.await??; .await??;
self.check_delete_files(hash).await
}
async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), UploadError> {
// -- CHECK IF ANY OTHER ALIASES EXIST -- // -- CHECK IF ANY OTHER ALIASES EXIST --
let main_tree = self.inner.main_tree.clone(); let main_tree = self.inner.main_tree.clone();
let (start, end) = alias_key_bounds(&hash); let (start, end) = alias_key_bounds(&hash);
@ -491,123 +544,6 @@ impl UploadManager {
Ok(()) Ok(())
} }
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self, alias: String) -> Result<String, UploadError> {
debug!("Generating delete token");
use rand::distributions::{Alphanumeric, Distribution};
let rng = rand::thread_rng();
let s: String = Alphanumeric
.sample_iter(rng)
.take(10)
.map(char::from)
.collect();
let delete_token = s.clone();
debug!("Saving delete token");
let alias_tree = self.inner.alias_tree.clone();
let key = delete_key(&alias);
let res = web::block(move || {
alias_tree.compare_and_swap(
key.as_bytes(),
None as Option<sled::IVec>,
Some(s.as_bytes()),
)
})
.await??;
if let Err(sled::CompareAndSwapError {
current: Some(ivec),
..
}) = res
{
let s = String::from_utf8(ivec.to_vec())?;
debug!("Returning existing delete token, {}", s);
return Ok(s);
}
debug!("Returning new delete token, {}", delete_token);
Ok(delete_token)
}
/// Upload the file while preserving the filename, optionally validating the uploaded image
#[instrument(skip(self, stream))]
pub(crate) async fn import<E>(
&self,
alias: String,
content_type: mime::Mime,
validate: bool,
mut stream: UploadStream<E>,
) -> Result<String, UploadError>
where
UploadError: From<E>,
E: Unpin + 'static,
{
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (content_type, validated_reader) =
crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone())
.await?;
let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone());
let tmpfile = crate::tmp_file();
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Storing alias");
self.add_existing_alias(&hash, &alias).await?;
debug!("Saving file");
self.save_upload(tmpfile, hash, content_type).await?;
// Return alias to file
Ok(alias)
}
/// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument(skip(self, stream))]
pub(crate) async fn upload<E>(&self, mut stream: UploadStream<E>) -> Result<String, UploadError>
where
UploadError: From<E>,
{
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (content_type, validated_reader) =
crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone())
.await?;
let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone());
let tmpfile = crate::tmp_file();
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
let alias = self.add_alias(&hash, content_type.clone()).await?;
debug!("Saving file");
self.save_upload(tmpfile, hash, content_type).await?;
// Return alias to file
Ok(alias)
}
/// Fetch the real on-disk filename given an alias /// Fetch the real on-disk filename given an alias
#[instrument(skip(self))] #[instrument(skip(self))]
pub(crate) async fn from_alias(&self, alias: String) -> Result<String, UploadError> { pub(crate) async fn from_alias(&self, alias: String) -> Result<String, UploadError> {
@ -628,6 +564,14 @@ impl UploadManager {
Ok(filename) Ok(filename)
} }
pub(crate) fn session(&self) -> UploadManagerSession {
UploadManagerSession {
manager: self.clone(),
alias: None,
finished: false,
}
}
// Find image variants and remove them from the DB and the disk // Find image variants and remove them from the DB and the disk
#[instrument(skip(self))] #[instrument(skip(self))]
async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> { async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> {
@ -682,6 +626,138 @@ impl UploadManager {
} }
Ok(()) Ok(())
} }
}
impl UploadManagerSession {
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self) -> Result<String, UploadError> {
let alias = self
.alias
.clone()
.ok_or_else(|| UploadError::MissingAlias)?;
debug!("Generating delete token");
use rand::distributions::{Alphanumeric, Distribution};
let rng = rand::thread_rng();
let s: String = Alphanumeric
.sample_iter(rng)
.take(10)
.map(char::from)
.collect();
let delete_token = s.clone();
debug!("Saving delete token");
let alias_tree = self.manager.inner.alias_tree.clone();
let key = delete_key(&alias);
let res = web::block(move || {
alias_tree.compare_and_swap(
key.as_bytes(),
None as Option<sled::IVec>,
Some(s.as_bytes()),
)
})
.await??;
if let Err(sled::CompareAndSwapError {
current: Some(ivec),
..
}) = res
{
let s = String::from_utf8(ivec.to_vec())?;
debug!("Returning existing delete token, {}", s);
return Ok(s);
}
debug!("Returning new delete token, {}", delete_token);
Ok(delete_token)
}
/// Upload the file while preserving the filename, optionally validating the uploaded image
#[instrument(skip(self, stream))]
pub(crate) async fn import<E>(
mut self,
alias: String,
content_type: mime::Mime,
validate: bool,
mut stream: UploadStream<E>,
) -> Result<Self, UploadError>
where
UploadError: From<E>,
E: Unpin + 'static,
{
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (content_type, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
self.manager.inner.format.clone(),
)
.await?;
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let tmpfile = crate::tmp_file();
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Storing alias");
self.alias = Some(alias.clone());
self.add_existing_alias(&hash, &alias).await?;
debug!("Saving file");
self.save_upload(tmpfile, hash, content_type).await?;
// Return alias to file
Ok(self)
}
/// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument(skip(self, stream))]
pub(crate) async fn upload<E>(
mut self,
mut stream: UploadStream<E>,
) -> Result<Self, UploadError>
where
UploadError: From<E>,
{
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
debug!("Validating bytes");
let (content_type, validated_reader) = crate::validate::validate_image_bytes(
bytes_mut.freeze(),
self.manager.inner.format.clone(),
)
.await?;
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let tmpfile = crate::tmp_file();
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
self.add_alias(&hash, content_type.clone()).await?;
debug!("Saving file");
self.save_upload(tmpfile, hash, content_type).await?;
// Return alias to file
Ok(self)
}
// check duplicates & store image if new // check duplicates & store image if new
async fn save_upload( async fn save_upload(
@ -699,7 +775,7 @@ impl UploadManager {
} }
// -- WRITE NEW FILE -- // -- WRITE NEW FILE --
let mut real_path = self.image_dir(); let mut real_path = self.manager.image_dir();
real_path.push(name); real_path.push(name);
crate::safe_move_file(tmpfile, real_path).await?; crate::safe_move_file(tmpfile, real_path).await?;
@ -714,7 +790,7 @@ impl UploadManager {
hash: Hash, hash: Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<(Dup, String), UploadError> { ) -> Result<(Dup, String), UploadError> {
let main_tree = self.inner.main_tree.clone(); let main_tree = self.manager.inner.main_tree.clone();
let filename = self.next_file(content_type).await?; let filename = self.next_file(content_type).await?;
let filename2 = filename.clone(); let filename2 = filename.clone();
@ -739,7 +815,7 @@ impl UploadManager {
return Ok((Dup::Exists, name)); return Ok((Dup::Exists, name));
} }
let fname_tree = self.inner.filename_tree.clone(); let fname_tree = self.manager.inner.filename_tree.clone();
let filename2 = filename.clone(); let filename2 = filename.clone();
debug!("Saving filename -> hash relation"); debug!("Saving filename -> hash relation");
web::block(move || fname_tree.insert(filename2, hash.inner)).await??; web::block(move || fname_tree.insert(filename2, hash.inner)).await??;
@ -750,7 +826,7 @@ impl UploadManager {
// generate a short filename that isn't already in-use // generate a short filename that isn't already in-use
#[instrument(skip(self, content_type))] #[instrument(skip(self, content_type))]
async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> { async fn next_file(&self, content_type: mime::Mime) -> Result<String, UploadError> {
let image_dir = self.image_dir(); let image_dir = self.manager.image_dir();
use rand::distributions::{Alphanumeric, Distribution}; use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10; let mut limit: usize = 10;
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
@ -783,9 +859,9 @@ impl UploadManager {
#[instrument(skip(self, hash, alias))] #[instrument(skip(self, hash, alias))]
async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> {
self.save_alias(hash, alias).await??; self.save_alias_hash_mapping(hash, alias).await??;
self.store_alias(hash, alias).await?; self.store_hash_id_alias_mapping(hash, alias).await?;
Ok(()) Ok(())
} }
@ -795,30 +871,40 @@ impl UploadManager {
// This will help if multiple 'users' upload the same file, and one of them wants to delete it // This will help if multiple 'users' upload the same file, and one of them wants to delete it
#[instrument(skip(self, hash, content_type))] #[instrument(skip(self, hash, content_type))]
async fn add_alias( async fn add_alias(
&self, &mut self,
hash: &Hash, hash: &Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<String, UploadError> { ) -> Result<(), UploadError> {
let alias = self.next_alias(hash, content_type).await?; let alias = self.next_alias(hash, content_type).await?;
self.store_alias(hash, &alias).await?; self.store_hash_id_alias_mapping(hash, &alias).await?;
Ok(alias) Ok(())
} }
// Add a pre-defined alias to an existin file // Add a pre-defined alias to an existin file
// //
// DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files
#[instrument(skip(self, hash))] #[instrument(skip(self, hash))]
async fn store_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { async fn store_hash_id_alias_mapping(
&self,
hash: &Hash,
alias: &str,
) -> Result<(), UploadError> {
let alias = alias.to_string(); let alias = alias.to_string();
loop { loop {
debug!("hash -> alias save loop"); debug!("hash -> alias save loop");
let db = self.inner.db.clone(); let db = self.manager.inner.db.clone();
let id = web::block(move || db.generate_id()).await??.to_string(); let id = web::block(move || db.generate_id()).await??.to_string();
let alias_tree = self.manager.inner.alias_tree.clone();
let key = alias_id_key(&alias);
let id2 = id.clone();
debug!("Saving alias -> id mapping");
web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??;
let key = alias_key(&hash.inner, &id); let key = alias_key(&hash.inner, &id);
let main_tree = self.inner.main_tree.clone(); let main_tree = self.manager.inner.main_tree.clone();
let alias2 = alias.clone(); let alias2 = alias.clone();
debug!("Saving hash/id -> alias mapping"); debug!("Saving hash/id -> alias mapping");
let res = web::block(move || { let res = web::block(move || {
@ -827,11 +913,6 @@ impl UploadManager {
.await??; .await??;
if res.is_ok() { if res.is_ok() {
let alias_tree = self.inner.alias_tree.clone();
let key = alias_id_key(&alias);
debug!("Saving alias -> id mapping");
web::block(move || alias_tree.insert(key.as_bytes(), id.as_bytes())).await??;
break; break;
} }
@ -844,7 +925,7 @@ impl UploadManager {
// Generate an alias to the file // Generate an alias to the file
#[instrument(skip(self, hash, content_type))] #[instrument(skip(self, hash, content_type))]
async fn next_alias( async fn next_alias(
&self, &mut self,
hash: &Hash, hash: &Hash,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<String, UploadError> { ) -> Result<String, UploadError> {
@ -859,8 +940,9 @@ impl UploadManager {
.map(char::from) .map(char::from)
.collect(); .collect();
let alias = file_name(s, content_type.clone())?; let alias = file_name(s, content_type.clone())?;
self.alias = Some(alias.clone());
let res = self.save_alias(hash, &alias).await?; let res = self.save_alias_hash_mapping(hash, &alias).await?;
if res.is_ok() { if res.is_ok() {
return Ok(alias); return Ok(alias);
@ -873,16 +955,16 @@ impl UploadManager {
// Save an alias to the database // Save an alias to the database
#[instrument(skip(self, hash))] #[instrument(skip(self, hash))]
async fn save_alias( async fn save_alias_hash_mapping(
&self, &self,
hash: &Hash, hash: &Hash,
alias: &str, alias: &str,
) -> Result<Result<(), UploadError>, UploadError> { ) -> Result<Result<(), UploadError>, UploadError> {
let tree = self.inner.alias_tree.clone(); let tree = self.manager.inner.alias_tree.clone();
let vec = hash.inner.clone(); let vec = hash.inner.clone();
let alias = alias.to_string(); let alias = alias.to_string();
debug!("Saving alias"); debug!("Saving alias -> hash mapping");
let res = web::block(move || { let res = web::block(move || {
tree.compare_and_swap(alias.as_bytes(), None as Option<sled::IVec>, Some(vec)) tree.compare_and_swap(alias.as_bytes(), None as Option<sled::IVec>, Some(vec))
}) })