From 1662f153ce657ed3427c5f05580754f9d02ef570 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Mon, 28 Mar 2022 15:34:36 -0500 Subject: [PATCH] Numerous changes: - Improve error printing (display chain in addition to spantrace) - Fix migration (read main identifier from identifier tree, not filename tree) - Ensure uniqueness for processed images in ConcurrentProcessor (use source identifier in addition to thumbnail path, include extension in thumbnail path) - Update default log levels (make pict-rs quieter) - Add timeout for serving images from object storage (5 seconds) --- docker/object-storage/pict-rs.toml | 51 +++++++++++++----- src/concurrent_processor.rs | 29 ++++++---- src/config/defaults.rs | 4 +- src/config/primitives.rs | 10 ++-- src/error.rs | 37 +++++++------ src/main.rs | 30 +++++------ src/processor.rs | 9 +++- src/repo/old.rs | 4 +- src/store/object_store.rs | 85 +++++++++++++++++++++++++++++- 9 files changed, 195 insertions(+), 64 deletions(-) diff --git a/docker/object-storage/pict-rs.toml b/docker/object-storage/pict-rs.toml index 04fc42a..8056e35 100644 --- a/docker/object-storage/pict-rs.toml +++ b/docker/object-storage/pict-rs.toml @@ -1,17 +1,42 @@ -path = 'data/' -addr = '0.0.0.0:8080' +[server] +address = '0.0.0.0:8080' +[tracing.logging] +format = 'normal' +targets = 'info' -repo = 'sled' -store = 'object_storage' +[tracing.console] +buffer_capacity = 102400 -[sled] -sled_cache_capacity = 67108864 +[tracing.opentelemetry] +service_name = 'pict-rs' +targets = 'info' -[filesystem_storage] -filesystem_storage_path = '/mnt/files' +[old_db] +path = '/mnt' -[object_storage] -object_store_bucket_name = 'pict-rs' -object_store_region = 'http://minio:9000' -object_store_access_key = 'XZEZ5B8Y3UCINU1KCVF6' -object_store_secret_key = 'cWbE5LcCK9YH8j1NvhOZocl+vH+b6T5Zvy3z+BZu' +[media] +max_width = 10000 +max_height = 10000 +max_area = 40000000 +max_file_size = 40 +enable_silent_video = true +filters = [ + 'crop', + 'resize', + 'thumbnail', + 'blur', + 'identity', +] +skip_validate_imports = false + +[repo] +type = 'sled' +path = '/mnt/sled-repo' +cache_capacity = 67108864 + +[store] +type = 'object_storage' +bucket_name = 'pict-rs' +region = 'http://minio:9000' +access_key = 'XZEZ5B8Y3UCINU1KCVF6' +secret_key = 'cWbE5LcCK9YH8j1NvhOZocl+vH+b6T5Zvy3z+BZu' diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 52eb27e..3db0e6b 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -1,6 +1,7 @@ use crate::{ details::Details, error::{Error, UploadError}, + store::Identifier, }; use actix_web::web; use dashmap::{mapref::entry::Entry, DashMap}; @@ -16,13 +17,15 @@ use tracing::Span; type OutcomeSender = Sender<(Details, web::Bytes)>; -type ProcessMap = DashMap>; +type ProcessMapKey = (Vec, PathBuf); + +type ProcessMap = DashMap>; static PROCESS_MAP: Lazy = Lazy::new(DashMap::new); struct CancelToken { span: Span, - path: PathBuf, + key: ProcessMapKey, receiver: Option>, } @@ -39,14 +42,19 @@ impl CancelSafeProcessor where F: Future>, { - pub(super) fn new(path: PathBuf, fut: F) -> Self { - let entry = PROCESS_MAP.entry(path.clone()); + pub(super) fn new(identifier: I, path: PathBuf, fut: F) -> Result { + let id_bytes = identifier.to_bytes()?; + + let key = (id_bytes, path.clone()); + + let entry = PROCESS_MAP.entry(key.clone()); let (receiver, span) = match entry { Entry::Vacant(vacant) => { vacant.insert(Vec::new()); let span = tracing::info_span!( "Processing image", + identifier = &tracing::field::debug(&identifier), path = &tracing::field::debug(&path), completed = &tracing::field::Empty, ); @@ -57,20 +65,21 @@ where occupied.get_mut().push(tx); let span = tracing::info_span!( "Waiting for processed image", + identifier = &tracing::field::debug(&identifier), path = &tracing::field::debug(&path), ); (Some(rx), span) } }; - CancelSafeProcessor { + Ok(CancelSafeProcessor { cancel_token: CancelToken { span, - path, + key, receiver, }, fut, - } + }) } } @@ -85,7 +94,7 @@ where let span = &this.cancel_token.span; let receiver = &mut this.cancel_token.receiver; - let path = &this.cancel_token.path; + let key = &this.cancel_token.key; let fut = this.fut; span.in_scope(|| { @@ -95,7 +104,7 @@ where .map(|res| res.map_err(|_| UploadError::Canceled.into())) } else { fut.poll(cx).map(|res| { - let opt = PROCESS_MAP.remove(path); + let opt = PROCESS_MAP.remove(key); res.map(|tup| { if let Some((_, vec)) = opt { for sender in vec { @@ -113,7 +122,7 @@ where impl Drop for CancelToken { fn drop(&mut self) { if self.receiver.is_none() { - let completed = PROCESS_MAP.remove(&self.path).is_none(); + let completed = PROCESS_MAP.remove(&self.key).is_none(); self.span.record("completed", &completed); } } diff --git a/src/config/defaults.rs b/src/config/defaults.rs index e45066f..a459e73 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -108,7 +108,9 @@ impl Default for LoggingDefaults { fn default() -> Self { LoggingDefaults { format: LogFormat::Normal, - targets: "info".parse().expect("Valid targets string"), + targets: "warn,tracing_actix_web=info,actix_web=info,actix_server=info" + .parse() + .expect("Valid targets string"), } } } diff --git a/src/config/primitives.rs b/src/config/primitives.rs index fd94797..f48027d 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -182,12 +182,12 @@ impl FromStr for ImageFormat { type Err = String; fn from_str(s: &str) -> Result { - for variant in Self::value_variants() { - if variant.to_possible_value().unwrap().matches(s, false) { - return Ok(*variant); - } + match s.to_lowercase().as_str() { + "jpeg" | "jpg" => Ok(Self::Jpeg), + "png" => Ok(Self::Png), + "webp" => Ok(Self::Webp), + other => Err(format!("Invalid variant: {}", other)), } - Err(format!("Invalid variant: {}", s)) } } diff --git a/src/error.rs b/src/error.rs index bfb336b..34d30db 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,23 +16,32 @@ impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "{}", self.kind)?; writeln!(f)?; - let mut count = 0; - let mut source = std::error::Error::source(self); - if source.is_some() { - writeln!(f, "Chain:")?; - } - while let Some(err) = source { - write!(f, "{}. ", count)?; - writeln!(f, "{}", err)?; - count += 1; - source = std::error::Error::source(err); - } + writeln!(f, "Chain:")?; + fmt_chain(f, &self.kind)?; + writeln!(f)?; + writeln!(f, "Spantrace:")?; std::fmt::Display::fmt(&self.context, f) } } +fn fmt_chain( + f: &mut std::fmt::Formatter<'_>, + err: &dyn std::error::Error, +) -> Result { + let count = if let Some(source) = std::error::Error::source(err) { + fmt_chain(f, source)? + } else { + 0 + }; + + write!(f, "\t{}. ", count)?; + writeln!(f, "{}", err)?; + + Ok(count + 1) +} + impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.kind.source() @@ -113,9 +122,6 @@ pub(crate) enum UploadError { #[error("Unable to send request, {0}")] SendRequest(String), - #[error("No filename provided in request")] - MissingFilename, - #[error("Error converting Path to String")] Path, @@ -157,7 +163,8 @@ impl ResponseError for Error { | UploadError::Limit(_) | UploadError::NoFiles | UploadError::Upload(_) => StatusCode::BAD_REQUEST, - UploadError::MissingAlias | UploadError::MissingFilename => StatusCode::NOT_FOUND, + UploadError::Sled(crate::repo::sled::SledError::Missing) + | UploadError::MissingAlias => StatusCode::NOT_FOUND, UploadError::InvalidToken => StatusCode::FORBIDDEN, UploadError::Range => StatusCode::RANGE_NOT_SATISFIABLE, _ => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/src/main.rs b/src/main.rs index 51b1e87..91f6883 100644 --- a/src/main.rs +++ b/src/main.rs @@ -263,7 +263,7 @@ type ProcessQuery = Vec<(String, String)>; fn prepare_process( query: web::Query, ext: &str, - filters: &Option>, + filters: &HashSet, ) -> Result<(ImageFormat, Alias, PathBuf, Vec), Error> { let (alias, operations) = query @@ -279,25 +279,23 @@ fn prepare_process( }); if alias.is_empty() { - return Err(UploadError::MissingFilename.into()); + return Err(UploadError::MissingAlias.into()); } let alias = Alias::from_existing(&alias); - let operations = if let Some(filters) = filters.as_ref() { - operations - .into_iter() - .filter(|(k, _)| filters.contains(&k.to_lowercase())) - .collect() - } else { - operations - }; + let operations = operations + .into_iter() + .filter(|(k, _)| filters.contains(&k.to_lowercase())) + .collect::>(); let format = ext .parse::() .map_err(|_| UploadError::UnsupportedFormat)?; - let (thumbnail_path, thumbnail_args) = self::processor::build_chain(&operations)?; + let ext = format.to_string(); + + let (thumbnail_path, thumbnail_args) = self::processor::build_chain(&operations, &ext)?; Ok((format, alias, thumbnail_path, thumbnail_args)) } @@ -308,7 +306,7 @@ async fn process_details( ext: web::Path, manager: web::Data, store: web::Data, - filters: web::Data>>, + filters: web::Data>, ) -> Result { let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?; @@ -332,7 +330,7 @@ async fn process( ext: web::Path, manager: web::Data, store: web::Data, - filters: web::Data>>, + filters: web::Data>, ) -> Result { let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str(), &filters)?; @@ -361,6 +359,7 @@ async fn process( .await?; let thumbnail_path2 = thumbnail_path.clone(); + let identifier2 = identifier.clone(); let process_fut = async { let thumbnail_path = thumbnail_path2; @@ -368,7 +367,7 @@ async fn process( let mut processed_reader = crate::magick::process_image_store_read( (**store).clone(), - identifier, + identifier2, thumbnail_args, format, )?; @@ -417,7 +416,8 @@ async fn process( Ok((details, bytes)) as Result<(Details, web::Bytes), Error> }; - let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?; + let (details, bytes) = + CancelSafeProcessor::new(identifier, thumbnail_path.clone(), process_fut)?.await?; let (builder, stream) = if let Some(web::Header(range_header)) = range { if let Some(range) = range::single_bytes_range(&range_header) { diff --git a/src/processor.rs b/src/processor.rs index 0e01a5d..bf7c749 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -20,7 +20,10 @@ pub(crate) struct Crop(usize, usize); pub(crate) struct Blur(f64); #[instrument] -pub(crate) fn build_chain(args: &[(String, String)]) -> Result<(PathBuf, Vec), Error> { +pub(crate) fn build_chain( + args: &[(String, String)], + ext: &str, +) -> Result<(PathBuf, Vec), Error> { fn parse(key: &str, value: &str) -> Result, UploadError> { if key == P::NAME { return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?)); @@ -37,7 +40,7 @@ pub(crate) fn build_chain(args: &[(String, String)]) -> Result<(PathBuf, Vec Result<(PathBuf, Vec { + sleep: Option>>, + + woken: Arc, + + #[pin] + inner: S, + } +} + #[async_trait::async_trait(?Send)] impl Store for ObjectStore { type Identifier = ObjectId; @@ -107,7 +124,10 @@ impl Store for ObjectStore { let response = request.response().await.map_err(ObjectError::from)?; - Ok(Box::pin(io_error(response.bytes_stream()))) + Ok(Box::pin(timeout( + io_error(response.bytes_stream()), + std::time::Duration::from_secs(5), + ))) } #[tracing::instrument(skip(writer))] @@ -238,6 +258,17 @@ where IoError { inner: stream } } +fn timeout(stream: S, duration: std::time::Duration) -> impl Stream> +where + S: Stream>, +{ + Timeout { + sleep: Some(Box::pin(actix_rt::time::sleep(duration))), + woken: Arc::new(AtomicBool::new(true)), + inner: stream, + } +} + impl Stream for IoError where S: Stream>, @@ -254,6 +285,56 @@ where } } +struct TimeoutWaker { + woken: Arc, + inner: Waker, +} + +impl Wake for TimeoutWaker { + fn wake(self: Arc) { + self.wake_by_ref() + } + + fn wake_by_ref(self: &Arc) { + self.woken.store(true, Ordering::Release); + self.inner.wake_by_ref(); + } +} + +impl Stream for Timeout +where + S: Stream>, +{ + type Item = std::io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.as_mut().project(); + + if this.woken.swap(false, Ordering::Acquire) { + if let Some(mut sleep) = this.sleep.take() { + let timeout_waker = Arc::new(TimeoutWaker { + woken: Arc::clone(this.woken), + inner: cx.waker().clone(), + }) + .into(); + let mut timeout_cx = Context::from_waker(&timeout_waker); + if let Poll::Ready(()) = sleep.as_mut().poll(&mut timeout_cx) { + return Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Stream timeout".to_string(), + )))); + } else { + *this.sleep = Some(sleep); + } + } else { + return Poll::Ready(None); + } + } + + this.inner.poll_next(cx) + } +} + impl std::fmt::Debug for ObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ObjectStore")