Don't enter across await points

This commit is contained in:
Aode (lion) 2021-09-16 17:51:20 -05:00
parent 31fd819545
commit 06e402ec72
3 changed files with 87 additions and 87 deletions

40
Cargo.lock generated
View file

@ -790,9 +790,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.54" version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1866b355d9c878e5e607473cbe3f63282c0b7aad2db1dbebf55076c686918254" checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
dependencies = [ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
@ -811,9 +811,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.101" version = "0.2.102"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103"
[[package]] [[package]]
name = "local-channel" name = "local-channel"
@ -1329,9 +1329,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.67" version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f9e390c27c3c0ce8bc5d725f6e4d30a29d26659494aa4b17535f7522c5c950" checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -1430,9 +1430,9 @@ checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.4.1" version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
dependencies = [ dependencies = [
"libc", "libc",
"winapi", "winapi",
@ -1931,9 +1931,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.77" version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c" checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"wasm-bindgen-macro", "wasm-bindgen-macro",
@ -1941,9 +1941,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-backend" name = "wasm-bindgen-backend"
version = "0.2.77" version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34c405b4f0658583dba0c1c7c9b694f3cac32655db463b56c254a1c75269523" checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"lazy_static", "lazy_static",
@ -1956,9 +1956,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.77" version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d5a6580be83b19dc570a8f9c324251687ab2184e57086f71625feb57ec77c8" checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [ dependencies = [
"quote", "quote",
"wasm-bindgen-macro-support", "wasm-bindgen-macro-support",
@ -1966,9 +1966,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro-support" name = "wasm-bindgen-macro-support"
version = "0.2.77" version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3775a030dc6f5a0afd8a84981a21cc92a781eb429acef9ecce476d0c9113e92" checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1979,15 +1979,15 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-shared" name = "wasm-bindgen-shared"
version = "0.2.77" version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c279e376c7a8e8752a8f1eaa35b7b0bee6bb9fb0cdacfa97cc3f1f289c87e2b4" checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.54" version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a84d70d1ec7d2da2d26a5bd78f4bca1b8c3254805363ce743b7a05bc30d195a" checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
dependencies = [ dependencies = [
"js-sys", "js-sys",
"wasm-bindgen", "wasm-bindgen",

View file

@ -31,6 +31,7 @@ use tracing::{debug, error, info, instrument, subscriber::set_global_default, Sp
use tracing_actix_web::TracingLogger; use tracing_actix_web::TracingLogger;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_error::ErrorLayer; use tracing_error::ErrorLayer;
use tracing_futures::Instrument;
use tracing_log::LogTracer; use tracing_log::LogTracer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry};
@ -495,17 +496,17 @@ async fn process(
if exists.is_new() { if exists.is_new() {
// Save the transcoded file in another task // Save the transcoded file in another task
debug!("Spawning storage task"); debug!("Spawning storage task");
let span = Span::current();
let manager2 = manager.clone(); let manager2 = manager.clone();
let name = name.clone(); let name = name.clone();
actix_rt::spawn(async move { actix_rt::spawn(
let entered = span.enter(); async move {
if let Err(e) = manager2.store_variant(updated_path, name).await { if let Err(e) = manager2.store_variant(updated_path, name).await {
error!("Error storing variant, {}", e); error!("Error storing variant, {}", e);
return; return;
}
} }
drop(entered); .instrument(Span::current()),
}); );
} }
} }
@ -528,27 +529,27 @@ async fn process(
Details::from_bytes(bytes.clone()).await? Details::from_bytes(bytes.clone()).await?
}; };
let span = tracing::Span::current();
let details2 = details.clone(); let details2 = details.clone();
let bytes2 = bytes.clone(); let bytes2 = bytes.clone();
actix_rt::spawn(async move { actix_rt::spawn(
let entered = span.enter(); async move {
if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await { if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await {
tracing::warn!("Error saving thumbnail: {}", e); tracing::warn!("Error saving thumbnail: {}", e);
return; return;
}
if let Err(e) = manager
.store_variant_details(thumbnail_path.clone(), name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await {
tracing::warn!("Error saving variant info: {}", e);
}
} }
if let Err(e) = manager .instrument(Span::current()),
.store_variant_details(thumbnail_path.clone(), name.clone(), &details2) );
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await {
tracing::warn!("Error saving variant info: {}", e);
}
drop(entered);
});
Ok((details, bytes)) as Result<(Details, web::Bytes), Error> Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
}; };
@ -825,18 +826,17 @@ async fn main() -> Result<(), anyhow::Error> {
Field::array(Field::file(move |filename, _, stream| { Field::array(Field::file(move |filename, _, stream| {
let manager = manager2.clone(); let manager = manager2.clone();
async move { let span = tracing::info_span!("file-upload", ?filename);
let span = tracing::info_span!("file-upload", ?filename);
let entered = span.enter();
async move {
let permit = PROCESS_SEMAPHORE.acquire().await?; let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager.session().upload(stream).await; let res = manager.session().upload(stream).await;
drop(permit); drop(permit);
drop(entered);
res res
} }
.instrument(span)
})), })),
); );
@ -854,10 +854,9 @@ async fn main() -> Result<(), anyhow::Error> {
Field::array(Field::file(move |filename, content_type, stream| { Field::array(Field::file(move |filename, content_type, stream| {
let manager = manager2.clone(); let manager = manager2.clone();
async move { let span = tracing::info_span!("file-import", ?filename);
let span = tracing::info_span!("file-import", ?filename);
let entered = span.enter();
async move {
let permit = PROCESS_SEMAPHORE.acquire().await?; let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager let res = manager
@ -866,9 +865,9 @@ async fn main() -> Result<(), anyhow::Error> {
.await; .await;
drop(permit); drop(permit);
drop(entered);
res res
} }
.instrument(span)
})), })),
); );

View file

@ -15,6 +15,7 @@ use std::{
}; };
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tracing::{debug, error, info, instrument, warn, Span}; use tracing::{debug, error, info, instrument, warn, Span};
use tracing_futures::Instrument;
// TREE STRUCTURE // TREE STRUCTURE
// - Alias Tree // - Alias Tree
@ -57,27 +58,27 @@ impl Drop for UploadManagerSession {
if let Some(alias) = self.alias.take() { if let Some(alias) = self.alias.take() {
let manager = self.manager.clone(); let manager = self.manager.clone();
let span = Span::current(); actix_rt::spawn(
actix_rt::spawn(async move { async move {
let entered = span.entered(); // undo alias -> hash mapping
// undo alias -> hash mapping debug!("Remove alias -> hash mapping");
debug!("Remove alias -> hash mapping"); if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) {
if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) { // undo alias -> id mapping
// undo alias -> id mapping debug!("Remove alias -> id mapping");
debug!("Remove alias -> id mapping"); let key = alias_id_key(&alias);
let key = alias_id_key(&alias); if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) {
if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) { // undo hash/id -> alias mapping
// undo hash/id -> alias mapping debug!("Remove hash/id -> alias mapping");
debug!("Remove hash/id -> alias mapping"); let id = String::from_utf8_lossy(&id);
let id = String::from_utf8_lossy(&id); let key = alias_key(&hash, &id);
let key = alias_key(&hash, &id); let _ = manager.inner.main_tree.remove(&key);
let _ = manager.inner.main_tree.remove(&key); }
}
let _ = manager.check_delete_files(hash).await; let _ = manager.check_delete_files(hash).await;
}
} }
drop(entered); .instrument(Span::current()),
}); );
} }
} }
} }
@ -515,21 +516,21 @@ impl UploadManager {
// -- DELETE FILES -- // -- DELETE FILES --
let this = self.clone(); let this = self.clone();
debug!("Spawning cleanup task"); debug!("Spawning cleanup task");
let span = Span::current(); actix_rt::spawn(
actix_rt::spawn(async move { async move {
let entered = span.enter(); if let Err(e) = this
if let Err(e) = this .cleanup_files(FilenameIVec::new(filename.clone()))
.cleanup_files(FilenameIVec::new(filename.clone())) .await
.await {
{ error!("Error removing files from fs, {}", e);
error!("Error removing files from fs, {}", e); }
info!(
"Files deleted for {:?}",
String::from_utf8(filename.to_vec())
);
} }
info!( .instrument(Span::current()),
"Files deleted for {:?}", );
String::from_utf8(filename.to_vec())
);
drop(entered);
});
Ok(()) Ok(())
} }