Remove more boxes

This commit is contained in:
Aode (Lion) 2021-10-20 19:28:40 -05:00
parent e7c3e2c96c
commit 877390878b
4 changed files with 274 additions and 207 deletions

120
src/concurrent_processor.rs Normal file
View file

@ -0,0 +1,120 @@
use crate::{
error::{Error, UploadError},
upload_manager::Details,
};
use actix_web::web;
use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::Lazy;
use std::{
future::Future,
path::PathBuf,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot::{Receiver, Sender};
use tracing::Span;
type OutcomeSender = Sender<(Details, web::Bytes)>;
type ProcessMap = DashMap<PathBuf, Vec<OutcomeSender>>;
static PROCESS_MAP: Lazy<ProcessMap> = Lazy::new(DashMap::new);
struct CancelToken {
span: Span,
path: PathBuf,
receiver: Option<Receiver<(Details, web::Bytes)>>,
}
pin_project_lite::pin_project! {
pub(super) struct CancelSafeProcessor<F> {
cancel_token: CancelToken,
#[pin]
fut: F,
}
}
impl<F> CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), Error>>,
{
pub(super) fn new(path: PathBuf, fut: F) -> Self {
let entry = PROCESS_MAP.entry(path.clone());
let (receiver, span) = match entry {
Entry::Vacant(vacant) => {
vacant.insert(Vec::new());
let span = tracing::info_span!(
"Processing image",
path = &tracing::field::debug(&path),
completed = &tracing::field::Empty,
);
(None, span)
}
Entry::Occupied(mut occupied) => {
let (tx, rx) = tokio::sync::oneshot::channel();
occupied.get_mut().push(tx);
let span = tracing::info_span!(
"Waiting for processed image",
path = &tracing::field::debug(&path),
);
(Some(rx), span)
}
};
CancelSafeProcessor {
cancel_token: CancelToken {
span,
path,
receiver,
},
fut,
}
}
}
impl<F> Future for CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), Error>>,
{
type Output = Result<(Details, web::Bytes), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
let span = &this.cancel_token.span;
let receiver = &mut this.cancel_token.receiver;
let path = &this.cancel_token.path;
let fut = this.fut;
span.in_scope(|| {
if let Some(ref mut rx) = receiver {
Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into()))
} else {
fut.poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(path);
res.map(|tup| {
if let Some((_, vec)) = opt {
for sender in vec {
let _ = sender.send(tup.clone());
}
}
tup
})
})
}
})
}
}
impl Drop for CancelToken {
fn drop(&mut self) {
if self.receiver.is_none() {
let completed = PROCESS_MAP.remove(&self.path).is_none();
self.span.record("completed", &completed);
}
}
}

54
src/init_tracing.rs Normal file
View file

@ -0,0 +1,54 @@
use opentelemetry::{
sdk::{propagation::TraceContextPropagator, Resource},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use tracing::subscriber::set_global_default;
use tracing_error::ErrorLayer;
use tracing_log::LogTracer;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry};
use url::Url;
pub(super) fn init_tracing(
servic_name: &'static str,
opentelemetry_url: Option<&Url>,
) -> anyhow::Result<()> {
LogTracer::init()?;
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.pretty();
let subscriber = Registry::default()
.with(env_filter)
.with(format_layer)
.with(ErrorLayer::default());
if let Some(url) = opentelemetry_url {
let tracer =
opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new("service.name", servic_name)]),
))
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(url.as_str()),
)
.install_batch(opentelemetry::runtime::Tokio)?;
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = subscriber.with(otel_layer);
set_global_default(subscriber)?;
} else {
set_global_default(subscriber)?;
}
Ok(())
}

View file

@ -5,59 +5,48 @@ use actix_web::{
web, App, HttpResponse, HttpResponseBuilder, HttpServer,
};
use awc::Client;
use dashmap::{mapref::entry::Entry, DashMap};
use futures_util::{stream::once, Stream};
use once_cell::sync::Lazy;
use opentelemetry::{
sdk::{propagation::TraceContextPropagator, Resource},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use std::{
collections::HashSet,
future::{ready, Future},
future::ready,
path::PathBuf,
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
use structopt::StructOpt;
use tokio::{
io::AsyncReadExt,
sync::{
oneshot::{Receiver, Sender},
Semaphore,
},
};
use tracing::{debug, error, info, instrument, subscriber::set_global_default, Span};
use tokio::{io::AsyncReadExt, sync::Semaphore};
use tracing::{debug, error, info, instrument, Span};
use tracing_actix_web::TracingLogger;
use tracing_awc::Propagate;
use tracing_error::ErrorLayer;
use tracing_futures::Instrument;
use tracing_log::LogTracer;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry};
use uuid::Uuid;
mod concurrent_processor;
mod config;
mod either;
mod error;
mod exiftool;
mod ffmpeg;
mod file;
mod init_tracing;
mod magick;
mod middleware;
mod migrate;
mod process;
mod processor;
mod range;
mod process;
mod upload_manager;
mod validate;
use self::{
concurrent_processor::CancelSafeProcessor,
config::{Config, Format},
either::Either,
error::{Error, UploadError},
file::CrateError,
init_tracing::init_tracing,
middleware::{Deadline, Internal},
upload_manager::{Details, UploadManager, UploadManagerSession},
validate::{image_webp, video_mp4},
@ -78,109 +67,6 @@ static TMP_DIR: Lazy<PathBuf> = Lazy::new(|| {
static CONFIG: Lazy<Config> = Lazy::new(Config::from_args);
static PROCESS_SEMAPHORE: Lazy<Semaphore> =
Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
static PROCESS_MAP: Lazy<ProcessMap> = Lazy::new(DashMap::new);
type OutcomeSender = Sender<(Details, web::Bytes)>;
type ProcessMap = DashMap<PathBuf, Vec<OutcomeSender>>;
struct CancelToken {
span: Span,
path: PathBuf,
receiver: Option<Receiver<(Details, web::Bytes)>>,
}
pin_project_lite::pin_project! {
struct CancelSafeProcessor<F> {
cancel_token: CancelToken,
#[pin]
fut: F,
}
}
impl<F> CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), Error>>,
{
pub(crate) fn new(path: PathBuf, fut: F) -> Self {
let entry = PROCESS_MAP.entry(path.clone());
let (receiver, span) = match entry {
Entry::Vacant(vacant) => {
vacant.insert(Vec::new());
let span = tracing::info_span!(
"Processing image",
path = &tracing::field::debug(&path),
completed = &tracing::field::Empty,
);
(None, span)
}
Entry::Occupied(mut occupied) => {
let (tx, rx) = tokio::sync::oneshot::channel();
occupied.get_mut().push(tx);
let span = tracing::info_span!(
"Waiting for processed image",
path = &tracing::field::debug(&path),
);
(Some(rx), span)
}
};
CancelSafeProcessor {
cancel_token: CancelToken {
span,
path,
receiver,
},
fut,
}
}
}
impl<F> Future for CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), Error>>,
{
type Output = Result<(Details, web::Bytes), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
let span = &this.cancel_token.span;
let receiver = &mut this.cancel_token.receiver;
let path = &this.cancel_token.path;
let fut = this.fut;
span.in_scope(|| {
if let Some(ref mut rx) = receiver {
Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into()))
} else {
fut.poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(path);
res.map(|tup| {
if let Some((_, vec)) = opt {
for sender in vec {
let _ = sender.send(tup.clone());
}
}
tup
})
})
}
})
}
}
impl Drop for CancelToken {
fn drop(&mut self) {
if self.receiver.is_none() {
let completed = PROCESS_MAP.remove(&self.path).is_none();
self.span.record("completed", &completed);
}
}
}
// try moving a file
#[instrument(name = "Moving file")]
@ -545,8 +431,8 @@ async fn process(
let real_path_opt = manager.variant_path(&thumbnail_path, &name).await?;
// If the thumbnail doesn't exist, we need to create it
let thumbnail_exists = if let Some(real_path) = &real_path_opt {
if let Err(e) = tokio::fs::metadata(real_path)
let real_path_opt = if let Some(real_path) = real_path_opt {
if let Err(e) = tokio::fs::metadata(&real_path)
.instrument(tracing::info_span!("Get thumbnail metadata"))
.await
{
@ -554,32 +440,30 @@ async fn process(
error!("Error looking up processed image, {}", e);
return Err(e.into());
}
false
None
} else {
true
Some(real_path)
}
} else {
false
None
};
if thumbnail_exists {
if let Some(real_path) = real_path_opt {
let details_opt = manager
.variant_details(real_path.clone(), name.clone())
if let Some(real_path) = real_path_opt {
let details_opt = manager
.variant_details(real_path.clone(), name.clone())
.await?;
let details = if let Some(details) = details_opt {
details
} else {
let details = Details::from_path(real_path.clone()).await?;
manager
.store_variant_details(real_path.clone(), name, &details)
.await?;
details
};
let details = if let Some(details) = details_opt {
details
} else {
let details = Details::from_path(real_path.clone()).await?;
manager
.store_variant_details(real_path.clone(), name, &details)
.await?;
details
};
return ranged_file_resp(real_path, range, details).await;
}
return ranged_file_resp(real_path, range, details).await;
}
let mut original_path = manager.path_from_filename(name.clone()).await?;
@ -898,47 +782,11 @@ async fn filename_by_alias(
}
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
LogTracer::init()?;
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.pretty();
let subscriber = Registry::default()
.with(env_filter)
.with(format_layer)
.with(ErrorLayer::default());
if let Some(url) = CONFIG.opentelemetry_url() {
let tracer =
opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new("service.name", "pict-rs")]),
))
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(url.as_str()),
)
.install_batch(opentelemetry::runtime::Tokio)?;
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = subscriber.with(otel_layer);
set_global_default(subscriber)?;
} else {
let subscriber = subscriber.with(tracing_subscriber::fmt::layer());
set_global_default(subscriber)?;
};
async fn main() -> anyhow::Result<()> {
let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?;
init_tracing("pict-rs", CONFIG.opentelemetry_url())?;
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it

View file

@ -4,7 +4,6 @@ use actix_web::{
http::StatusCode,
HttpResponse, ResponseError,
};
use futures_util::future::LocalBoxFuture;
use std::{
future::{ready, Future, Ready},
pin::Pin,
@ -19,19 +18,47 @@ pub(crate) struct DeadlineMiddleware<S> {
#[derive(Debug)]
struct DeadlineExceeded;
enum DeadlineFutureInner<F> {
Timed(Pin<Box<Timeout<F>>>),
Untimed(Pin<Box<F>>),
pin_project_lite::pin_project! {
pub(crate) struct DeadlineFuture<F> {
#[pin]
inner: DeadlineFutureInner<F>,
}
}
pub(crate) struct DeadlineFuture<F> {
inner: DeadlineFutureInner<F>,
pin_project_lite::pin_project! {
#[project = DeadlineFutureInnerProj]
#[project_replace = DeadlineFutureInnerProjReplace]
enum DeadlineFutureInner<F> {
Timed {
#[pin]
timeout: Timeout<F>,
},
Untimed {
#[pin]
future: F,
},
}
}
pub(crate) struct Internal(pub(crate) Option<String>);
pub(crate) struct InternalMiddleware<S>(Option<String>, S);
#[derive(Clone, Debug, thiserror::Error)]
#[error("Invalid API Key")]
struct ApiError;
pub(crate) struct ApiError;
pin_project_lite::pin_project! {
#[project = InternalFutureProj]
#[project_replace = InternalFutureProjReplace]
pub(crate) enum InternalFuture<F> {
Internal {
#[pin]
future: F,
},
Error {
error: Option<ApiError>,
},
}
}
impl ResponseError for ApiError {
fn status_code(&self) -> StatusCode {
@ -129,13 +156,13 @@ impl<F> DeadlineFuture<F>
where
F: Future,
{
fn new(inner: F, timeout: Option<std::time::Duration>) -> Self {
fn new(future: F, timeout: Option<std::time::Duration>) -> Self {
DeadlineFuture {
inner: match timeout {
Some(duration) => {
DeadlineFutureInner::Timed(Box::pin(actix_rt::time::timeout(duration, inner)))
}
None => DeadlineFutureInner::Untimed(Box::pin(inner)),
Some(duration) => DeadlineFutureInner::Timed {
timeout: actix_rt::time::timeout(duration, future),
},
None => DeadlineFutureInner::Untimed { future },
},
}
}
@ -148,15 +175,15 @@ where
{
type Output = Result<R, actix_web::Error>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner {
DeadlineFutureInner::Timed(ref mut fut) => {
Pin::new(fut).poll(cx).map(|res| match res {
Ok(res) => res.map_err(actix_web::Error::from),
Err(_) => Err(DeadlineExceeded.into()),
})
}
DeadlineFutureInner::Untimed(ref mut fut) => Pin::new(fut)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
match this.inner.project() {
DeadlineFutureInnerProj::Timed { timeout } => timeout.poll(cx).map(|res| match res {
Ok(res) => res.map_err(actix_web::Error::from),
Err(_) => Err(DeadlineExceeded.into()),
}),
DeadlineFutureInnerProj::Untimed { future } => future
.poll(cx)
.map(|res| res.map_err(actix_web::Error::from)),
}
@ -186,7 +213,7 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>;
type Future = InternalFuture<S::Future>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.1.poll_ready(cx)
@ -195,11 +222,29 @@ where
fn call(&self, req: ServiceRequest) -> Self::Future {
if let Some(value) = req.headers().get("x-api-token") {
if value.to_str().is_ok() && value.to_str().ok() == self.0.as_deref() {
let fut = self.1.call(req);
return Box::pin(async move { fut.await });
return InternalFuture::Internal {
future: self.1.call(req),
};
}
}
Box::pin(async move { Err(ApiError.into()) })
InternalFuture::Error {
error: Some(ApiError),
}
}
}
impl<F, T, E> Future for InternalFuture<F>
where
F: Future<Output = Result<T, E>>,
E: From<ApiError>,
{
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
InternalFutureProj::Internal { future } => future.poll(cx),
InternalFutureProj::Error { error } => Poll::Ready(Err(error.take().unwrap().into())),
}
}
}