From 877390878ba0d4e8efdf7663ebf057e3994039e7 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Wed, 20 Oct 2021 19:28:40 -0500 Subject: [PATCH] Remove more boxes --- src/concurrent_processor.rs | 120 ++++++++++++++++++++ src/init_tracing.rs | 54 +++++++++ src/main.rs | 212 +++++------------------------------- src/middleware.rs | 95 +++++++++++----- 4 files changed, 274 insertions(+), 207 deletions(-) create mode 100644 src/concurrent_processor.rs create mode 100644 src/init_tracing.rs diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs new file mode 100644 index 0000000..abaeaca --- /dev/null +++ b/src/concurrent_processor.rs @@ -0,0 +1,120 @@ +use crate::{ + error::{Error, UploadError}, + upload_manager::Details, +}; +use actix_web::web; +use dashmap::{mapref::entry::Entry, DashMap}; +use once_cell::sync::Lazy; +use std::{ + future::Future, + path::PathBuf, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::oneshot::{Receiver, Sender}; +use tracing::Span; + +type OutcomeSender = Sender<(Details, web::Bytes)>; + +type ProcessMap = DashMap>; + +static PROCESS_MAP: Lazy = Lazy::new(DashMap::new); + +struct CancelToken { + span: Span, + path: PathBuf, + receiver: Option>, +} + +pin_project_lite::pin_project! { + pub(super) struct CancelSafeProcessor { + cancel_token: CancelToken, + + #[pin] + fut: F, + } +} + +impl CancelSafeProcessor +where + F: Future>, +{ + pub(super) fn new(path: PathBuf, fut: F) -> Self { + let entry = PROCESS_MAP.entry(path.clone()); + + let (receiver, span) = match entry { + Entry::Vacant(vacant) => { + vacant.insert(Vec::new()); + let span = tracing::info_span!( + "Processing image", + path = &tracing::field::debug(&path), + completed = &tracing::field::Empty, + ); + (None, span) + } + Entry::Occupied(mut occupied) => { + let (tx, rx) = tokio::sync::oneshot::channel(); + occupied.get_mut().push(tx); + let span = tracing::info_span!( + "Waiting for processed image", + path = &tracing::field::debug(&path), + ); + (Some(rx), span) + } + }; + + CancelSafeProcessor { + cancel_token: CancelToken { + span, + path, + receiver, + }, + fut, + } + } +} + +impl Future for CancelSafeProcessor +where + F: Future>, +{ + type Output = Result<(Details, web::Bytes), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().project(); + + let span = &this.cancel_token.span; + let receiver = &mut this.cancel_token.receiver; + let path = &this.cancel_token.path; + let fut = this.fut; + + span.in_scope(|| { + if let Some(ref mut rx) = receiver { + Pin::new(rx) + .poll(cx) + .map(|res| res.map_err(|_| UploadError::Canceled.into())) + } else { + fut.poll(cx).map(|res| { + let opt = PROCESS_MAP.remove(path); + res.map(|tup| { + if let Some((_, vec)) = opt { + for sender in vec { + let _ = sender.send(tup.clone()); + } + } + tup + }) + }) + } + }) + } +} + +impl Drop for CancelToken { + fn drop(&mut self) { + if self.receiver.is_none() { + let completed = PROCESS_MAP.remove(&self.path).is_none(); + self.span.record("completed", &completed); + } + } +} diff --git a/src/init_tracing.rs b/src/init_tracing.rs new file mode 100644 index 0000000..e8346ed --- /dev/null +++ b/src/init_tracing.rs @@ -0,0 +1,54 @@ +use opentelemetry::{ + sdk::{propagation::TraceContextPropagator, Resource}, + KeyValue, +}; +use opentelemetry_otlp::WithExportConfig; +use tracing::subscriber::set_global_default; +use tracing_error::ErrorLayer; +use tracing_log::LogTracer; +use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry}; +use url::Url; + +pub(super) fn init_tracing( + servic_name: &'static str, + opentelemetry_url: Option<&Url>, +) -> anyhow::Result<()> { + LogTracer::init()?; + + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + let format_layer = tracing_subscriber::fmt::layer() + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .pretty(); + + let subscriber = Registry::default() + .with(env_filter) + .with(format_layer) + .with(ErrorLayer::default()); + + if let Some(url) = opentelemetry_url { + let tracer = + opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config(opentelemetry::sdk::trace::config().with_resource( + Resource::new(vec![KeyValue::new("service.name", servic_name)]), + )) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(url.as_str()), + ) + .install_batch(opentelemetry::runtime::Tokio)?; + + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + let subscriber = subscriber.with(otel_layer); + + set_global_default(subscriber)?; + } else { + set_global_default(subscriber)?; + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 52b41e6..aee6b99 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,59 +5,48 @@ use actix_web::{ web, App, HttpResponse, HttpResponseBuilder, HttpServer, }; use awc::Client; -use dashmap::{mapref::entry::Entry, DashMap}; use futures_util::{stream::once, Stream}; use once_cell::sync::Lazy; -use opentelemetry::{ - sdk::{propagation::TraceContextPropagator, Resource}, - KeyValue, -}; -use opentelemetry_otlp::WithExportConfig; use std::{ collections::HashSet, - future::{ready, Future}, + future::ready, path::PathBuf, pin::Pin, task::{Context, Poll}, time::SystemTime, }; use structopt::StructOpt; -use tokio::{ - io::AsyncReadExt, - sync::{ - oneshot::{Receiver, Sender}, - Semaphore, - }, -}; -use tracing::{debug, error, info, instrument, subscriber::set_global_default, Span}; +use tokio::{io::AsyncReadExt, sync::Semaphore}; +use tracing::{debug, error, info, instrument, Span}; use tracing_actix_web::TracingLogger; use tracing_awc::Propagate; -use tracing_error::ErrorLayer; use tracing_futures::Instrument; -use tracing_log::LogTracer; -use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry}; use uuid::Uuid; +mod concurrent_processor; mod config; mod either; mod error; mod exiftool; mod ffmpeg; mod file; +mod init_tracing; mod magick; mod middleware; mod migrate; +mod process; mod processor; mod range; -mod process; mod upload_manager; mod validate; use self::{ + concurrent_processor::CancelSafeProcessor, config::{Config, Format}, either::Either, error::{Error, UploadError}, file::CrateError, + init_tracing::init_tracing, middleware::{Deadline, Internal}, upload_manager::{Details, UploadManager, UploadManagerSession}, validate::{image_webp, video_mp4}, @@ -78,109 +67,6 @@ static TMP_DIR: Lazy = Lazy::new(|| { static CONFIG: Lazy = Lazy::new(Config::from_args); static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))); -static PROCESS_MAP: Lazy = Lazy::new(DashMap::new); - -type OutcomeSender = Sender<(Details, web::Bytes)>; -type ProcessMap = DashMap>; - -struct CancelToken { - span: Span, - path: PathBuf, - receiver: Option>, -} - -pin_project_lite::pin_project! { - struct CancelSafeProcessor { - cancel_token: CancelToken, - - #[pin] - fut: F, - } -} - -impl CancelSafeProcessor -where - F: Future>, -{ - pub(crate) fn new(path: PathBuf, fut: F) -> Self { - let entry = PROCESS_MAP.entry(path.clone()); - - let (receiver, span) = match entry { - Entry::Vacant(vacant) => { - vacant.insert(Vec::new()); - let span = tracing::info_span!( - "Processing image", - path = &tracing::field::debug(&path), - completed = &tracing::field::Empty, - ); - (None, span) - } - Entry::Occupied(mut occupied) => { - let (tx, rx) = tokio::sync::oneshot::channel(); - occupied.get_mut().push(tx); - let span = tracing::info_span!( - "Waiting for processed image", - path = &tracing::field::debug(&path), - ); - (Some(rx), span) - } - }; - - CancelSafeProcessor { - cancel_token: CancelToken { - span, - path, - receiver, - }, - fut, - } - } -} - -impl Future for CancelSafeProcessor -where - F: Future>, -{ - type Output = Result<(Details, web::Bytes), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - let span = &this.cancel_token.span; - let receiver = &mut this.cancel_token.receiver; - let path = &this.cancel_token.path; - let fut = this.fut; - - span.in_scope(|| { - if let Some(ref mut rx) = receiver { - Pin::new(rx) - .poll(cx) - .map(|res| res.map_err(|_| UploadError::Canceled.into())) - } else { - fut.poll(cx).map(|res| { - let opt = PROCESS_MAP.remove(path); - res.map(|tup| { - if let Some((_, vec)) = opt { - for sender in vec { - let _ = sender.send(tup.clone()); - } - } - tup - }) - }) - } - }) - } -} - -impl Drop for CancelToken { - fn drop(&mut self) { - if self.receiver.is_none() { - let completed = PROCESS_MAP.remove(&self.path).is_none(); - self.span.record("completed", &completed); - } - } -} // try moving a file #[instrument(name = "Moving file")] @@ -545,8 +431,8 @@ async fn process( let real_path_opt = manager.variant_path(&thumbnail_path, &name).await?; // If the thumbnail doesn't exist, we need to create it - let thumbnail_exists = if let Some(real_path) = &real_path_opt { - if let Err(e) = tokio::fs::metadata(real_path) + let real_path_opt = if let Some(real_path) = real_path_opt { + if let Err(e) = tokio::fs::metadata(&real_path) .instrument(tracing::info_span!("Get thumbnail metadata")) .await { @@ -554,32 +440,30 @@ async fn process( error!("Error looking up processed image, {}", e); return Err(e.into()); } - false + None } else { - true + Some(real_path) } } else { - false + None }; - if thumbnail_exists { - if let Some(real_path) = real_path_opt { - let details_opt = manager - .variant_details(real_path.clone(), name.clone()) + if let Some(real_path) = real_path_opt { + let details_opt = manager + .variant_details(real_path.clone(), name.clone()) + .await?; + + let details = if let Some(details) = details_opt { + details + } else { + let details = Details::from_path(real_path.clone()).await?; + manager + .store_variant_details(real_path.clone(), name, &details) .await?; + details + }; - let details = if let Some(details) = details_opt { - details - } else { - let details = Details::from_path(real_path.clone()).await?; - manager - .store_variant_details(real_path.clone(), name, &details) - .await?; - details - }; - - return ranged_file_resp(real_path, range, details).await; - } + return ranged_file_resp(real_path, range, details).await; } let mut original_path = manager.path_from_filename(name.clone()).await?; @@ -898,47 +782,11 @@ async fn filename_by_alias( } #[actix_rt::main] -async fn main() -> Result<(), anyhow::Error> { - LogTracer::init()?; - - opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - - let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); - let format_layer = tracing_subscriber::fmt::layer() - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) - .pretty(); - - let subscriber = Registry::default() - .with(env_filter) - .with(format_layer) - .with(ErrorLayer::default()); - - if let Some(url) = CONFIG.opentelemetry_url() { - let tracer = - opentelemetry_otlp::new_pipeline() - .tracing() - .with_trace_config(opentelemetry::sdk::trace::config().with_resource( - Resource::new(vec![KeyValue::new("service.name", "pict-rs")]), - )) - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(url.as_str()), - ) - .install_batch(opentelemetry::runtime::Tokio)?; - - let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - - let subscriber = subscriber.with(otel_layer); - - set_global_default(subscriber)?; - } else { - let subscriber = subscriber.with(tracing_subscriber::fmt::layer()); - set_global_default(subscriber)?; - }; - +async fn main() -> anyhow::Result<()> { let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?; + init_tracing("pict-rs", CONFIG.opentelemetry_url())?; + // Create a new Multipart Form validator // // This form is expecting a single array field, 'images' with at most 10 files in it diff --git a/src/middleware.rs b/src/middleware.rs index 232e9bd..47dbd28 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -4,7 +4,6 @@ use actix_web::{ http::StatusCode, HttpResponse, ResponseError, }; -use futures_util::future::LocalBoxFuture; use std::{ future::{ready, Future, Ready}, pin::Pin, @@ -19,19 +18,47 @@ pub(crate) struct DeadlineMiddleware { #[derive(Debug)] struct DeadlineExceeded; -enum DeadlineFutureInner { - Timed(Pin>>), - Untimed(Pin>), +pin_project_lite::pin_project! { + pub(crate) struct DeadlineFuture { + #[pin] + inner: DeadlineFutureInner, + } } -pub(crate) struct DeadlineFuture { - inner: DeadlineFutureInner, + +pin_project_lite::pin_project! { + #[project = DeadlineFutureInnerProj] + #[project_replace = DeadlineFutureInnerProjReplace] + enum DeadlineFutureInner { + Timed { + #[pin] + timeout: Timeout, + }, + Untimed { + #[pin] + future: F, + }, + } } pub(crate) struct Internal(pub(crate) Option); pub(crate) struct InternalMiddleware(Option, S); #[derive(Clone, Debug, thiserror::Error)] #[error("Invalid API Key")] -struct ApiError; +pub(crate) struct ApiError; + +pin_project_lite::pin_project! { + #[project = InternalFutureProj] + #[project_replace = InternalFutureProjReplace] + pub(crate) enum InternalFuture { + Internal { + #[pin] + future: F, + }, + Error { + error: Option, + }, + } +} impl ResponseError for ApiError { fn status_code(&self) -> StatusCode { @@ -129,13 +156,13 @@ impl DeadlineFuture where F: Future, { - fn new(inner: F, timeout: Option) -> Self { + fn new(future: F, timeout: Option) -> Self { DeadlineFuture { inner: match timeout { - Some(duration) => { - DeadlineFutureInner::Timed(Box::pin(actix_rt::time::timeout(duration, inner))) - } - None => DeadlineFutureInner::Untimed(Box::pin(inner)), + Some(duration) => DeadlineFutureInner::Timed { + timeout: actix_rt::time::timeout(duration, future), + }, + None => DeadlineFutureInner::Untimed { future }, }, } } @@ -148,15 +175,15 @@ where { type Output = Result; - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.inner { - DeadlineFutureInner::Timed(ref mut fut) => { - Pin::new(fut).poll(cx).map(|res| match res { - Ok(res) => res.map_err(actix_web::Error::from), - Err(_) => Err(DeadlineExceeded.into()), - }) - } - DeadlineFutureInner::Untimed(ref mut fut) => Pin::new(fut) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().project(); + + match this.inner.project() { + DeadlineFutureInnerProj::Timed { timeout } => timeout.poll(cx).map(|res| match res { + Ok(res) => res.map_err(actix_web::Error::from), + Err(_) => Err(DeadlineExceeded.into()), + }), + DeadlineFutureInnerProj::Untimed { future } => future .poll(cx) .map(|res| res.map_err(actix_web::Error::from)), } @@ -186,7 +213,7 @@ where { type Response = S::Response; type Error = S::Error; - type Future = LocalBoxFuture<'static, Result>; + type Future = InternalFuture; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.1.poll_ready(cx) @@ -195,11 +222,29 @@ where fn call(&self, req: ServiceRequest) -> Self::Future { if let Some(value) = req.headers().get("x-api-token") { if value.to_str().is_ok() && value.to_str().ok() == self.0.as_deref() { - let fut = self.1.call(req); - return Box::pin(async move { fut.await }); + return InternalFuture::Internal { + future: self.1.call(req), + }; } } - Box::pin(async move { Err(ApiError.into()) }) + InternalFuture::Error { + error: Some(ApiError), + } + } +} + +impl Future for InternalFuture +where + F: Future>, + E: From, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().project() { + InternalFutureProj::Internal { future } => future.poll(cx), + InternalFutureProj::Error { error } => Poll::Ready(Err(error.take().unwrap().into())), + } } }