Add timeout to media processing, ingest
All checks were successful
continuous-integration/drone/push Build is passing

slightly improve into_bytes performance
This commit is contained in:
asonix 2023-12-10 18:54:13 -06:00
parent 4c16016446
commit b35716254b
4 changed files with 57 additions and 10 deletions

View file

@ -32,14 +32,20 @@ impl BytesStream {
self.total_len
}
pub(crate) fn into_bytes(self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.total_len);
pub(crate) fn into_bytes(mut self) -> Bytes {
match self.inner.len() {
0 => Bytes::new(),
1 => self.inner.pop_back().expect("one element"),
_ => {
let mut buf = BytesMut::with_capacity(self.total_len);
for bytes in self.inner {
buf.extend_from_slice(&bytes);
for bytes in self.inner {
buf.extend(bytes);
}
buf.freeze()
}
}
buf.freeze()
}
}

View file

@ -136,6 +136,9 @@ pub(crate) enum UploadError {
#[error("Client took too long to send request")]
AggregateTimeout,
#[error("Media processing took too long")]
ProcessTimeout,
#[error("Failed external validation")]
ExternalValidation,
}

View file

@ -8,7 +8,7 @@ use crate::{
store::Store,
};
use actix_web::web::Bytes;
use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};
use tokio::io::AsyncReadExt;
use tracing::Instrument;
@ -39,8 +39,12 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
hash.clone(),
);
let (details, bytes) =
CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut).await?;
let (details, bytes) = actix_rt::time::timeout(
Duration::from_secs(timeout * 4),
CancelSafeProcessor::new(hash.as_ref(), thumbnail_path, process_fut),
)
.await
.map_err(|_| UploadError::ProcessTimeout)??;
Ok((details, bytes))
}

View file

@ -63,7 +63,41 @@ where
R: FullRepo + 'static,
S: Store,
{
let bytes = tokio::time::timeout(Duration::from_secs(60), aggregate(stream))
actix_rt::time::timeout(
Duration::from_secs(timeout * 4 + 60),
perform_ingest(
repo,
store,
client,
stream,
declared_alias,
external_validation,
external_validation_timeout,
should_validate,
timeout,
),
)
.await
.map_err(|_| UploadError::ProcessTimeout)?
}
#[allow(clippy::too_many_arguments)]
async fn perform_ingest<R, S>(
repo: &R,
store: &S,
client: &ClientWithMiddleware,
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
declared_alias: Option<Alias>,
external_validation: Option<&Url>,
external_validation_timeout: u64,
should_validate: bool,
timeout: u64,
) -> Result<Session<R, S>, Error>
where
R: FullRepo + 'static,
S: Store,
{
let bytes = actix_rt::time::timeout(Duration::from_secs(60), aggregate(stream))
.await
.map_err(|_| UploadError::AggregateTimeout)??;