diff --git a/Cargo.lock b/Cargo.lock index 45ee660..8ceaad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,20 +34,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "actix-fs" -version = "0.1.0" -source = "git+https://git.asonix.dog/asonix/actix-fs?branch=asonix/actix-rt-2#aef0e3c557b1365c0c1039bedd321724cd09201f" -dependencies = [ - "actix-rt", - "bytes", - "futures-util", - "log", - "thiserror", - "tokio", - "tokio-stream", -] - [[package]] name = "actix-http" version = "3.0.0-beta.9" @@ -292,27 +278,6 @@ version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" -[[package]] -name = "async-stream" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" -dependencies = [ - "async-stream-impl", - "futures-core", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atty" version = "0.2.14" @@ -998,16 +963,13 @@ name = "pict-rs" version = "0.3.0-alpha.23" dependencies = [ "actix-form-data", - "actix-fs", "actix-rt", "actix-web", "anyhow", - "async-stream", "awc", "base64", - "futures", + "futures-core", "mime", - "num_cpus", "once_cell", "rand", "serde", @@ -1018,7 +980,6 @@ dependencies = [ "thiserror", "time 0.3.2", "tokio", - "tokio-stream", "tokio-util", "tracing", "tracing-futures", @@ -1477,9 +1438,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7f58f7e8eaa0009c5fec437aabf511bd9933e4b2d7407bd05273c01a8906ea7" +checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84" dependencies = [ "proc-macro2", "quote", @@ -1602,37 +1563,14 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "tokio-macros", "winapi", ] -[[package]] -name = "tokio-macros" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tokio-stream" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" +checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" dependencies = [ "bytes", "futures-core", @@ -1739,9 +1677,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.13.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" +checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" [[package]] name = "ucd-trie" diff --git a/Cargo.toml b/Cargo.toml index d7a629b..8f1b70e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,16 +12,13 @@ edition = "2018" [dependencies] actix-form-data = "0.6.0-beta.1" -actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs", branch = "asonix/actix-rt-2" } actix-rt = "2.2.0" actix-web = { version = "4.0.0-beta.8", default-features = false } anyhow = "1.0" -async-stream = "0.3.0" awc = { version = "3.0.0-beta.7", default-features = false } base64 = "0.13.0" -futures = "0.3.4" +futures-core = "0.3.17" mime = "0.3.1" -num_cpus = "1" once_cell = "1.4.0" rand = "0.8.0" serde = { version = "1.0", features = ["derive"] } @@ -31,8 +28,7 @@ sled = { version = "0.34.6" } structopt = "0.3.14" thiserror = "1.0" time = { version = "0.3.0", features = ["serde"] } -tokio = { version = "1", default-features = false, features = ["fs", "io-util", "macros", "process", "sync"] } -tokio-stream = { version = "0.1", default-features = false } +tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] } tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tracing = "0.1.15" tracing-futures = "0.2.4" diff --git a/src/error.rs b/src/error.rs index 7b80d52..eee7e05 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,9 +18,6 @@ pub(crate) enum UploadError { #[error("Error interacting with filesystem, {0}")] Io(#[from] std::io::Error), - #[error("Error in filesyste, {0}")] - Fs(#[from] actix_fs::Error), - #[error("Panic in blocking operation")] Canceled, diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index fd1d915..f1f873b 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -9,9 +9,6 @@ pub(crate) enum VideoError { #[error("Failed to convert file")] Status, - - #[error("Transcode semaphore is closed")] - Closed, } pub(crate) enum InputFormat { @@ -49,14 +46,6 @@ impl ThumbnailFormat { } } -static MAX_TRANSCODES: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); - -fn semaphore() -> &'static tokio::sync::Semaphore { - MAX_TRANSCODES - .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) -} - pub(crate) fn to_mp4_bytes( input: Bytes, input_format: InputFormat, @@ -92,8 +81,6 @@ where P1: AsRef, P2: AsRef, { - let permit = semaphore().acquire().await?; - let mut child = Command::new("ffmpeg") .arg(&"-i") .arg(&from.as_ref()) @@ -109,7 +96,6 @@ where .spawn()?; let status = child.wait().await?; - drop(permit); if !status.success() { return Err(VideoError::Status); @@ -117,9 +103,3 @@ where Ok(()) } - -impl From for VideoError { - fn from(_: tokio::sync::AcquireError) -> VideoError { - VideoError::Closed - } -} diff --git a/src/magick.rs b/src/magick.rs index 99fe8d6..b968067 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -11,9 +11,6 @@ pub(crate) enum MagickError { #[error("{0}")] IO(#[from] std::io::Error), - #[error("Magick semaphore is closed")] - Closed, - #[error("Invalid format")] Format, } @@ -32,14 +29,6 @@ pub(crate) struct Details { pub(crate) height: usize, } -static MAX_CONVERSIONS: once_cell::sync::OnceCell = - once_cell::sync::OnceCell::new(); - -fn semaphore() -> &'static tokio::sync::Semaphore { - MAX_CONVERSIONS - .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) -} - pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result { let process = Process::spawn(Command::new("magick").args(["convert", "-", "-strip", "-"]))?; @@ -81,16 +70,12 @@ pub(crate) async fn details

(file: P) -> Result where P: AsRef, { - let permit = semaphore().acquire().await?; - let output = Command::new("magick") .args([&"identify", &"-ping", &"-format", &"%w %h | %m\n"]) .arg(&file.as_ref()) .output() .await?; - drop(permit); - let s = String::from_utf8_lossy(&output.stdout); parse_details(s) @@ -140,8 +125,6 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result { } pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result { - let permit = semaphore().acquire().await.map_err(MagickError::from)?; - let mut child = Command::new("magick") .args(["identify", "-ping", "-format", "%m\n", "-"]) .stdin(Stdio::piped()) @@ -152,13 +135,13 @@ pub(crate) async fn input_type_bytes(mut input: Bytes) -> Result for MagickError { - fn from(_: tokio::sync::AcquireError) -> MagickError { - MagickError::Closed - } -} - impl From for MagickError { fn from(_: std::num::ParseIntError) -> MagickError { MagickError::Format diff --git a/src/main.rs b/src/main.rs index f9d7271..dd55389 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,11 @@ use actix_web::{ web, App, HttpResponse, HttpResponseBuilder, HttpServer, }; use awc::Client; -use futures::stream::{Stream, TryStreamExt}; +use futures_core::stream::Stream; use once_cell::sync::Lazy; -use std::{collections::HashSet, path::PathBuf, pin::Pin, time::SystemTime}; +use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime}; use structopt::StructOpt; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{debug, error, info, instrument, Span}; use tracing_subscriber::EnvFilter; @@ -31,6 +31,7 @@ use self::{ config::{Config, Format}, error::UploadError, middleware::{Internal, Tracing}, + stream::{once, LocalBoxStream}, upload_manager::{Details, UploadManager}, validate::{image_webp, video_mp4}, }; @@ -64,12 +65,12 @@ static CONFIG: Lazy = Lazy::new(Config::from_args); async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + tokio::fs::create_dir_all(path).await?; } debug!("Checking if {:?} already exists", to); - if let Err(e) = actix_fs::metadata(to.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + if let Err(e) = tokio::fs::metadata(&to).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -77,8 +78,8 @@ async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { } debug!("Moving {:?} to {:?}", from, to); - actix_fs::copy(from.clone(), to).await?; - actix_fs::remove_file(from).await?; + tokio::fs::copy(&from, to).await?; + tokio::fs::remove_file(from).await?; Ok(()) } @@ -88,7 +89,7 @@ where { if let Some(path) = path.as_ref().parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + tokio::fs::create_dir_all(path).await?; } Ok(()) @@ -96,17 +97,17 @@ where // Try writing to a file #[instrument(skip(bytes))] -async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), UploadError> { +async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), UploadError> { if let Some(path) = path.parent() { // create the directory for the file debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + tokio::fs::create_dir_all(path).await?; } // Only write the file if it doesn't already exist debug!("Checking if {:?} already exists", path); - if let Err(e) = actix_fs::metadata(path.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + if let Err(e) = tokio::fs::metadata(&path).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -115,14 +116,14 @@ async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), UploadEr // Open the file for writing debug!("Creating {:?}", path); - let file = actix_fs::file::create(path.clone()).await?; + let mut file = tokio::fs::File::create(&path).await?; // try writing debug!("Writing to {:?}", path); - if let Err(e) = actix_fs::file::write(file, bytes).await { + if let Err(e) = file.write_all_buf(&mut bytes).await { error!("Error writing {:?}, {}", path, e); // remove file if writing failed before completion - actix_fs::remove_file(path).await?; + tokio::fs::remove_file(path).await?; return Err(e.into()); } debug!("{:?} written", path); @@ -240,7 +241,7 @@ async fn download( let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES); - let stream = Box::pin(futures::stream::once(fut)); + let stream = Box::pin(once(fut)); let alias = manager.upload(stream).await?; let delete_token = manager.delete_token(alias.clone()).await?; @@ -362,8 +363,8 @@ async fn process( prepare_process(query, ext.as_str(), &manager, &whitelist).await?; // If the thumbnail doesn't exist, we need to create it - let thumbnail_exists = if let Err(e) = actix_fs::metadata(thumbnail_path.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + let thumbnail_exists = if let Err(e) = tokio::fs::metadata(&thumbnail_path).await { + if e.kind() != std::io::ErrorKind::NotFound { error!("Error looking up processed image, {}", e); return Err(e.into()); } @@ -442,7 +443,7 @@ async fn process( return Ok(srv_response( HttpResponse::Ok(), - futures::stream::once(futures::future::ready(Ok(bytes) as Result<_, UploadError>)), + once(ready(Ok(bytes) as Result<_, UploadError>)), details.content_type(), 7 * DAYS, details.system_time(), @@ -527,9 +528,9 @@ async fn ranged_file_resp( if range_header.is_empty() { return Err(UploadError::Range); } else if range_header.len() == 1 { - let file = actix_fs::file::open(path).await?; + let file = tokio::fs::File::open(path).await?; - let (file, meta) = actix_fs::file::metadata(file).await?; + let meta = file.metadata().await?; let range = range_header.ranges().next().unwrap(); @@ -543,12 +544,8 @@ async fn ranged_file_resp( } //No Range header in the request - return the entire document None => { - let stream = actix_fs::read_to_stream(path) - .await? - .faster() - .map_err(UploadError::from); - let stream: Pin>>> = - Box::pin(stream); + let file = tokio::fs::File::open(path).await?; + let stream = Box::pin(crate::stream::bytes_stream(file)) as LocalBoxStream<'_, _>; (HttpResponse::Ok(), stream) } }; @@ -774,8 +771,8 @@ async fn main() -> Result<(), anyhow::Error> { .run() .await?; - if actix_fs::metadata(&*TMP_DIR).await.is_ok() { - actix_fs::remove_dir_all(&*TMP_DIR).await?; + if tokio::fs::metadata(&*TMP_DIR).await.is_ok() { + tokio::fs::remove_dir_all(&*TMP_DIR).await?; } Ok(()) diff --git a/src/middleware.rs b/src/middleware.rs index 7b44b59..2e98890 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,10 +1,13 @@ +use crate::stream::LocalBoxFuture; use actix_web::{ dev::{Service, ServiceRequest, Transform}, http::StatusCode, HttpResponse, ResponseError, }; -use futures::future::{ok, LocalBoxFuture, Ready}; -use std::task::{Context, Poll}; +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; use tracing_futures::{Instrument, Instrumented}; use uuid::Uuid; @@ -47,7 +50,7 @@ where type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { - ok(TracingMiddleware { inner: service }) + ready(Ok(TracingMiddleware { inner: service })) } } @@ -85,7 +88,7 @@ where type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { - ok(InternalMiddleware(self.0.clone(), service)) + ready(Ok(InternalMiddleware(self.0.clone(), service))) } } diff --git a/src/processor.rs b/src/processor.rs index f276670..f52264f 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -302,7 +302,7 @@ pub(crate) async fn prepare_image( let jpg_path = format!("{}.jpg", original_path_str); let jpg_path = PathBuf::from(jpg_path); - if actix_fs::metadata(jpg_path.clone()).await.is_ok() { + if tokio::fs::metadata(&jpg_path).await.is_ok() { return Ok(Some((jpg_path, Exists::Exists))); } @@ -316,7 +316,7 @@ pub(crate) async fn prepare_image( if let Err(e) = res { error!("transcode error: {:?}", e); - actix_fs::remove_file(tmpfile.clone()).await?; + tokio::fs::remove_file(&tmpfile).await?; return Err(e.into()); } diff --git a/src/range.rs b/src/range.rs index 5602c03..49ea67b 100644 --- a/src/range.rs +++ b/src/range.rs @@ -1,4 +1,7 @@ -use crate::UploadError; +use crate::{ + stream::{bytes_stream, LocalBoxStream}, + UploadError, +}; use actix_web::{ dev::Payload, http::{ @@ -8,9 +11,8 @@ use actix_web::{ web::Bytes, FromRequest, HttpRequest, }; -use futures::stream::{Stream, StreamExt, TryStreamExt}; -use std::{fs, io}; -use std::{future::ready, pin::Pin}; +use std::{future::ready, io}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; #[derive(Debug)] pub(crate) enum Range { @@ -45,32 +47,25 @@ impl Range { pub(crate) async fn chop_file( &self, - file: fs::File, - ) -> Result>>>, UploadError> { + mut file: tokio::fs::File, + ) -> Result>, UploadError> { match self { Range::RangeStart(start) => { - let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(*start)).await?; + file.seek(io::SeekFrom::Start(*start)).await?; - Ok(Box::pin( - actix_fs::file::read_to_stream(file) - .await? - .faster() - .map_err(UploadError::from), - )) + Ok(Box::pin(bytes_stream(file))) } Range::SuffixLength(from_start) => { - let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(0)).await?; + file.seek(io::SeekFrom::Start(0)).await?; + let reader = file.take(*from_start); - Ok(Box::pin( - read_num_bytes_to_stream(file, *from_start as usize).await?, - )) + Ok(Box::pin(bytes_stream(reader))) } Range::Segment(start, end) => { - let (file, _) = actix_fs::file::seek(file, io::SeekFrom::Start(*start)).await?; + file.seek(io::SeekFrom::Start(*start)).await?; + let reader = file.take(end.saturating_sub(*start)); - Ok(Box::pin( - read_num_bytes_to_stream(file, end.saturating_sub(*start) as usize).await?, - )) + Ok(Box::pin(bytes_stream(reader))) } } } @@ -175,32 +170,3 @@ fn parse_range(s: &str) -> Result { Ok(Range::Segment(range_start, range_end)) } } - -async fn read_num_bytes_to_stream( - file: fs::File, - mut num_bytes: usize, -) -> Result>, UploadError> { - let mut stream = actix_fs::file::read_to_stream(file).await?; - - let stream = async_stream::stream! { - while let Some(res) = stream.next().await { - let read_bytes = res.as_ref().map(|b| b.len()).unwrap_or(0); - - if read_bytes == 0 { - break; - } - - yield res.map_err(UploadError::from).map(|bytes| { - if bytes.len() > num_bytes { - bytes.slice(0..num_bytes) - } else { - bytes - } - }); - - num_bytes = num_bytes.saturating_sub(read_bytes); - } - }; - - Ok(stream) -} diff --git a/src/stream.rs b/src/stream.rs index 99efdb3..a145e47 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,5 +1,6 @@ -use actix_web::web::Bytes; -use futures::stream::{LocalBoxStream, Stream}; +use crate::error::UploadError; +use actix_web::web::{Bytes, BytesMut}; +use futures_core::stream::Stream; use std::{ future::Future, pin::Pin, @@ -7,9 +8,8 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; -pub(crate) struct ReadAdapter { - inner: S, -} +pub(crate) type LocalBoxStream<'a, T> = Pin + 'a>>; +pub(crate) type LocalBoxFuture<'a, T> = Pin + 'a>>; pub(crate) struct Process { child: tokio::process::Child, @@ -21,10 +21,14 @@ pub(crate) struct ProcessRead { err_closed: bool, } -pub(crate) struct ProcessSinkStream { - stream: LocalBoxStream<'static, Result>, +struct BytesFreezer(S); + +pub(crate) struct Once { + inner: Option, } +pub(crate) struct Next<'a, S>(&'a mut S); + impl Process { fn new(child: tokio::process::Child) -> Self { Process { child } @@ -79,28 +83,21 @@ impl Process { } } -impl AsyncRead for ReadAdapter -where - S: Stream> + Unpin, - E: Into>, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(Ok(bytes))) => { - buf.put_slice(&bytes); - Poll::Ready(Ok(())) - } - Poll::Ready(None) => Poll::Ready(Ok(())), - Poll::Ready(Some(Err(e))) => { - Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))) - } - Poll::Pending => Poll::Pending, - } - } +pub(crate) fn bytes_stream( + input: impl AsyncRead + Unpin, +) -> impl Stream> + Unpin { + BytesFreezer(tokio_util::codec::FramedRead::new( + input, + tokio_util::codec::BytesCodec::new(), + )) +} + +pub(crate) fn once(input: T) -> Once { + Once { inner: Some(input) } +} + +pub(crate) fn next<'a, S>(stream: &'a mut S) -> Next<'a, S> { + Next(stream) } impl AsyncRead for ProcessRead @@ -129,10 +126,48 @@ where } } -impl Stream for ProcessSinkStream { - type Item = Result; +impl Stream for BytesFreezer +where + S: Stream> + Unpin, +{ + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) + Pin::new(&mut self.0) + .poll_next(cx) + .map(|opt| opt.map(|res| res.map(|bytes_mut| bytes_mut.freeze()))) + .map_err(UploadError::from) + } +} + +impl Stream for Once +where + T: Future + Unpin, +{ + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(mut fut) = self.inner.take() { + match Pin::new(&mut fut).poll(cx) { + Poll::Ready(item) => Poll::Ready(Some(item)), + Poll::Pending => { + self.inner = Some(fut); + Poll::Pending + } + } + } else { + Poll::Ready(None) + } + } +} + +impl<'a, S> Future for Next<'a, S> +where + S: Stream + Unpin, +{ + type Output = Option<::Item>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll_next(cx) } } diff --git a/src/upload_manager.rs b/src/upload_manager.rs index f7a66fd..7168283 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -2,10 +2,10 @@ use crate::{ config::Format, error::UploadError, migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb}, + stream::{next, LocalBoxStream}, to_ext, }; use actix_web::web; -use futures::stream::{Stream, StreamExt, TryStreamExt}; use sha2::Digest; use std::{ path::PathBuf, @@ -13,7 +13,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, ReadBuf}; +use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf}; use tracing::{debug, error, info, instrument, warn, Span}; // TREE STRUCTURE @@ -92,7 +92,7 @@ impl std::fmt::Debug for UploadManager { } } -type UploadStream = Pin>>>; +type UploadStream = LocalBoxStream<'static, Result>; #[derive(Clone)] pub(crate) struct Serde { @@ -247,7 +247,7 @@ impl UploadManager { root_dir.push("files"); // Ensure file dir exists - actix_fs::create_dir_all(root_dir.clone()).await?; + tokio::fs::create_dir_all(&root_dir).await?; Ok(UploadManager { inner: Arc::new(UploadManagerInner { @@ -547,7 +547,7 @@ impl UploadManager { let mut bytes_mut = actix_web::web::BytesMut::new(); debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { + while let Some(res) = next(&mut stream).await { let bytes = res?; bytes_mut.extend_from_slice(&bytes); } @@ -582,7 +582,7 @@ impl UploadManager { let mut bytes_mut = actix_web::web::BytesMut::new(); debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { + while let Some(res) = next(&mut stream).await { let bytes = res?; bytes_mut.extend_from_slice(&bytes); } @@ -638,7 +638,7 @@ impl UploadManager { let mut errors = Vec::new(); debug!("Deleting {:?}", path); - if let Err(e) = actix_fs::remove_file(path).await { + if let Err(e) = tokio::fs::remove_file(path).await { errors.push(e.into()); } @@ -767,8 +767,8 @@ impl UploadManager { path.push(filename.clone()); - if let Err(e) = actix_fs::metadata(path).await { - if e.kind() == Some(std::io::ErrorKind::NotFound) { + if let Err(e) = tokio::fs::metadata(path).await { + if e.kind() == std::io::ErrorKind::NotFound { debug!("Generated unused filename {}", filename); return Ok(filename); } @@ -904,12 +904,12 @@ pub(crate) async fn safe_save_reader( ) -> Result<(), UploadError> { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + tokio::fs::create_dir_all(path.to_owned()).await?; } debug!("Checking if {:?} already exists", to); - if let Err(e) = actix_fs::metadata(to.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + if let Err(e) = tokio::fs::metadata(to.clone()).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -928,7 +928,7 @@ pub(crate) async fn safe_save_reader( #[instrument(skip(stream))] pub(crate) async fn safe_save_stream( to: PathBuf, - stream: UploadStream, + mut stream: UploadStream, ) -> Result<(), UploadError> where UploadError: From, @@ -936,12 +936,12 @@ where { if let Some(path) = to.parent() { debug!("Creating directory {:?}", path); - actix_fs::create_dir_all(path.to_owned()).await?; + tokio::fs::create_dir_all(path).await?; } debug!("Checking if {:?} already exists", to); - if let Err(e) = actix_fs::metadata(to.clone()).await { - if e.kind() != Some(std::io::ErrorKind::NotFound) { + if let Err(e) = tokio::fs::metadata(&to).await { + if e.kind() != std::io::ErrorKind::NotFound { return Err(e.into()); } } else { @@ -950,16 +950,30 @@ where debug!("Writing stream to {:?}", to); - let file = actix_fs::file::create(to).await?; + let to1 = to.clone(); + let fut = async move { + let mut file = tokio::fs::File::create(to1).await?; - actix_fs::file::write_stream_faster(file, stream.map_err(UploadError::from)).await?; + while let Some(res) = next(&mut stream).await { + let mut bytes = res?; + file.write_all_buf(&mut bytes).await?; + } + + Ok(()) + }; + + if let Err(e) = fut.await { + error!("Failed to save file: {}", e); + let _ = tokio::fs::remove_file(to).await; + return Err(e); + } Ok(()) } async fn remove_path(path: sled::IVec) -> Result<(), UploadError> { let path_string = String::from_utf8(path.to_vec())?; - actix_fs::remove_file(path_string).await?; + tokio::fs::remove_file(path_string).await?; Ok(()) }