Add better span information to commands, spawned tasks

This commit is contained in:
Aode (lion) 2021-09-25 15:23:05 -05:00
parent 5be9c604de
commit ba68bcbde3
6 changed files with 175 additions and 86 deletions

16
Cargo.lock generated
View file

@ -1185,6 +1185,7 @@ dependencies = [
"tokio-util",
"tracing",
"tracing-actix-web",
"tracing-awc",
"tracing-error",
"tracing-futures",
"tracing-log",
@ -2061,6 +2062,21 @@ dependencies = [
"syn",
]
[[package]]
name = "tracing-awc"
version = "0.1.0-beta.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d97e7ee4c4b5414ec091e5d6be8194f87c680332f549dd2a73e4c506d0a9b84a"
dependencies = [
"actix-http",
"awc",
"bytes",
"futures-core",
"serde",
"tracing",
"tracing-futures",
]
[[package]]
name = "tracing-core"
version = "0.1.20"

View file

@ -35,6 +35,7 @@ time = { version = "0.3.0", features = ["serde"] }
tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] }
tokio-util = { version = "0.6", default-features = false, features = ["codec"] }
tracing = "0.1.15"
tracing-awc = "0.1.0-beta.4"
tracing-error = "0.1.2"
tracing-futures = "0.2.4"
tracing-log = "0.1.2"

View file

@ -146,13 +146,6 @@ pub(crate) fn process_image_write_read(
let convert_args = ["convert", "-"];
let last_arg = format!("{}:-", format.to_magick_format());
tracing::info!(
"Spawning command: {} {:?} {:?} {}",
command,
convert_args,
args,
last_arg
);
let process = Process::spawn(
Command::new(command)
.args(convert_args)

View file

@ -11,7 +11,10 @@ use futures_util::{
Stream,
};
use once_cell::sync::Lazy;
use opentelemetry::{sdk::{Resource, propagation::TraceContextPropagator}, KeyValue};
use opentelemetry::{
sdk::{propagation::TraceContextPropagator, Resource},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use std::{
collections::HashSet,
@ -22,6 +25,7 @@ use std::{
time::SystemTime,
};
use structopt::StructOpt;
use tracing_awc::Propagate;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{
@ -88,6 +92,7 @@ type OutcomeSender = Sender<(Details, web::Bytes)>;
type ProcessMap = DashMap<PathBuf, Vec<OutcomeSender>>;
struct CancelSafeProcessor<F> {
span: Span,
path: PathBuf,
receiver: Option<Receiver<(Details, web::Bytes)>>,
fut: F,
@ -100,19 +105,29 @@ where
pub(crate) fn new(path: PathBuf, fut: F) -> Self {
let entry = PROCESS_MAP.entry(path.clone());
let receiver = match entry {
let (receiver, span) = match entry {
Entry::Vacant(vacant) => {
vacant.insert(Vec::new());
None
let span = tracing::info_span!(
"Processing image",
path = &tracing::field::debug(&path),
completed = &tracing::field::Empty,
);
(None, span)
}
Entry::Occupied(mut occupied) => {
let (tx, rx) = tokio::sync::oneshot::channel();
occupied.get_mut().push(tx);
Some(rx)
let span = tracing::info_span!(
"Waiting for processed image",
path = &tracing::field::debug(&path),
);
(Some(rx), span)
}
};
CancelSafeProcessor {
span,
path,
receiver,
fut,
@ -127,30 +142,35 @@ where
type Output = Result<(Details, web::Bytes), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref mut rx) = self.receiver {
Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into()))
} else {
Pin::new(&mut self.fut).poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(&self.path);
res.map(|tup| {
if let Some((_, vec)) = opt {
for sender in vec {
let _ = sender.send(tup.clone());
let span = self.span.clone();
span.in_scope(|| {
if let Some(ref mut rx) = self.receiver {
Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into()))
} else {
Pin::new(&mut self.fut).poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(&self.path);
res.map(|tup| {
if let Some((_, vec)) = opt {
for sender in vec {
let _ = sender.send(tup.clone());
}
}
}
tup
tup
})
})
})
}
}
})
}
}
impl<F> Drop for CancelSafeProcessor<F> {
fn drop(&mut self) {
if self.receiver.is_none() {
PROCESS_MAP.remove(&self.path);
let completed = PROCESS_MAP.remove(&self.path).is_none();
self.span.record("completed", &completed);
}
}
}
@ -330,7 +350,7 @@ async fn download(
manager: web::Data<UploadManager>,
query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> {
let mut res = client.get(&query.url).send().await?;
let mut res = client.get(&query.url).propagate().send().await?;
if !res.status().is_success() {
return Err(UploadError::Download(res.status()).into());
@ -453,7 +473,7 @@ async fn process_details(
}
/// Process files
#[instrument(name = "Processing image", skip(manager, whitelist))]
#[instrument(name = "Serving processed image", skip(manager, whitelist))]
async fn process(
range: Option<range::RangeHeader>,
query: web::Query<ProcessQuery>,
@ -497,6 +517,13 @@ async fn process(
debug!("Spawning storage task");
let manager2 = manager.clone();
let name = name.clone();
let span = tracing::info_span!(
parent: None,
"Storing variant info",
path = &tracing::field::debug(&updated_path),
name = &tracing::field::display(&name),
);
span.follows_from(Span::current());
actix_rt::spawn(
async move {
if let Err(e) = manager2.store_variant(updated_path, name).await {
@ -504,7 +531,7 @@ async fn process(
return;
}
}
.instrument(Span::current()),
.instrument(span),
);
}
}
@ -528,6 +555,13 @@ async fn process(
Details::from_bytes(bytes.clone()).await?
};
let save_span = tracing::info_span!(
parent: None,
"Saving variant information",
path = tracing::field::debug(&thumbnail_path),
name = tracing::field::display(&name),
);
save_span.follows_from(Span::current());
let details2 = details.clone();
let bytes2 = bytes.clone();
actix_rt::spawn(
@ -547,7 +581,7 @@ async fn process(
tracing::warn!("Error saving variant info: {}", e);
}
}
.instrument(Span::current()),
.instrument(save_span),
);
Ok((details, bytes)) as Result<(Details, web::Bytes), Error>

View file

@ -14,17 +14,20 @@ use tokio::{
sync::oneshot::{channel, Receiver},
};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::instrument;
use tracing::Instrument;
use tracing::Span;
#[derive(Debug)]
struct StatusError;
pub(crate) struct Process {
child: Child,
span: Span,
}
pub(crate) struct ProcessRead<I> {
inner: I,
span: Span,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
handle: JoinHandle<()>,
@ -33,21 +36,28 @@ pub(crate) struct ProcessRead<I> {
struct BytesFreezer<S>(S);
impl Process {
fn new(child: Child) -> Self {
Process { child }
}
#[instrument(name = "Spawning command")]
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
tracing::info!("Spawning");
Self::spawn(Command::new(command).args(args))
}
fn spawn_span(&self) -> Span {
let span = tracing::info_span!(parent: None, "Spawned command writer",);
span.follows_from(self.span.clone());
span
}
pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> {
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.map(Process::new)
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
let span = tracing::info_span!(
"Spawning Command",
command = &tracing::field::debug(&cmd),
exception.message = &tracing::field::Empty,
exception.details = &tracing::field::Empty,
);
cmd.spawn().map(|child| Process { child, span })
}
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option<impl AsyncRead + Unpin> {
@ -56,30 +66,34 @@ impl Process {
let (tx, rx) = channel();
let span = self.spawn_span();
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
if let Err(e) = stdin.write_all_buf(&mut input).await {
let _ = tx.send(e);
return;
}
drop(stdin);
let handle = actix_rt::spawn(async move {
if let Err(e) = stdin.write_all_buf(&mut input).await {
let _ = tx.send(e);
return;
}
drop(stdin);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ =
tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
Err(e) => {
let _ = tx.send(e);
}
}
});
.instrument(span),
);
Some(Box::pin(ProcessRead {
inner: stdout,
span: self.span,
err_recv: rx,
err_closed: false,
handle,
@ -95,30 +109,34 @@ impl Process {
let (tx, rx) = channel();
let span = self.spawn_span();
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await {
let _ = tx.send(e);
return;
}
drop(stdin);
let handle = actix_rt::spawn(async move {
if let Err(e) = tokio::io::copy(&mut input_reader, &mut stdin).await {
let _ = tx.send(e);
return;
}
drop(stdin);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ =
tx.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
Err(e) => {
let _ = tx.send(e);
}
}
});
.instrument(span),
);
Some(Box::pin(ProcessRead {
inner: stdout,
span: self.span,
err_recv: rx,
err_closed: false,
handle,
@ -141,20 +159,33 @@ where
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if !self.err_closed {
if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) {
self.err_closed = true;
if let Ok(err) = res {
return Poll::Ready(Err(err));
let span = self.span.clone();
span.in_scope(|| {
if !self.err_closed {
if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) {
self.err_closed = true;
if let Ok(err) = res {
let display = format!("{}", err);
let debug = format!("{:?}", err);
span.record("exception.message", &display.as_str());
span.record("exception.details", &debug.as_str());
return Poll::Ready(Err(err));
}
}
}
}
if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) {
return Poll::Ready(res);
}
if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) {
if let Err(err) = &res {
let display = format!("{}", err);
let debug = format!("{:?}", err);
span.record("exception.message", &display.as_str());
span.record("exception.details", &debug.as_str());
}
return Poll::Ready(res);
}
Poll::Pending
Poll::Pending
})
}
}

View file

@ -58,6 +58,12 @@ impl Drop for UploadManagerSession {
if let Some(alias) = self.alias.take() {
let manager = self.manager.clone();
let cleanup_span = tracing::info_span!(
parent: None,
"Upload cleanup",
alias = &tracing::field::display(&alias),
);
cleanup_span.follows_from(Span::current());
actix_rt::spawn(
async move {
// undo alias -> hash mapping
@ -77,7 +83,7 @@ impl Drop for UploadManagerSession {
let _ = manager.check_delete_files(hash).await;
}
}
.instrument(Span::current()),
.instrument(cleanup_span),
);
}
}
@ -195,6 +201,7 @@ pub(crate) struct Details {
}
impl Details {
#[tracing::instrument("Details from bytes", skip(input))]
pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, Error> {
let details = crate::magick::details_bytes(input).await?;
@ -205,6 +212,7 @@ impl Details {
))
}
#[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))]
pub(crate) async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<std::path::Path>,
@ -515,6 +523,12 @@ impl UploadManager {
// -- DELETE FILES --
let this = self.clone();
let cleanup_span = tracing::info_span!(
parent: None,
"Cleanup",
filename = &tracing::field::display(String::from_utf8_lossy(&filename)),
);
cleanup_span.follows_from(Span::current());
debug!("Spawning cleanup task");
actix_rt::spawn(
async move {
@ -529,7 +543,7 @@ impl UploadManager {
String::from_utf8(filename.to_vec())
);
}
.instrument(Span::current()),
.instrument(cleanup_span),
);
Ok(())