diff --git a/Cargo.lock b/Cargo.lock index 722502e..6b0622c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1837,6 +1837,7 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "mime", + "nanorand", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", diff --git a/Cargo.toml b/Cargo.toml index a0dc1f4..944d51f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ strip = true default = [] io-uring = ["dep:tokio-uring", "sled/io_uring", "actix-web/experimental-io-uring"] poll-timer-warnings = [] +random-errors = ["dep:nanorand"] [dependencies] actix-form-data = "0.7.0-beta.6" @@ -40,6 +41,7 @@ md-5 = "0.10.5" metrics = "0.22.0" metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] } mime = "0.3.1" +nanorand = { version = "0.7", optional = true } opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] } opentelemetry = "0.22" opentelemetry-otlp = "0.15" diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index fdb534c..4584983 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -59,12 +59,12 @@ impl BytesStream { } pub(crate) fn into_io_stream(self) -> impl Stream> { - streem::from_fn(move |yielder| async move { + crate::stream::error_injector(streem::from_fn(move |yielder| async move { for bytes in self { crate::sync::cooperate().await; yielder.yield_ok(bytes).await; } - }) + })) } } diff --git a/src/error.rs b/src/error.rs index 2350074..5529499 100644 --- a/src/error.rs +++ b/src/error.rs @@ -167,6 +167,10 @@ pub(crate) enum UploadError { #[error("Failed external validation")] FailedExternalValidation, + + #[cfg(feature = "random-errors")] + #[error("Randomly generated error for testing purposes")] + RandomError, } impl UploadError { @@ -205,6 +209,8 @@ impl UploadError { Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT, Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION, Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB, + #[cfg(feature = "random-errors")] + Self::RandomError => ErrorCode::RANDOM_ERROR, } } diff --git a/src/error_code.rs b/src/error_code.rs index 462a284..9e9b936 100644 --- a/src/error_code.rs +++ b/src/error_code.rs @@ -147,4 +147,8 @@ impl ErrorCode { pub(crate) const INVALID_JOB: ErrorCode = ErrorCode { code: "invalid-job", }; + #[cfg(feature = "random-errors")] + pub(crate) const RANDOM_ERROR: ErrorCode = ErrorCode { + code: "random-error", + }; } diff --git a/src/future.rs b/src/future.rs index 5bbf9ec..e553491 100644 --- a/src/future.rs +++ b/src/future.rs @@ -167,11 +167,18 @@ where #[cfg(not(feature = "poll-timer-warnings"))] tracing::debug!("Future {} polled for {} ms", this.name, elapsed.as_millis()); } else if elapsed > Duration::from_micros(200) { + #[cfg(feature = "poll-timer-warnings")] tracing::debug!( "Future {} polled for {} microseconds", this.name, elapsed.as_micros(), ); + #[cfg(not(feature = "poll-timer-warnings"))] + tracing::trace!( + "Future {} polled for {} microseconds", + this.name, + elapsed.as_micros(), + ); } else if elapsed > Duration::from_micros(1) { tracing::trace!( "Future {} polled for {} microseconds", diff --git a/src/lib.rs b/src/lib.rs index 7a83903..8d3532a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1914,6 +1914,11 @@ impl PictRsConfiguration { /// } /// ``` pub async fn run(self) -> color_eyre::Result<()> { + #[cfg(feature = "random-errors")] + tracing::error!("pict-rs has been compiled with with the 'random-errors' feature enabled."); + #[cfg(feature = "random-errors")] + tracing::error!("This is not suitable for production environments"); + let PictRsConfiguration { config, operation } = self; // describe all the metrics pict-rs produces diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index aa94291..39b784a 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -23,6 +23,15 @@ where Box::pin(async move { let job_text = format!("{job}"); + #[cfg(feature = "random-errors")] + { + use nanorand::Rng; + + if nanorand::tls_rng().generate_range(0..25) < 1 { + return Err(crate::error::UploadError::RandomError).retry(); + } + } + let job = serde_json::from_value(job) .map_err(|e| UploadError::InvalidJob(e, job_text)) .abort()?; diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index b66ab99..faa5792 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -1548,14 +1548,14 @@ impl QueueRepo for PostgresRepo { let mut conn = self.get_connection().await?; - if matches!(job_status, JobResult::Failure) { + let count = if matches!(job_status, JobResult::Failure) { diesel::update(job_queue) .filter( id.eq(job_id.0) .and(queue.eq(queue_name)) .and(worker.eq(worker_id)), ) - .set(retry.eq(retry - 1)) + .set((retry.eq(retry - 1), worker.eq(Option::::None))) .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_RETRY) .with_timeout(Duration::from_secs(5)) @@ -1564,18 +1564,13 @@ impl QueueRepo for PostgresRepo { .map_err(PostgresError::Diesel)?; diesel::delete(job_queue) - .filter( - id.eq(job_id.0) - .and(queue.eq(queue_name)) - .and(worker.eq(worker_id)) - .and(retry.le(0)), - ) + .filter(id.eq(job_id.0).and(retry.le(0))) .execute(&mut conn) .with_metrics(crate::init_metrics::POSTGRES_QUEUE_CLEANUP) .with_timeout(Duration::from_secs(5)) .await .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)?; + .map_err(PostgresError::Diesel)? } else { diesel::delete(job_queue) .filter( @@ -1588,7 +1583,20 @@ impl QueueRepo for PostgresRepo { .with_timeout(Duration::from_secs(5)) .await .map_err(|_| PostgresError::DbTimeout)? - .map_err(PostgresError::Diesel)?; + .map_err(PostgresError::Diesel)? + }; + + match job_status { + JobResult::Success => tracing::debug!("completed {job_id:?}"), + JobResult::Failure if count == 0 => { + tracing::info!("{job_id:?} failed, marked for retry") + } + JobResult::Failure => tracing::warn!("{job_id:?} failed permantently"), + JobResult::Aborted => tracing::warn!("{job_id:?} dead"), + } + + if count > 0 { + tracing::debug!("Deleted {count} jobs"); } Ok(()) diff --git a/src/repo/sled.rs b/src/repo/sled.rs index ff67e8b..6042116 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -899,15 +899,25 @@ impl QueueRepo for SledRepo { job_retries.remove(&key[..])?; } - Ok(()) + Ok(retry_count > 0 && retry) }, ) }) .await .map_err(|_| RepoError::Canceled)?; - if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res { - return Err(RepoError::from(SledError::from(e))); + match res { + Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => { + return Err(RepoError::from(SledError::from(e))); + } + Ok(retried) => match job_status { + JobResult::Success => tracing::debug!("completed {job_id:?}"), + JobResult::Failure if retried => { + tracing::info!("{job_id:?} failed, marked for retry") + } + JobResult::Failure => tracing::warn!("{job_id:?} failed permantently"), + JobResult::Aborted => tracing::warn!("{job_id:?} dead"), + }, } Ok(()) diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 1f04fd5..84a7456 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -62,7 +62,10 @@ impl Store for FileStore { { let path = self.next_file(extension); - if let Err(e) = self.safe_save_stream(&path, stream).await { + if let Err(e) = self + .safe_save_stream(&path, crate::stream::error_injector(stream)) + .await + { self.safe_remove_file(&path).await?; return Err(e.into()); } @@ -95,7 +98,7 @@ impl Store for FileStore { .instrument(file_span) .await?; - Ok(Box::pin(stream)) + Ok(Box::pin(crate::stream::error_injector(stream))) } #[tracing::instrument(skip(self))] diff --git a/src/store/object_store.rs b/src/store/object_store.rs index e152bd5..c142948 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -216,7 +216,11 @@ impl Store for ObjectStore { S: Stream>, { match self - .start_upload(stream, content_type.clone(), extension) + .start_upload( + crate::stream::error_injector(stream), + content_type.clone(), + extension, + ) .await? { UploadState::Single(first_chunk) => { @@ -306,9 +310,11 @@ impl Store for ObjectStore { return Err(status_error(response, Some(identifier.clone())).await); } - Ok(Box::pin(crate::stream::metrics( - crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST_STREAM, - crate::stream::map_err(response.bytes_stream(), payload_to_io_error), + Ok(Box::pin(crate::stream::error_injector( + crate::stream::metrics( + crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST_STREAM, + crate::stream::map_err(response.bytes_stream(), payload_to_io_error), + ), ))) } diff --git a/src/stream.rs b/src/stream.rs index 9482de3..3235415 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,6 +5,38 @@ use streem::IntoStreamer; use crate::future::WithMetrics; +#[cfg(not(feature = "random-errors"))] +pub(crate) fn error_injector( + stream: impl Stream>, +) -> impl Stream> { + stream +} + +#[cfg(feature = "random-errors")] +pub(crate) fn error_injector( + stream: impl Stream>, +) -> impl Stream> { + streem::try_from_fn(|yielder| async move { + let stream = std::pin::pin!(stream); + let mut streamer = stream.into_streamer(); + + while let Some(item) = streamer.try_next().await? { + yielder.yield_ok(item).await; + + use nanorand::Rng; + + if nanorand::tls_rng().generate_range(0..1000) < 1 { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + crate::error::UploadError::RandomError, + )); + } + } + + Ok(()) + }) +} + pub(crate) fn take(stream: S, amount: usize) -> impl Stream where S: Stream,