Clean tracing, simplify validation, rename InputFormat -> VideoFormat
All checks were successful
continuous-integration/drone/push Build is passing

asonix 2022-10-01 21:17:18 -05:00
parent e7cf21f862
commit 718f09c43a
15 changed files with 107 additions and 168 deletions

View file

@ -1,6 +1,6 @@
use crate::{
error::Error,
ffmpeg::InputFormat,
ffmpeg::VideoFormat,
magick::{video_mp4, video_webm, ValidInputType},
serde_str::Serde,
store::Store,
@ -29,7 +29,6 @@ impl Details {
|| self.content_type.type_() == "image" && self.content_type.subtype() == "gif"
}
#[tracing::instrument("Details from bytes", skip(input))]
pub(crate) async fn from_bytes(input: web::Bytes, hint: ValidInputType) -> Result<Self, Error> {
let details = if hint.is_video() {
crate::ffmpeg::details_bytes(input.clone()).await?
@ -51,7 +50,6 @@ impl Details {
))
}
#[tracing::instrument("Details from store")]
pub(crate) async fn from_store<S: Store + 'static>(
store: S,
identifier: S::Identifier,
@ -100,17 +98,17 @@ impl Details {
self.created_at.into()
}
pub(crate) fn to_input_format(&self) -> Option<InputFormat> {
pub(crate) fn to_input_format(&self) -> Option<VideoFormat> {
if *self.content_type == mime::IMAGE_GIF {
return Some(InputFormat::Gif);
return Some(VideoFormat::Gif);
}
if *self.content_type == video_mp4() {
return Some(InputFormat::Mp4);
return Some(VideoFormat::Mp4);
}
if *self.content_type == video_webm() {
return Some(InputFormat::Webm);
return Some(VideoFormat::Webm);
}
None

View file

@ -2,7 +2,7 @@ use crate::process::Process;
use actix_web::web::Bytes;
use tokio::io::AsyncRead;
#[tracing::instrument(name = "Clearing metadata", skip(input))]
#[tracing::instrument(level = "trace", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?;
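The "clean tracing" part of this commit mostly drops explicit span names in favor of the function-name default and demotes per-call spans to trace level. A minimal sketch of how that attribute behaves, assuming only the `tracing` and `tracing-subscriber` crates; `clear_metadata` below is a stand-in function, not pict-rs code:

```rust
// Hedged sketch: without an explicit `name`, the span is named after the
// function, and `level = "trace"` keeps it out of the default INFO output.
#[tracing::instrument(level = "trace", skip(input))]
fn clear_metadata(input: Vec<u8>) -> usize {
    // Emitted inside the "clear_metadata" span, at TRACE level.
    tracing::trace!(len = input.len(), "stripping metadata");
    input.len()
}

fn main() {
    // Default fmt subscriber filters at INFO, so neither the span nor the
    // event above shows up unless the filter is lowered to TRACE.
    tracing_subscriber::fmt::init();
    clear_metadata(vec![0u8; 16]);
}
```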

View file

@ -1,5 +1,5 @@
use crate::{
config::{AudioCodec, VideoCodec},
config::{AudioCodec, ImageFormat, VideoCodec},
error::{Error, UploadError},
magick::{Details, ValidInputType},
process::Process,
@ -7,10 +7,9 @@ use crate::{
};
use actix_web::web::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::instrument;
#[derive(Clone, Copy, Debug)]
pub(crate) enum InputFormat {
pub(crate) enum VideoFormat {
Gif,
Mp4,
Webm,
@ -28,7 +27,26 @@ pub(crate) enum ThumbnailFormat {
// Webp,
}
impl InputFormat {
#[derive(Clone, Copy, Debug)]
pub(crate) enum FileFormat {
Image(ImageFormat),
Video(VideoFormat),
}
impl ValidInputType {
pub(crate) fn to_file_format(self) -> FileFormat {
match self {
Self::Gif => FileFormat::Video(VideoFormat::Gif),
Self::Mp4 => FileFormat::Video(VideoFormat::Mp4),
Self::Webm => FileFormat::Video(VideoFormat::Webm),
Self::Jpeg => FileFormat::Image(ImageFormat::Jpeg),
Self::Png => FileFormat::Image(ImageFormat::Png),
Self::Webp => FileFormat::Image(ImageFormat::Webp),
}
}
}
impl VideoFormat {
const fn to_file_extension(self) -> &'static str {
match self {
Self::Gif => ".gif",
@ -121,10 +139,10 @@ impl AudioCodec {
}
}
const FORMAT_MAPPINGS: &[(&str, InputFormat)] = &[
("gif", InputFormat::Gif),
("mp4", InputFormat::Mp4),
("webm", InputFormat::Webm),
const FORMAT_MAPPINGS: &[(&str, VideoFormat)] = &[
("gif", VideoFormat::Gif),
("mp4", VideoFormat::Mp4),
("webm", VideoFormat::Webm),
];
pub(crate) async fn input_type_bytes(input: Bytes) -> Result<Option<ValidInputType>, Error> {
@ -155,6 +173,7 @@ pub(crate) async fn details_bytes(input: Bytes) -> Result<Option<Details>, Error
.await
}
#[tracing::instrument(skip(f))]
async fn details_file<F, Fut>(f: F) -> Result<Option<Details>, Error>
where
F: FnOnce(crate::file::File) -> Fut,
@ -193,7 +212,7 @@ where
}
fn parse_details(output: std::borrow::Cow<'_, str>) -> Result<Option<Details>, Error> {
tracing::info!("OUTPUT: {}", output);
tracing::debug!("OUTPUT: {}", output);
let mut lines = output.lines();
@ -230,7 +249,7 @@ fn parse_details_inner(
width: &str,
height: &str,
frames: &str,
format: InputFormat,
format: VideoFormat,
) -> Result<Details, Error> {
let width = width.parse().map_err(|_| UploadError::UnsupportedFormat)?;
let height = height.parse().map_err(|_| UploadError::UnsupportedFormat)?;
@ -244,10 +263,10 @@ fn parse_details_inner(
})
}
#[tracing::instrument(name = "Transcode video", skip(input))]
#[tracing::instrument(skip(input))]
pub(crate) async fn trancsocde_bytes(
input: Bytes,
input_format: InputFormat,
input_format: VideoFormat,
permit_audio: bool,
video_codec: VideoCodec,
audio_codec: Option<AudioCodec>,
@ -318,11 +337,11 @@ pub(crate) async fn trancsocde_bytes(
Ok(Box::pin(clean_reader))
}
#[instrument(name = "Create video thumbnail")]
#[tracing::instrument]
pub(crate) async fn thumbnail<S: Store>(
store: S,
from: S::Identifier,
input_format: InputFormat,
input_format: VideoFormat,
format: ThumbnailFormat,
) -> Result<impl AsyncRead + Unpin, Error> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension()));
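The renamed FORMAT_MAPPINGS table pairs container names with VideoFormat variants. A hedged sketch of how such a table might be probed against ffprobe's comma-separated format_name output; the enum and `guess_format` below are local stand-ins, not the commit's parsing code:

```rust
// Stand-in types only; illustrates a name-to-variant lookup table.
#[derive(Clone, Copy, Debug)]
enum VideoFormat { Gif, Mp4, Webm }

const FORMAT_MAPPINGS: &[(&str, VideoFormat)] = &[
    ("gif", VideoFormat::Gif),
    ("mp4", VideoFormat::Mp4),
    ("webm", VideoFormat::Webm),
];

fn guess_format(ffprobe_formats: &str) -> Option<VideoFormat> {
    // ffprobe reports lists such as "matroska,webm" or "mov,mp4,m4a,3gp,3g2,mj2";
    // take the first mapping entry that appears in that list.
    FORMAT_MAPPINGS
        .iter()
        .find(|(name, _)| ffprobe_formats.split(',').any(|f| f == *name))
        .map(|(_, format)| *format)
}

fn main() {
    println!("{:?}", guess_format("matroska,webm"));
    println!("{:?}", guess_format("mov,mp4,m4a,3gp,3g2,mj2"));
}
```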

View file

@ -126,7 +126,7 @@ mod io_uring {
impl File {
pub(crate) async fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
tracing::info!("Opening io-uring file: {:?}", path.as_ref());
tracing::debug!("Opening io-uring file: {:?}", path.as_ref());
Ok(File {
path: path.as_ref().to_owned(),
inner: tracing::trace_span!(parent: None, "Open File")
@ -136,7 +136,7 @@ mod io_uring {
}
pub(crate) async fn create(path: impl AsRef<Path>) -> std::io::Result<Self> {
tracing::info!("Creating io-uring file: {:?}", path.as_ref());
tracing::debug!("Creating io-uring file: {:?}", path.as_ref());
Ok(File {
path: path.as_ref().to_owned(),
inner: tracing::trace_span!(parent: None, "Create File")

View file

@ -3,7 +3,7 @@ use crate::{
config::ImageFormat,
details::Details,
error::Error,
ffmpeg::{InputFormat, ThumbnailFormat},
ffmpeg::{ThumbnailFormat, VideoFormat},
repo::{Alias, FullRepo},
store::Store,
};
@ -21,7 +21,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
alias: Alias,
thumbnail_path: PathBuf,
thumbnail_args: Vec<String>,
input_format: Option<InputFormat>,
input_format: Option<VideoFormat>,
thumbnail_format: Option<ThumbnailFormat>,
hash: R::Bytes,
) -> Result<(Details, Bytes), Error> {
@ -52,7 +52,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
alias: Alias,
thumbnail_path: PathBuf,
thumbnail_args: Vec<String>,
input_format: Option<InputFormat>,
input_format: Option<VideoFormat>,
thumbnail_format: Option<ThumbnailFormat>,
hash: R::Bytes,
) -> Result<(Details, Bytes), Error> {
@ -68,7 +68,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
let reader = crate::ffmpeg::thumbnail(
store.clone(),
identifier,
input_format.unwrap_or(InputFormat::Mp4),
input_format.unwrap_or(VideoFormat::Mp4),
thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg),
)
.await?;

View file

@ -27,7 +27,7 @@ where
identifier: Option<S::Identifier>,
}
#[tracing::instrument(name = "Aggregate", skip(stream))]
#[tracing::instrument(skip(stream))]
async fn aggregate<S>(mut stream: S) -> Result<Bytes, Error>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
@ -41,7 +41,7 @@ where
Ok(buf.into_bytes())
}
#[tracing::instrument(name = "Ingest", skip(stream))]
#[tracing::instrument(skip(stream))]
pub(crate) async fn ingest<R, S>(
repo: &R,
store: &S,
@ -231,12 +231,13 @@ where
{
#[tracing::instrument(name = "Drop Session", skip(self), fields(hash = ?self.hash, alias = ?self.alias, identifier = ?self.identifier))]
fn drop(&mut self) {
let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
cleanup_parent_span.follows_from(Span::current());
if let Some(hash) = self.hash.take() {
let repo = self.repo.clone();
let cleanup_span =
tracing::info_span!(parent: None, "Session cleanup hash", hash = ?hash);
cleanup_span.follows_from(Span::current());
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
@ -251,9 +252,7 @@ where
if let Some(alias) = self.alias.take() {
let repo = self.repo.clone();
let cleanup_span =
tracing::info_span!(parent: None, "Session cleanup alias", alias = ?alias);
cleanup_span.follows_from(Span::current());
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup alias", alias = ?alias);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
@ -275,8 +274,7 @@ where
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: None, "Session cleanup identifier", identifier = ?identifier);
cleanup_span.follows_from(Span::current());
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
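The Drop handler now creates one detached parent span that follows_from the current span and parents each cleanup span to it, instead of giving every cleanup span its own follows_from link. A minimal sketch of that linkage, assuming the `tracing` and `tracing-subscriber` crates; `run_cleanup` and its fields are illustrative, not pict-rs code:

```rust
use tracing::Span;

fn run_cleanup(hash: &str, alias: &str) {
    // One detached parent span links back to the span that was current...
    let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
    cleanup_parent_span.follows_from(Span::current());

    // ...and each cleanup span hangs off that parent rather than carrying
    // its own follows_from edge.
    let hash_span =
        tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash);
    let alias_span =
        tracing::info_span!(parent: cleanup_parent_span, "Session cleanup alias", alias = ?alias);

    hash_span.in_scope(|| tracing::debug!("cleaning hash"));
    alias_span.in_scope(|| tracing::debug!("cleaning alias"));
}

fn main() {
    tracing_subscriber::fmt::init();
    run_cleanup("abc123", "example-alias");
}
```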

View file

@ -10,7 +10,6 @@ use tokio::{
io::{AsyncRead, AsyncReadExt},
process::Command,
};
use tracing::instrument;
pub(crate) fn details_hint(alias: &Alias) -> Option<ValidInputType> {
let ext = alias.extension()?;
@ -114,14 +113,7 @@ pub(crate) struct Details {
pub(crate) frames: Option<usize>,
}
#[tracing::instrument(name = "Clear Metadata", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::run("magick", &["convert", "-", "-strip", "-"])?;
Ok(process.bytes_read(input))
}
#[tracing::instrument(name = "Convert", skip(input))]
#[tracing::instrument(level = "debug", skip(input))]
pub(crate) fn convert_bytes_read(
input: Bytes,
format: ImageFormat,
@ -139,7 +131,7 @@ pub(crate) fn convert_bytes_read(
Ok(process.bytes_read(input))
}
#[instrument(name = "Getting details from input bytes", skip(input))]
#[tracing::instrument(skip(input))]
pub(crate) async fn details_bytes(
input: Bytes,
hint: Option<ValidInputType>,
@ -290,7 +282,6 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, Error> {
})
}
#[instrument(name = "Getting input type from bytes", skip(input))]
pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Error> {
details_bytes(input, None).await?.validate_input()
}
@ -308,7 +299,6 @@ fn process_image(args: Vec<String>, format: ImageFormat) -> std::io::Result<Proc
)
}
#[instrument(name = "Spawning process command")]
pub(crate) fn process_image_store_read<S: Store + 'static>(
store: S,
identifier: S::Identifier,
@ -318,7 +308,6 @@ pub(crate) fn process_image_store_read<S: Store + 'static>(
Ok(process_image(args, format)?.store_read(store, identifier))
}
#[instrument(name = "Spawning process command", skip(async_read))]
pub(crate) fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
async_read: A,
args: Vec<String>,
@ -328,7 +317,7 @@ pub(crate) fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
}
impl Details {
#[instrument(name = "Validating input type")]
#[tracing::instrument(level = "debug", name = "Validating input type")]
pub(crate) fn validate_input(&self) -> Result<ValidInputType, Error> {
if self.width > crate::CONFIG.media.max_width
|| self.height > crate::CONFIG.media.max_height

View file

@ -958,6 +958,7 @@ async fn aliases<R: FullRepo>(
})))
}
#[instrument(name = "Fetching identifier", skip(repo))]
async fn identifier<R: FullRepo, S: Store>(
query: web::Query<AliasQuery>,
repo: web::Data<R>,

View file

@ -42,13 +42,11 @@ pin_project_lite::pin_project! {
}
impl Process {
#[tracing::instrument]
pub(crate) fn run(command: &str, args: &[&str]) -> std::io::Result<Self> {
tracing::trace_span!(parent: None, "Create command")
.in_scope(|| Self::spawn(Command::new(command).args(args)))
}
#[tracing::instrument]
pub(crate) fn spawn(cmd: &mut Command) -> std::io::Result<Self> {
tracing::trace_span!(parent: None, "Spawn command").in_scope(|| {
let cmd = cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
@ -66,37 +64,34 @@ impl Process {
Ok(())
}
#[tracing::instrument(skip(input))]
pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin {
self.read_fn(move |mut stdin| {
self.spawn_fn(move |mut stdin| {
let mut input = input;
async move { stdin.write_all_buf(&mut input).await }
})
}
#[tracing::instrument]
pub(crate) fn read(self) -> impl AsyncRead + Unpin {
self.read_fn(|_| async { Ok(()) })
self.spawn_fn(|_| async { Ok(()) })
}
pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
self,
mut async_read: A,
) -> impl AsyncRead + Unpin {
self.read_fn(move |mut stdin| async move {
self.spawn_fn(move |mut stdin| async move {
tokio::io::copy(&mut async_read, &mut stdin)
.await
.map(|_| ())
})
}
#[tracing::instrument]
pub(crate) fn store_read<S: Store + 'static>(
self,
store: S,
identifier: S::Identifier,
) -> impl AsyncRead + Unpin {
self.read_fn(move |mut stdin| {
self.spawn_fn(move |mut stdin| {
let store = store;
let identifier = identifier;
@ -104,7 +99,8 @@ impl Process {
})
}
fn read_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin
#[tracing::instrument(skip(f))]
fn spawn_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin
where
F: FnOnce(ChildStdin) -> Fut + 'static,
Fut: Future<Output = std::io::Result<()>>,

View file

@ -1,6 +1,5 @@
use crate::error::{Error, UploadError};
use std::path::PathBuf;
use tracing::instrument;
pub(crate) trait Processor {
const NAME: &'static str;
@ -88,7 +87,7 @@ impl ResizeKind {
}
}
#[instrument]
#[tracing::instrument(level = "debug")]
pub(crate) fn build_chain(
args: &[(String, String)],
ext: &str,

View file

@ -606,7 +606,7 @@ where
Ok(())
}
#[tracing::instrument(skip(repo))]
#[tracing::instrument(level = "debug", skip(repo))]
async fn migrate_identifier_details<T>(
repo: &T,
old: &FileId,

View file

@ -152,7 +152,7 @@ fn insert_cache_inverse(
bucket.insert(alias_bytes.to_vec());
tracing::info!("Inserting new {:?}", bucket);
tracing::trace!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
@ -218,10 +218,10 @@ impl CachedRepo for SledRepo {
bucket.remove(&alias_bytes);
if bucket.is_empty() {
tracing::info!("Removed old {:?}", bucket);
tracing::trace!("Removed old {:?}", bucket);
None
} else {
tracing::info!("Inserting old {:?}", bucket);
tracing::trace!("Inserting old {:?}", bucket);
Some(serde_cbor::to_vec(&bucket))
}
})
@ -243,12 +243,12 @@ impl CachedRepo for SledRepo {
cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
{
if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
tracing::info!("Checking {}", datetime);
tracing::trace!("Checking {}", datetime);
} else {
tracing::warn!("Invalid date bytes");
}
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
tracing::info!("Read for deletion: {:?}", bucket);
tracing::trace!("Read for deletion: {:?}", bucket);
for alias_bytes in bucket {
// Best effort cleanup
let _ = cache.remove(&alias_bytes);
@ -266,7 +266,7 @@ impl CachedRepo for SledRepo {
#[cfg(debug)]
for date_bytes in cache_inverse.range(to_clean_bytes..).filter_map(Result::ok) {
if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
tracing::info!("Not cleaning for {}", datetime);
tracing::trace!("Not cleaning for {}", datetime);
} else {
tracing::warn!("Invalid date bytes");
}
@ -468,21 +468,21 @@ impl QueueRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo {
#[tracing::instrument(skip(value))]
#[tracing::instrument(level = "trace", skip(value))]
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> {
b!(self.settings, settings.insert(key, value));
Ok(())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error> {
let opt = b!(self.settings, settings.get(key));
Ok(opt)
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn remove(&self, key: &'static str) -> Result<(), Error> {
b!(self.settings, settings.remove(key));
@ -505,7 +505,7 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
#[async_trait::async_trait(?Send)]
impl IdentifierRepo for SledRepo {
#[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))]
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
@ -522,7 +522,7 @@ impl IdentifierRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))]
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error> {
let key = identifier.to_bytes()?;
@ -535,7 +535,7 @@ impl IdentifierRepo for SledRepo {
}
}
#[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))]
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error> {
let key = identifier.to_bytes()?;
@ -568,7 +568,7 @@ impl HashRepo for SledRepo {
Box::pin(from_iterator(iter, 8))
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error> {
let res = b!(self.hashes, {
let hash2 = hash.clone();
@ -578,7 +578,7 @@ impl HashRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
let key = hash_alias_key(&hash, alias);
let value = alias.to_bytes();
@ -588,7 +588,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
let key = hash_alias_key(&hash, alias);
@ -611,7 +611,7 @@ impl HashRepo for SledRepo {
Ok(v)
}
#[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
#[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
@ -624,7 +624,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
@ -633,7 +633,7 @@ impl HashRepo for SledRepo {
.and_then(|ivec| I::from_bytes(ivec.to_vec()))
}
#[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
#[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
@ -651,7 +651,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
@ -693,7 +693,7 @@ impl HashRepo for SledRepo {
Ok(vec)
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> {
let key = variant_key(&hash, &variant);
@ -705,7 +705,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
#[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
@ -721,7 +721,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
@ -785,7 +785,7 @@ impl HashRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
@ -798,7 +798,7 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_delete_token(
&self,
alias: &Alias,
@ -815,7 +815,7 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error> {
let key = alias.to_bytes();
@ -826,7 +826,7 @@ impl AliasRepo for SledRepo {
.map_err(Error::from)
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> {
let key = alias.to_bytes();
@ -835,7 +835,7 @@ impl AliasRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error> {
let key = alias.to_bytes();

View file

@ -13,7 +13,7 @@ use std::{
use storage_path_generator::Generator;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument, Instrument};
use tracing::Instrument;
mod file_id;
pub(crate) use file_id::FileId;
@ -211,7 +211,6 @@ impl FileStore {
}
// Try writing to a file
#[instrument(name = "Saving file", skip(bytes), fields(path = tracing::field::debug(&path.as_ref())))]
async fn safe_save_bytes<P: AsRef<Path>>(
&self,
path: P,
@ -220,7 +219,6 @@ impl FileStore {
safe_create_parent(&path).await?;
// Only write the file if it doesn't already exist
debug!("Checking if {:?} already exists", path.as_ref());
if let Err(e) = tokio::fs::metadata(&path).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
@ -230,23 +228,18 @@ impl FileStore {
}
// Open the file for writing
debug!("Creating {:?}", path.as_ref());
let mut file = File::create(&path).await?;
// try writing
debug!("Writing to {:?}", path.as_ref());
if let Err(e) = file.write_from_bytes(bytes).await {
error!("Error writing {:?}, {}", path.as_ref(), format!("{}", e));
// remove file if writing failed before completion
self.safe_remove_file(path).await?;
return Err(e.into());
}
debug!("{:?} written", path.as_ref());
Ok(())
}
#[instrument(skip(input), fields(to = tracing::field::debug(&to.as_ref())))]
async fn safe_save_reader<P: AsRef<Path>>(
&self,
to: P,
@ -254,7 +247,6 @@ impl FileStore {
) -> Result<(), FileError> {
safe_create_parent(&to).await?;
debug!("Checking if {:?} already exists", to.as_ref());
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
@ -263,8 +255,6 @@ impl FileStore {
return Err(FileError::FileExists);
}
debug!("Writing stream to {:?}", to.as_ref());
let mut file = File::create(to).await?;
file.write_from_async_read(input).await?;
@ -275,7 +265,6 @@ impl FileStore {
pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {
if let Some(path) = path.as_ref().parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
}

View file

@ -163,7 +163,6 @@ impl Store for ObjectStore {
type Identifier = ObjectId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin + 'static,

View file

@ -2,12 +2,11 @@ use crate::{
config::{AudioCodec, ImageFormat, VideoCodec},
either::Either,
error::{Error, UploadError},
ffmpeg::InputFormat,
ffmpeg::FileFormat,
magick::ValidInputType,
};
use actix_web::web::Bytes;
use tokio::io::AsyncRead;
use tracing::instrument;
struct UnvalidatedBytes {
bytes: Bytes,
@ -36,7 +35,7 @@ impl AsyncRead for UnvalidatedBytes {
}
}
#[instrument(name = "Validate media", skip(bytes))]
#[tracing::instrument(skip_all)]
pub(crate) async fn validate_bytes(
bytes: Bytes,
prescribed_format: Option<ImageFormat>,
@ -57,8 +56,8 @@ pub(crate) async fn validate_bytes(
return Ok((input_type, Either::left(UnvalidatedBytes::new(bytes))));
}
match (prescribed_format, input_type) {
(_, ValidInputType::Gif) => {
match (input_type.to_file_format(), prescribed_format) {
(FileFormat::Video(video_format), _) => {
if !(enable_silent_video || enable_full_video) {
return Err(UploadError::SilentVideoDisabled.into());
}
@ -67,25 +66,7 @@ pub(crate) async fn validate_bytes(
Either::right(Either::left(
crate::ffmpeg::trancsocde_bytes(
bytes,
InputFormat::Gif,
false,
video_codec,
audio_codec,
)
.await?,
)),
))
}
(_, ValidInputType::Mp4) => {
if !(enable_silent_video || enable_full_video) {
return Err(UploadError::SilentVideoDisabled.into());
}
Ok((
ValidInputType::from_video_codec(video_codec),
Either::right(Either::left(
crate::ffmpeg::trancsocde_bytes(
bytes,
InputFormat::Mp4,
video_format,
enable_full_video,
video_codec,
audio_codec,
@ -94,47 +75,17 @@ pub(crate) async fn validate_bytes(
)),
))
}
(_, ValidInputType::Webm) => {
if !(enable_silent_video || enable_full_video) {
return Err(UploadError::SilentVideoDisabled.into());
}
Ok((
ValidInputType::from_video_codec(video_codec),
Either::right(Either::left(
crate::ffmpeg::trancsocde_bytes(
bytes,
InputFormat::Webm,
enable_full_video,
video_codec,
audio_codec,
)
.await?,
)),
))
}
(Some(ImageFormat::Jpeg) | None, ValidInputType::Jpeg) => Ok((
ValidInputType::Jpeg,
Either::right(Either::right(Either::left(
crate::exiftool::clear_metadata_bytes_read(bytes)?,
))),
)),
(Some(ImageFormat::Png) | None, ValidInputType::Png) => Ok((
ValidInputType::Png,
Either::right(Either::right(Either::left(
crate::exiftool::clear_metadata_bytes_read(bytes)?,
))),
)),
(Some(ImageFormat::Webp) | None, ValidInputType::Webp) => Ok((
ValidInputType::Webp,
Either::right(Either::right(Either::right(Either::left(
crate::magick::clear_metadata_bytes_read(bytes)?,
)))),
)),
(Some(format), _) => Ok((
(FileFormat::Image(image_format), Some(format)) if image_format != format => Ok((
ValidInputType::from_format(format),
Either::right(Either::right(Either::right(Either::right(
Either::right(Either::right(Either::left(
crate::magick::convert_bytes_read(bytes, format)?,
)))),
))),
)),
(FileFormat::Image(image_format), _) => Ok((
ValidInputType::from_format(image_format),
Either::right(Either::right(Either::right(
crate::exiftool::clear_metadata_bytes_read(bytes)?,
))),
)),
}
}
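The validation rewrite collapses the per-container and per-image-format arms into a single match on the input's FileFormat plus the prescribed format. A self-contained sketch of that dispatch shape, using local stand-in enums rather than the crate::config and crate::ffmpeg types:

```rust
// Stand-in enums; the real ones live in crate::config and crate::ffmpeg.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ImageFormat { Jpeg, Png, Webp }

#[derive(Clone, Copy, Debug)]
enum VideoFormat { Gif, Mp4, Webm }

#[derive(Clone, Copy, Debug)]
enum FileFormat {
    Image(ImageFormat),
    Video(VideoFormat),
}

fn plan(input: FileFormat, prescribed: Option<ImageFormat>) -> String {
    match (input, prescribed) {
        // every video container goes through the same transcode path
        (FileFormat::Video(format), _) => format!("transcode {:?}", format),
        // convert only when the prescribed format differs from the input
        (FileFormat::Image(format), Some(target)) if format != target => {
            format!("convert {:?} -> {:?}", format, target)
        }
        // otherwise just strip metadata
        (FileFormat::Image(format), _) => format!("strip metadata from {:?}", format),
    }
}

fn main() {
    println!("{}", plan(FileFormat::Video(VideoFormat::Webm), None));
    println!("{}", plan(FileFormat::Image(ImageFormat::Png), Some(ImageFormat::Webp)));
    println!("{}", plan(FileFormat::Image(ImageFormat::Jpeg), Some(ImageFormat::Jpeg)));
}
```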