Use tmp files for mp4s

This commit is contained in:
Aode (lion) 2021-10-23 14:14:12 -05:00
parent 6f04595c3b
commit 26a2401027
9 changed files with 323 additions and 54 deletions

View file

@ -1,4 +1,8 @@
use crate::{error::Error, process::Process, store::Store};
use crate::{
error::{Error, UploadError},
process::Process,
store::Store,
};
use actix_web::web::Bytes;
use tokio::io::AsyncRead;
use tracing::instrument;
@ -16,10 +20,10 @@ pub(crate) enum ThumbnailFormat {
}
impl InputFormat {
fn as_format(&self) -> &'static str {
fn to_ext(&self) -> &'static str {
match self {
InputFormat::Gif => "gif_pipe",
InputFormat::Mp4 => "mp4",
InputFormat::Gif => ".gif",
InputFormat::Mp4 => ".mp4",
}
}
}
@ -32,6 +36,12 @@ impl ThumbnailFormat {
}
}
fn to_ext(&self) -> &'static str {
match self {
ThumbnailFormat::Jpeg => ".jpeg",
}
}
fn as_format(&self) -> &'static str {
match self {
ThumbnailFormat::Jpeg => "singlejpeg",
@ -40,19 +50,27 @@ impl ThumbnailFormat {
}
}
pub(crate) fn to_mp4_bytes(
pub(crate) async fn to_mp4_bytes(
input: Bytes,
input_format: InputFormat,
) -> std::io::Result<impl AsyncRead + Unpin> {
) -> Result<impl AsyncRead + Unpin, Error> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.to_ext()));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
let output_file = crate::tmp_file::tmp_file(Some(".mp4"));
let output_file_str = output_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&output_file).await?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one.write_from_bytes(input).await?;
tmp_one.close().await?;
let process = Process::run(
"ffmpeg",
&[
"-f",
input_format.as_format(),
"-i",
"pipe:",
"-movflags",
"faststart+frag_keyframe+empty_moov",
&input_file_str,
"-pix_fmt",
"yuv420p",
"-vf",
@ -62,11 +80,19 @@ pub(crate) fn to_mp4_bytes(
"h264",
"-f",
"mp4",
"pipe:",
&output_file_str,
],
)?;
Ok(process.bytes_read(input).unwrap())
process.wait().await?;
tokio::fs::remove_file(input_file).await?;
let tmp_two = crate::file::File::open(&output_file).await?;
let stream = tmp_two.read_to_stream(None, None).await?;
let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file);
Ok(Box::pin(clean_reader))
}
#[instrument(name = "Create video thumbnail")]
@ -75,23 +101,46 @@ pub(crate) async fn thumbnail<S: Store>(
from: S::Identifier,
input_format: InputFormat,
format: ThumbnailFormat,
) -> Result<impl AsyncRead + Unpin, Error> {
) -> Result<impl AsyncRead + Unpin, Error>
where
Error: From<S::Error>,
{
let input_file = crate::tmp_file::tmp_file(Some(input_format.to_ext()));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
let output_file = crate::tmp_file::tmp_file(Some(format.to_ext()));
let output_file_str = output_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&output_file).await?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one
.write_from_stream(store.to_stream(&from, None, None).await?)
.await?;
tmp_one.close().await?;
let process = Process::run(
"ffmpeg",
&[
"-f",
input_format.as_format(),
"-i",
"pipe:",
&input_file_str,
"-vframes",
"1",
"-codec",
format.as_codec(),
"-f",
format.as_format(),
"pipe:",
&output_file_str,
],
)?;
Ok(process.store_read(store, from).unwrap())
process.wait().await?;
tokio::fs::remove_file(input_file).await?;
let tmp_two = crate::file::File::open(&output_file).await?;
let stream = tmp_two.read_to_stream(None, None).await?;
let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file);
Ok(Box::pin(clean_reader))
}

View file

@ -8,8 +8,8 @@ pub(crate) use tokio_file::File;
mod tokio_file {
use crate::{store::file_store::FileError, Either};
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::Stream;
use std::{io::SeekFrom, path::Path};
use futures_util::stream::{Stream, StreamExt};
use std::{io::SeekFrom, path::Path, pin::Pin};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead};
@ -35,6 +35,22 @@ mod tokio_file {
Ok(())
}
pub(crate) async fn write_from_stream<S>(&mut self, mut stream: S) -> std::io::Result<()>
where
S: Stream<Item = std::io::Result<Bytes>>,
{
// SAFETY: pinned stream shadows original stream so it cannot be moved
let mut stream = unsafe { Pin::new_unchecked(&mut stream) };
while let Some(res) = stream.next().await {
let mut bytes = res?;
self.inner.write_all_buf(&mut bytes).await?;
}
Ok(())
}
pub(crate) async fn write_from_async_read<R>(
&mut self,
mut reader: R,
@ -46,6 +62,10 @@ mod tokio_file {
Ok(())
}
pub(crate) async fn close(self) -> std::io::Result<()> {
Ok(())
}
pub(crate) async fn read_to_async_write<W>(&mut self, writer: &mut W) -> std::io::Result<()>
where
W: AsyncWrite + Unpin + ?Sized,
@ -112,7 +132,7 @@ mod tokio_file {
mod io_uring {
use crate::store::file_store::FileError;
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use futures_util::stream::{Stream, StreamExt};
use std::{
convert::TryInto,
fs::Metadata,
@ -182,6 +202,50 @@ mod io_uring {
Ok(())
}
pub(crate) async fn write_from_stream<S>(&mut self, mut stream: S) -> std::io::Result<()>
where
S: Stream<Item = std::io::Result<Bytes>>,
{
// SAFETY: pinned stream shadows original stream so it cannot be moved
let mut stream = unsafe { Pin::new_unchecked(&mut stream) };
let mut cursor: u64 = 0;
while let Some(res) = stream.next().await {
let bytes = res?;
let mut buf = bytes.to_vec();
let len = buf.len();
let mut position = 0;
loop {
if position == len {
break;
}
let position_u64: u64 = position.try_into().unwrap();
let (res, slice) = self
.write_at(buf.slice(position..len), cursor + position_u64)
.await;
let n = res?;
if n == 0 {
return Err(std::io::ErrorKind::UnexpectedEof.into());
}
position += n;
buf = slice.into_inner();
}
let position: u64 = position.try_into().unwrap();
cursor += position;
}
self.inner.sync_all().await?;
Ok(())
}
pub(crate) async fn write_from_async_read<R>(
&mut self,
mut reader: R,
@ -232,6 +296,10 @@ mod io_uring {
Ok(())
}
pub(crate) async fn close(self) -> std::io::Result<()> {
self.inner.close().await
}
pub(crate) async fn read_to_async_write<W>(&mut self, writer: &mut W) -> std::io::Result<()>
where
W: AsyncWrite + Unpin + ?Sized,

View file

@ -57,6 +57,10 @@ impl ValidInputType {
}
}
fn is_mp4(&self) -> bool {
matches!(self, Self::Mp4)
}
pub(crate) fn from_format(format: Format) -> Self {
match format {
Format::Jpeg => ValidInputType::Jpeg,
@ -79,11 +83,40 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl As
Ok(process.bytes_read(input).unwrap())
}
pub(crate) fn convert_bytes_read(
input: Bytes,
format: Format,
) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::run(
"magick",
&[
"convert",
"-",
"-strip",
format!("{}:-", format.to_magick_format()).as_str(),
],
)?;
Ok(process.bytes_read(input).unwrap())
}
#[instrument(name = "Getting details from input bytes", skip(input))]
pub(crate) async fn details_bytes(
input: Bytes,
hint: Option<ValidInputType>,
) -> Result<Details, Error> {
if hint.as_ref().map(|h| h.is_mp4()).unwrap_or(false) {
let input_file = crate::tmp_file::tmp_file(Some(".mp4"));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one.write_from_bytes(input).await?;
tmp_one.close().await?;
return details_file(input_file_str).await;
}
let last_arg = if let Some(expected_format) = hint {
format!("{}:-", expected_format.to_str())
} else {
@ -104,29 +137,29 @@ pub(crate) async fn details_bytes(
parse_details(s)
}
pub(crate) fn convert_bytes_read(
input: Bytes,
format: Format,
) -> std::io::Result<impl AsyncRead + Unpin> {
let process = Process::run(
"magick",
&[
"convert",
"-",
"-strip",
format!("{}:-", format.to_magick_format()).as_str(),
],
)?;
Ok(process.bytes_read(input).unwrap())
}
pub(crate) async fn details_store<S: Store>(
store: S,
identifier: S::Identifier,
expected_format: Option<ValidInputType>,
) -> Result<Details, Error> {
let last_arg = if let Some(expected_format) = expected_format {
hint: Option<ValidInputType>,
) -> Result<Details, Error>
where
Error: From<S::Error>,
{
if hint.as_ref().map(|h| h.is_mp4()).unwrap_or(false) {
let input_file = crate::tmp_file::tmp_file(Some(".mp4"));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one
.write_from_stream(store.to_stream(&identifier, None, None).await?)
.await?;
tmp_one.close().await?;
return details_file(input_file_str).await;
}
let last_arg = if let Some(expected_format) = hint {
format!("{}:-", expected_format.to_str())
} else {
"-".to_owned()
@ -147,6 +180,23 @@ pub(crate) async fn details_store<S: Store>(
parse_details(s)
}
pub(crate) async fn details_file(path_str: &str) -> Result<Details, Error> {
let process = Process::run(
"magick",
&["identify", "-ping", "-format", "%w %h | %m\n", &path_str],
)?;
let mut reader = process.read().unwrap();
let mut output = Vec::new();
reader.read_to_end(&mut output).await?;
tokio::fs::remove_file(path_str).await?;
let s = String::from_utf8_lossy(&output);
parse_details(s)
}
fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, Error> {
let mut lines = s.lines();
let first = lines.next().ok_or(UploadError::UnsupportedFormat)?;

View file

@ -28,6 +28,7 @@ mod either;
mod error;
mod exiftool;
mod ffmpeg;
mod file;
mod init_tracing;
mod magick;
mod map_error;
@ -37,6 +38,7 @@ mod process;
mod processor;
mod range;
mod store;
mod tmp_file;
mod upload_manager;
mod validate;
@ -821,6 +823,8 @@ where
.run()
.await?;
crate::tmp_file::remove_tmp_dir().await?;
Ok(())
}

View file

@ -63,6 +63,14 @@ impl Process {
cmd.spawn().map(|child| Process { child, span })
}
pub(crate) async fn wait(mut self) -> std::io::Result<()> {
let status = self.child.wait().await?;
if !status.success() {
return Err(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
Ok(())
}
pub(crate) fn bytes_read(mut self, mut input: Bytes) -> Option<impl AsyncRead + Unpin> {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
@ -103,6 +111,39 @@ impl Process {
})
}
pub(crate) fn read(mut self) -> Option<impl AsyncRead + Unpin> {
let stdout = self.child.stdout.take()?;
let (tx, rx) = channel();
let span = self.spawn_span();
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx
.send(std::io::Error::new(std::io::ErrorKind::Other, &StatusError));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
}
.instrument(span),
);
Some(ProcessRead {
inner: stdout,
span: self.span,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
})
}
pub(crate) fn store_read<S: Store>(
mut self,
store: S,

View file

@ -1,4 +1,4 @@
use crate::store::Store;
use crate::{file::File, store::Store};
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::{
@ -10,10 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, error, instrument};
use uuid::Uuid;
mod file;
mod file_id;
mod restructure;
use file::File;
pub(crate) use file_id::FileId;
// - Settings Tree

58
src/tmp_file.rs Normal file
View file

@ -0,0 +1,58 @@
use once_cell::sync::Lazy;
use std::path::PathBuf;
use tokio::io::AsyncRead;
use uuid::Uuid;
static TMP_DIR: Lazy<PathBuf> = Lazy::new(|| {
let dir = std::env::temp_dir().join(Uuid::new_v4().to_string());
std::fs::create_dir(&dir).unwrap();
dir
});
struct TmpFile(PathBuf);
impl Drop for TmpFile {
fn drop(&mut self) {
actix_rt::spawn(tokio::fs::remove_file(self.0.clone()));
}
}
pin_project_lite::pin_project! {
pub(crate) struct TmpFileCleanup<R> {
#[pin]
inner: R,
file: TmpFile,
}
}
pub(crate) fn tmp_file(ext: Option<&str>) -> PathBuf {
if let Some(ext) = ext {
TMP_DIR.join(format!("{}{}", Uuid::new_v4(), ext))
} else {
TMP_DIR.join(Uuid::new_v4().to_string())
}
}
pub(crate) async fn remove_tmp_dir() -> std::io::Result<()> {
tokio::fs::remove_dir_all(&*TMP_DIR).await
}
pub(crate) fn cleanup_tmpfile<R: AsyncRead>(reader: R, file: PathBuf) -> TmpFileCleanup<R> {
TmpFileCleanup {
inner: reader,
file: TmpFile(file),
}
}
impl<R: AsyncRead> AsyncRead for TmpFileCleanup<R> {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.as_mut().project();
this.inner.poll_read(cx, buf)
}
}

View file

@ -585,7 +585,10 @@ impl Details {
store: S,
identifier: S::Identifier,
expected_format: Option<ValidInputType>,
) -> Result<Self, Error> {
) -> Result<Self, Error>
where
Error: From<S::Error>,
{
let details = crate::magick::details_store(store, identifier, expected_format).await?;
Ok(Details::now(

View file

@ -47,17 +47,15 @@ pub(crate) async fn validate_image_bytes(
match (prescribed_format, input_type) {
(_, ValidInputType::Gif) => Ok((
ValidInputType::Mp4,
Either::right(Either::left(crate::ffmpeg::to_mp4_bytes(
bytes,
InputFormat::Gif,
)?)),
Either::right(Either::left(
crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Gif).await?,
)),
)),
(_, ValidInputType::Mp4) => Ok((
ValidInputType::Mp4,
Either::right(Either::left(crate::ffmpeg::to_mp4_bytes(
bytes,
InputFormat::Mp4,
)?)),
Either::right(Either::left(
crate::ffmpeg::to_mp4_bytes(bytes, InputFormat::Mp4).await?,
)),
)),
(Some(Format::Jpeg) | None, ValidInputType::Jpeg) => Ok((
ValidInputType::Jpeg,