From ba68bcbde3ac7eefae5bc6bfd7d14b62bf89ca47 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 25 Sep 2021 15:23:05 -0500 Subject: [PATCH] Add better span information to commands, spawned tasks --- Cargo.lock | 16 +++++ Cargo.toml | 1 + src/magick.rs | 7 --- src/main.rs | 82 +++++++++++++++++-------- src/stream.rs | 137 ++++++++++++++++++++++++++---------------- src/upload_manager.rs | 18 +++++- 6 files changed, 175 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 898a636..b1deccd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1185,6 +1185,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-actix-web", + "tracing-awc", "tracing-error", "tracing-futures", "tracing-log", @@ -2061,6 +2062,21 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing-awc" +version = "0.1.0-beta.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d97e7ee4c4b5414ec091e5d6be8194f87c680332f549dd2a73e4c506d0a9b84a" +dependencies = [ + "actix-http", + "awc", + "bytes", + "futures-core", + "serde", + "tracing", + "tracing-futures", +] + [[package]] name = "tracing-core" version = "0.1.20" diff --git a/Cargo.toml b/Cargo.toml index 2f105b7..9b861ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ time = { version = "0.3.0", features = ["serde"] } tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] } tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tracing = "0.1.15" +tracing-awc = "0.1.0-beta.4" tracing-error = "0.1.2" tracing-futures = "0.2.4" tracing-log = "0.1.2" diff --git a/src/magick.rs b/src/magick.rs index 7cf5db6..ebf0d48 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -146,13 +146,6 @@ pub(crate) fn process_image_write_read( let convert_args = ["convert", "-"]; let last_arg = format!("{}:-", format.to_magick_format()); - tracing::info!( - "Spawning command: {} {:?} {:?} {}", - command, - convert_args, - args, - last_arg - ); let process = Process::spawn( Command::new(command) .args(convert_args) diff --git a/src/main.rs b/src/main.rs index e6cf7ab..8d80686 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,10 @@ use futures_util::{ Stream, }; use once_cell::sync::Lazy; -use opentelemetry::{sdk::{Resource, propagation::TraceContextPropagator}, KeyValue}; +use opentelemetry::{ + sdk::{propagation::TraceContextPropagator, Resource}, + KeyValue, +}; use opentelemetry_otlp::WithExportConfig; use std::{ collections::HashSet, @@ -22,6 +25,7 @@ use std::{ time::SystemTime, }; use structopt::StructOpt; +use tracing_awc::Propagate; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::{ @@ -88,6 +92,7 @@ type OutcomeSender = Sender<(Details, web::Bytes)>; type ProcessMap = DashMap>; struct CancelSafeProcessor { + span: Span, path: PathBuf, receiver: Option>, fut: F, @@ -100,19 +105,29 @@ where pub(crate) fn new(path: PathBuf, fut: F) -> Self { let entry = PROCESS_MAP.entry(path.clone()); - let receiver = match entry { + let (receiver, span) = match entry { Entry::Vacant(vacant) => { vacant.insert(Vec::new()); - None + let span = tracing::info_span!( + "Processing image", + path = &tracing::field::debug(&path), + completed = &tracing::field::Empty, + ); + (None, span) } Entry::Occupied(mut occupied) => { let (tx, rx) = tokio::sync::oneshot::channel(); occupied.get_mut().push(tx); - Some(rx) + let span = tracing::info_span!( + "Waiting for processed image", + path = &tracing::field::debug(&path), + ); + (Some(rx), span) } }; CancelSafeProcessor { + span, path, receiver, fut, @@ -127,30 +142,35 @@ where type Output = Result<(Details, web::Bytes), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Some(ref mut rx) = self.receiver { - Pin::new(rx) - .poll(cx) - .map(|res| res.map_err(|_| UploadError::Canceled.into())) - } else { - Pin::new(&mut self.fut).poll(cx).map(|res| { - let opt = PROCESS_MAP.remove(&self.path); - res.map(|tup| { - if let Some((_, vec)) = opt { - for sender in vec { - let _ = sender.send(tup.clone()); + let span = self.span.clone(); + + span.in_scope(|| { + if let Some(ref mut rx) = self.receiver { + Pin::new(rx) + .poll(cx) + .map(|res| res.map_err(|_| UploadError::Canceled.into())) + } else { + Pin::new(&mut self.fut).poll(cx).map(|res| { + let opt = PROCESS_MAP.remove(&self.path); + res.map(|tup| { + if let Some((_, vec)) = opt { + for sender in vec { + let _ = sender.send(tup.clone()); + } } - } - tup + tup + }) }) - }) - } + } + }) } } impl Drop for CancelSafeProcessor { fn drop(&mut self) { if self.receiver.is_none() { - PROCESS_MAP.remove(&self.path); + let completed = PROCESS_MAP.remove(&self.path).is_none(); + self.span.record("completed", &completed); } } } @@ -330,7 +350,7 @@ async fn download( manager: web::Data, query: web::Query, ) -> Result { - let mut res = client.get(&query.url).send().await?; + let mut res = client.get(&query.url).propagate().send().await?; if !res.status().is_success() { return Err(UploadError::Download(res.status()).into()); @@ -453,7 +473,7 @@ async fn process_details( } /// Process files -#[instrument(name = "Processing image", skip(manager, whitelist))] +#[instrument(name = "Serving processed image", skip(manager, whitelist))] async fn process( range: Option, query: web::Query, @@ -497,6 +517,13 @@ async fn process( debug!("Spawning storage task"); let manager2 = manager.clone(); let name = name.clone(); + let span = tracing::info_span!( + parent: None, + "Storing variant info", + path = &tracing::field::debug(&updated_path), + name = &tracing::field::display(&name), + ); + span.follows_from(Span::current()); actix_rt::spawn( async move { if let Err(e) = manager2.store_variant(updated_path, name).await { @@ -504,7 +531,7 @@ async fn process( return; } } - .instrument(Span::current()), + .instrument(span), ); } } @@ -528,6 +555,13 @@ async fn process( Details::from_bytes(bytes.clone()).await? }; + let save_span = tracing::info_span!( + parent: None, + "Saving variant information", + path = tracing::field::debug(&thumbnail_path), + name = tracing::field::display(&name), + ); + save_span.follows_from(Span::current()); let details2 = details.clone(); let bytes2 = bytes.clone(); actix_rt::spawn( @@ -547,7 +581,7 @@ async fn process( tracing::warn!("Error saving variant info: {}", e); } } - .instrument(Span::current()), + .instrument(save_span), ); Ok((details, bytes)) as Result<(Details, web::Bytes), Error> diff --git a/src/stream.rs b/src/stream.rs index b6e95ba..e8c1ad8 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -14,17 +14,20 @@ use tokio::{ sync::oneshot::{channel, Receiver}, }; use tokio_util::codec::{BytesCodec, FramedRead}; -use tracing::instrument; +use tracing::Instrument; +use tracing::Span; #[derive(Debug)] struct StatusError; pub(crate) struct Process { child: Child, + span: Span, } pub(crate) struct ProcessRead { inner: I, + span: Span, err_recv: Receiver, err_closed: bool, handle: JoinHandle<()>, @@ -33,21 +36,28 @@ pub(crate) struct ProcessRead { struct BytesFreezer(S); impl Process { - fn new(child: Child) -> Self { - Process { child } - } - - #[instrument(name = "Spawning command")] pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result { - tracing::info!("Spawning"); Self::spawn(Command::new(command).args(args)) } + fn spawn_span(&self) -> Span { + let span = tracing::info_span!(parent: None, "Spawned command writer",); + + span.follows_from(self.span.clone()); + + span + } + pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result { - cmd.stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .map(Process::new) + let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()); + + let span = tracing::info_span!( + "Spawning Command", + command = &tracing::field::debug(&cmd), + exception.message = &tracing::field::Empty, + exception.details = &tracing::field::Empty, + ); + cmd.spawn().map(|child| Process { child, span }) } pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option { @@ -56,30 +66,34 @@ impl Process { let (tx, rx) = channel(); + let span = self.spawn_span(); let mut child = self.child; + let handle = actix_rt::spawn( + async move { + if let Err(e) = stdin.write_all_buf(&mut input).await { + let _ = tx.send(e); + return; + } + drop(stdin); - let handle = actix_rt::spawn(async move { - if let Err(e) = stdin.write_all_buf(&mut input).await { - let _ = tx.send(e); - return; - } - drop(stdin); - - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = - tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = tx + .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + } + } + Err(e) => { + let _ = tx.send(e); } } - Err(e) => { - let _ = tx.send(e); - } } - }); + .instrument(span), + ); Some(Box::pin(ProcessRead { inner: stdout, + span: self.span, err_recv: rx, err_closed: false, handle, @@ -95,30 +109,34 @@ impl Process { let (tx, rx) = channel(); + let span = self.spawn_span(); let mut child = self.child; + let handle = actix_rt::spawn( + async move { + if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await { + let _ = tx.send(e); + return; + } + drop(stdin); - let handle = actix_rt::spawn(async move { - if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await { - let _ = tx.send(e); - return; - } - drop(stdin); - - match child.wait().await { - Ok(status) => { - if !status.success() { - let _ = - tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + match child.wait().await { + Ok(status) => { + if !status.success() { + let _ = tx + .send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError)); + } + } + Err(e) => { + let _ = tx.send(e); } } - Err(e) => { - let _ = tx.send(e); - } } - }); + .instrument(span), + ); Some(Box::pin(ProcessRead { inner: stdout, + span: self.span, err_recv: rx, err_closed: false, handle, @@ -141,20 +159,33 @@ where cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - if !self.err_closed { - if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) { - self.err_closed = true; - if let Ok(err) = res { - return Poll::Ready(Err(err)); + let span = self.span.clone(); + span.in_scope(|| { + if !self.err_closed { + if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) { + self.err_closed = true; + if let Ok(err) = res { + let display = format!("{}", err); + let debug = format!("{:?}", err); + span.record("exception.message", &display.as_str()); + span.record("exception.details", &debug.as_str()); + return Poll::Ready(Err(err)); + } } } - } - if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) { - return Poll::Ready(res); - } + if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) { + if let Err(err) = &res { + let display = format!("{}", err); + let debug = format!("{:?}", err); + span.record("exception.message", &display.as_str()); + span.record("exception.details", &debug.as_str()); + } + return Poll::Ready(res); + } - Poll::Pending + Poll::Pending + }) } } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 5de5199..393b31a 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -58,6 +58,12 @@ impl Drop for UploadManagerSession { if let Some(alias) = self.alias.take() { let manager = self.manager.clone(); + let cleanup_span = tracing::info_span!( + parent: None, + "Upload cleanup", + alias = &tracing::field::display(&alias), + ); + cleanup_span.follows_from(Span::current()); actix_rt::spawn( async move { // undo alias -> hash mapping @@ -77,7 +83,7 @@ impl Drop for UploadManagerSession { let _ = manager.check_delete_files(hash).await; } } - .instrument(Span::current()), + .instrument(cleanup_span), ); } } @@ -195,6 +201,7 @@ pub(crate) struct Details { } impl Details { + #[tracing::instrument("Details from bytes", skip(input))] pub(crate) async fn from_bytes(input: web::Bytes) -> Result { let details = crate::magick::details_bytes(input).await?; @@ -205,6 +212,7 @@ impl Details { )) } + #[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))] pub(crate) async fn from_path

(path: P) -> Result where P: AsRef, @@ -515,6 +523,12 @@ impl UploadManager { // -- DELETE FILES -- let this = self.clone(); + let cleanup_span = tracing::info_span!( + parent: None, + "Cleanup", + filename = &tracing::field::display(String::from_utf8_lossy(&filename)), + ); + cleanup_span.follows_from(Span::current()); debug!("Spawning cleanup task"); actix_rt::spawn( async move { @@ -529,7 +543,7 @@ impl UploadManager { String::from_utf8(filename.to_vec()) ); } - .instrument(Span::current()), + .instrument(cleanup_span), ); Ok(())