From b94ba5fcfc2bab079244717cc18b019ad8057609 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 22 Dec 2023 13:12:19 -0600 Subject: [PATCH] Inline process background future, clean tracing a bit --- src/generate.rs | 2 +- src/ingest.rs | 2 +- src/lib.rs | 2 +- src/process.rs | 143 +++++++++++++++++++------------------- src/queue/cleanup.rs | 4 +- src/queue/process.rs | 36 ++++++---- src/repo/postgres.rs | 9 +-- src/store/object_store.rs | 4 +- src/stream.rs | 4 +- src/sync.rs | 36 +++++++++- 10 files changed, 142 insertions(+), 100 deletions(-) diff --git a/src/generate.rs b/src/generate.rs index a7137d8..14d54ab 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -47,7 +47,7 @@ impl Drop for MetricsGuard { } #[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(repo, store, hash, process_map, config))] +#[tracing::instrument(skip(tmp_dir, repo, store, hash, process_map, config))] pub(crate) async fn generate( tmp_dir: &TmpDir, repo: &ArcRepo, diff --git a/src/ingest.rs b/src/ingest.rs index ffdad03..670447e 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -144,7 +144,7 @@ where Ok((input_type, identifier, details, state)) } -#[tracing::instrument(skip(repo, store, client, stream, config))] +#[tracing::instrument(skip(tmp_dir, repo, store, client, stream, config))] pub(crate) async fn ingest( tmp_dir: &TmpDir, repo: &ArcRepo, diff --git a/src/lib.rs b/src/lib.rs index b5be2ab..a78c720 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -866,7 +866,7 @@ async fn not_found_hash(repo: &ArcRepo) -> Result, Error> #[allow(clippy::too_many_arguments)] #[tracing::instrument( name = "Serving processed image", - skip(repo, store, client, config, process_map) + skip(tmp_dir, repo, store, client, config, process_map) )] async fn process( range: Option>, diff --git a/src/process.rs b/src/process.rs index ab84d41..3a1c32e 100644 --- a/src/process.rs +++ b/src/process.rs @@ -14,21 +14,25 @@ use std::{ use tokio::{ io::{AsyncRead, AsyncWriteExt, ReadBuf}, process::{Child, ChildStdin, ChildStdout, Command}, - task::JoinHandle, }; use tracing::{Instrument, Span}; +use uuid::Uuid; -use crate::{error_code::ErrorCode, future::WithTimeout}; +use crate::{ + error_code::ErrorCode, + future::{LocalBoxFuture, WithTimeout}, + sync::DropHandle, +}; struct MetricsGuard { start: Instant, armed: bool, - command: String, + command: Arc, } impl MetricsGuard { - fn guard(command: String) -> Self { - metrics::increment_counter!("pict-rs.process.start", "command" => command.clone()); + fn guard(command: Arc) -> Self { + metrics::increment_counter!("pict-rs.process.start", "command" => command.to_string()); Self { start: Instant::now(), @@ -47,11 +51,11 @@ impl Drop for MetricsGuard { metrics::histogram!( "pict-rs.process.duration", self.start.elapsed().as_secs_f64(), - "command" => self.command.clone(), + "command" => self.command.to_string(), "completed" => (!self.armed).to_string(), ); - metrics::increment_counter!("pict-rs.process.end", "completed" => (!self.armed).to_string() , "command" => self.command.clone()); + metrics::increment_counter!("pict-rs.process.end", "completed" => (!self.armed).to_string() , "command" => self.command.to_string()); } } @@ -59,10 +63,11 @@ impl Drop for MetricsGuard { struct StatusError(ExitStatus); pub(crate) struct Process { - command: String, + command: Arc, child: Child, guard: MetricsGuard, timeout: Duration, + id: Uuid, } impl std::fmt::Debug for Process { @@ -71,10 +76,6 @@ impl std::fmt::Debug for Process { } } -struct DropHandle { - inner: JoinHandle>, -} - struct ProcessReadState { flags: AtomicU8, parent: Mutex>, @@ -86,29 +87,31 @@ struct ProcessReadWaker { } pub(crate) struct ProcessRead { - inner: ChildStdout, - handle: DropHandle, + stdout: ChildStdout, + handle: LocalBoxFuture<'static, std::io::Result<()>>, closed: bool, state: Arc, - span: Span, + span: Option, + command: Arc, + id: Uuid, } #[derive(Debug, thiserror::Error)] pub(crate) enum ProcessError { #[error("Required command {0} not found, make sure it exists in pict-rs' $PATH")] - NotFound(String), + NotFound(Arc), #[error("Cannot run command {0} due to invalid permissions on binary, make sure the pict-rs user has permission to run it")] - PermissionDenied(String), + PermissionDenied(Arc), #[error("Reached process spawn limit")] LimitReached, #[error("{0} timed out")] - Timeout(String), + Timeout(Arc), #[error("{0} Failed with {1}")] - Status(String, ExitStatus), + Status(Arc, ExitStatus), #[error("Unknown process error")] Other(#[source] std::io::Error), @@ -136,10 +139,14 @@ impl Process { where T: AsRef, { + let command: Arc = Arc::from(String::from(command)); + let res = tracing::trace_span!(parent: None, "Create command", %command).in_scope(|| { Self::spawn( - command, - Command::new(command).args(args).envs(envs.iter().copied()), + command.clone(), + Command::new(command.as_ref()) + .args(args) + .envs(envs.iter().copied()), timeout, ) }); @@ -147,9 +154,9 @@ impl Process { match res { Ok(this) => Ok(this), Err(e) => match e.kind() { - std::io::ErrorKind::NotFound => Err(ProcessError::NotFound(command.to_string())), + std::io::ErrorKind::NotFound => Err(ProcessError::NotFound(command)), std::io::ErrorKind::PermissionDenied => { - Err(ProcessError::PermissionDenied(command.to_string())) + Err(ProcessError::PermissionDenied(command)) } std::io::ErrorKind::WouldBlock => Err(ProcessError::LimitReached), _ => Err(ProcessError::Other(e)), @@ -157,9 +164,9 @@ impl Process { } } - fn spawn(command: &str, cmd: &mut Command, timeout: u64) -> std::io::Result { + fn spawn(command: Arc, cmd: &mut Command, timeout: u64) -> std::io::Result { tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| { - let guard = MetricsGuard::guard(command.into()); + let guard = MetricsGuard::guard(command.clone()); let cmd = cmd .stdin(Stdio::piped()) @@ -168,20 +175,22 @@ impl Process { cmd.spawn().map(|child| Process { child, - command: String::from(command), + command, guard, timeout: Duration::from_secs(timeout), + id: Uuid::now_v7(), }) }) } - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self), fields(command = %self.command, id = %self.id))] pub(crate) async fn wait(self) -> Result<(), ProcessError> { let Process { command, mut child, guard, timeout, + id: _, } = self; let res = child.wait().with_timeout(timeout).await; @@ -226,52 +235,44 @@ impl Process { mut child, guard, timeout, + id, } = self; let stdin = child.stdin.take().expect("stdin exists"); let stdout = child.stdout.take().expect("stdout exists"); - let background_span = - tracing::info_span!(parent: None, "Background process task", %command); - background_span.follows_from(Span::current()); + let handle = Box::pin(async move { + let child_fut = async { + (f)(stdin).await?; - let span = tracing::info_span!(parent: None, "Foreground process task", %command); - span.follows_from(Span::current()); + child.wait().await + }; - let handle = crate::sync::spawn( - "await-process", - async move { - let child_fut = async { - (f)(stdin).await?; + let error = match child_fut.with_timeout(timeout).await { + Ok(Ok(status)) if status.success() => { + guard.disarm(); + return Ok(()); + } + Ok(Ok(status)) => { + std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) + } + Ok(Err(e)) => e, + Err(_) => std::io::ErrorKind::TimedOut.into(), + }; - child.wait().await - }; + child.kill().await?; - let error = match child_fut.with_timeout(timeout).await { - Ok(Ok(status)) if status.success() => { - guard.disarm(); - return Ok(()); - } - Ok(Ok(status)) => { - std::io::Error::new(std::io::ErrorKind::Other, StatusError(status)) - } - Ok(Err(e)) => e, - Err(_) => std::io::ErrorKind::TimedOut.into(), - }; - - child.kill().await?; - - Err(error) - } - .instrument(background_span), - ); + Err(error) + }); ProcessRead { - inner: stdout, - handle: DropHandle { inner: handle }, + stdout, + handle, closed: false, state: ProcessReadState::new_woken(), - span, + span: None, + command, + id, } } } @@ -345,14 +346,19 @@ impl AsyncRead for ProcessRead { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let span = self.span.clone(); + let command = self.command.clone(); + let id = self.id; + let span = self + .span + .get_or_insert_with(|| tracing::info_span!("process task", %command, %id)) + .clone(); let guard = span.enter(); let value = loop { // always poll for bytes when poll_read is called let before_size = buf.filled().len(); - if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) { + if let Poll::Ready(res) = Pin::new(&mut self.stdout).poll_read(cx, buf) { if let Err(e) = res { self.closed = true; @@ -368,11 +374,10 @@ impl AsyncRead for ProcessRead { // only poll handle if we've been explicitly woken let mut handle_cx = Context::from_waker(&waker); - if let Poll::Ready(res) = Pin::new(&mut self.handle.inner).poll(&mut handle_cx) { + if let Poll::Ready(res) = Pin::new(&mut self.handle).poll(&mut handle_cx) { let error = match res { - Ok(Ok(())) => continue, - Ok(Err(e)) => e, - Err(e) => std::io::Error::new(std::io::ErrorKind::Other, e), + Ok(()) => continue, + Err(e) => e, }; self.closed = true; @@ -398,12 +403,6 @@ impl AsyncRead for ProcessRead { } } -impl Drop for DropHandle { - fn drop(&mut self) { - self.inner.abort(); - } -} - impl Wake for ProcessReadWaker { fn wake(self: Arc) { match Arc::try_unwrap(self) { diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 85e7cac..6c497bc 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -236,7 +236,7 @@ where let span = tracing::info_span!(parent: current_span, "error-boundary"); - let res = crate::sync::spawn( + let res = crate::sync::abort_on_drop(crate::sync::spawn( "prune-missing", async move { let mut count = count; @@ -261,7 +261,7 @@ where Ok(count) as Result } .instrument(span), - ) + )) .await; match res { diff --git a/src/queue/process.rs b/src/queue/process.rs index 399099e..b5163f9 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -1,5 +1,6 @@ use reqwest_middleware::ClientWithMiddleware; use time::Instant; +use tracing::{Instrument, Span}; use crate::{ concurrent_processor::ProcessMap, @@ -135,23 +136,30 @@ where let repo = repo.clone(); let client = client.clone(); + let current_span = Span::current(); + let span = tracing::info_span!(parent: current_span, "error_boundary"); + let config = config.clone(); - let error_boundary = crate::sync::spawn("ingest-media", async move { - let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?); + let error_boundary = crate::sync::abort_on_drop(crate::sync::spawn( + "ingest-media", + async move { + let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?); - let session = crate::ingest::ingest( - &tmp_dir, - &repo, - &store2, - &client, - stream, - declared_alias, - &config, - ) - .await?; + let session = crate::ingest::ingest( + &tmp_dir, + &repo, + &store2, + &client, + stream, + declared_alias, + &config, + ) + .await?; - Ok(session) as Result - }) + Ok(session) as Result + } + .instrument(span), + )) .await; store.remove(&unprocessed_identifier).await?; diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 2250ceb..cd2fb60 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -33,6 +33,7 @@ use crate::{ future::{WithMetrics, WithTimeout}, serde_str::Serde, stream::LocalBoxStream, + sync::DropHandle, }; use self::job_status::JobStatus; @@ -49,7 +50,7 @@ use super::{ pub(crate) struct PostgresRepo { inner: Arc, #[allow(dead_code)] - notifications: Arc>, + notifications: Arc>, } struct Inner { @@ -151,7 +152,7 @@ impl PostgresRepo { .await .map_err(ConnectPostgresError::ConnectForMigration)?; - let handle = crate::sync::spawn("postgres-migrations", conn); + let handle = crate::sync::abort_on_drop(crate::sync::spawn("postgres-migrations", conn)); embedded::migrations::runner() .run_async(&mut client) @@ -199,10 +200,10 @@ impl PostgresRepo { upload_notifications: DashMap::new(), }); - let handle = crate::sync::spawn( + let handle = crate::sync::abort_on_drop(crate::sync::spawn( "postgres-delegate-notifications", delegate_notifications(rx, inner.clone(), parallelism * 8), - ); + )); let notifications = Arc::new(handle); diff --git a/src/store/object_store.rs b/src/store/object_store.rs index dd22e27..402051f 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -298,7 +298,7 @@ impl Store for ObjectStore { let object_id2 = object_id.clone(); let upload_id2 = upload_id.to_string(); - let handle = crate::sync::spawn( + let handle = crate::sync::abort_on_drop(crate::sync::spawn( "upload-multipart-part", async move { let response = this @@ -333,7 +333,7 @@ impl Store for ObjectStore { Ok(etag) as Result } .instrument(tracing::Span::current()), - ); + )); futures.push(handle); } diff --git a/src/stream.rs b/src/stream.rs index 3cf6641..e5aa7d6 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -30,7 +30,7 @@ where { let (tx, rx) = crate::sync::channel(1); - let handle = crate::sync::spawn("send-stream", async move { + let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); @@ -39,7 +39,7 @@ where break; } } - }); + })); streem::from_fn(|yiedler| async move { let mut stream = rx.into_stream().into_streamer(); diff --git a/src/sync.rs b/src/sync.rs index e79a13d..f40b741 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,6 +1,40 @@ use std::sync::Arc; -use tokio::sync::{Notify, Semaphore}; +use tokio::{ + sync::{Notify, Semaphore}, + task::JoinHandle, +}; + +pub(crate) struct DropHandle { + handle: JoinHandle, +} + +pub(crate) fn abort_on_drop(handle: JoinHandle) -> DropHandle { + DropHandle { handle } +} + +impl DropHandle { + pub(crate) fn abort(&self) { + self.handle.abort(); + } +} + +impl Drop for DropHandle { + fn drop(&mut self) { + self.handle.abort(); + } +} + +impl std::future::Future for DropHandle { + type Output = as std::future::Future>::Output; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + std::pin::Pin::new(&mut self.handle).poll(cx) + } +} #[track_caller] pub(crate) fn channel(bound: usize) -> (flume::Sender, flume::Receiver) {