Compare commits

...

3 commits

Author SHA1 Message Date
asonix 4bb3bad703 Prepare 0.5.12
All checks were successful
/ check (aarch64-unknown-linux-musl) (push) Successful in 1m54s
/ check (armv7-unknown-linux-musleabihf) (push) Successful in 1m55s
/ check (x86_64-unknown-linux-musl) (push) Successful in 2m10s
/ clippy (push) Successful in 2m16s
/ tests (push) Successful in 1m59s
/ publish-docker (push) Successful in 14s
/ build (map[artifact:linux-amd64 platform:linux/amd64 target:x86_64-unknown-linux-musl]) (push) Successful in 3m39s
/ build (map[artifact:linux-arm32v7 platform:linux/arm/v7 target:armv7-unknown-linux-musleabihf]) (push) Successful in 3m55s
/ build (map[artifact:linux-arm64v8 platform:linux/arm64 target:aarch64-unknown-linux-musl]) (push) Successful in 3m53s
/ publish-forgejo (push) Successful in 15s
/ publish-crate (push) Successful in 1m52s
2024-04-05 13:05:16 -05:00
asonix 4021458be8 Prevent divided-by-zero for empty BytesStreams 2024-04-05 12:57:40 -05:00
asonix eca3697410 Add panic boundaries for background jobs 2024-04-05 12:57:32 -05:00
6 changed files with 97 additions and 33 deletions

2
Cargo.lock generated
View file

@ -1819,7 +1819,7 @@ dependencies = [
[[package]]
name = "pict-rs"
version = "0.5.11"
version = "0.5.12"
dependencies = [
"actix-form-data",
"actix-web",

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.5.11"
version = "0.5.12"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"

View file

@ -11,7 +11,7 @@
rustPlatform.buildRustPackage {
pname = "pict-rs";
version = "0.5.11";
version = "0.5.12";
src = ./.;
cargoLock = {

46
releases/0.5.12.md Normal file
View file

@ -0,0 +1,46 @@
# pict-rs 0.5.12
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.12 is a bugfix release to remove two issues that, when compounded, would cause pict-rs
to fail to process media.
### Fixes
- [Panic Handling in Background Jobs](#panic-handling-in-background-jobs)
- [BytesStream Divide-by-Zero](#bytes-stream-divide-by-zero)
## Upgrade Notes
There are no significant differences from 0.5.11. Upgrading should be as simple as pulling a new
version of pict-rs.
## Descriptions
### Panic Handling in Background Jobs
pict-rs makes an effort to never use explicitly panicking code, but since there's no static way to
guarantee that a given function wont panic, pict-rs needs to be able to deal with that. pict-rs
0.5.12 now wraps invocations of jobs in spawned tasks, which can catch and report panics that happen
in background jobs.
Previously, a panic in a background job would bring down that thread's job processor, which resulted
in future jobs never being processed. Now job processing should properly continue after panics
occur.
### BytesStream Divide-by-Zero
Part of my rework of BytesStream recently included adding debug logs around how many bytes chunks
were in a given stream, and their average length. Unfortunately, if there were no bytes in the
stream, this would cause the "average chunk length" calculation to divide by 0. In previous versions
of pict-rs, this would generally result in a failed request for processed media, but in pict-rs
0.5.11 this would end up killing the background jobs processor.
This specific panic has been fixed by ensuring we divide by the number of chunks or 1, whichever is
greater.

View file

@ -35,7 +35,7 @@ impl BytesStream {
tracing::debug!(
"BytesStream with {} chunks, avg length {}",
bs.chunks_len(),
bs.len() / bs.chunks_len()
bs.len() / bs.chunks_len().max(1)
);
Ok(bs)

View file

@ -11,9 +11,11 @@ use crate::{
use std::{
ops::Deref,
rc::Rc,
sync::Arc,
time::{Duration, Instant},
};
use tokio::task::JoinError;
use tracing::Instrument;
pub(crate) mod cleanup;
@ -297,54 +299,66 @@ where
}
}
fn job_result(result: &JobResult) -> crate::repo::JobResult {
fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
match result {
Ok(()) => crate::repo::JobResult::Success,
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
Ok(Ok(())) => crate::repo::JobResult::Success,
Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
Err(_) => crate::repo::JobResult::Aborted,
}
}
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
where
S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
let worker_id = uuid::Uuid::new_v4();
let state = Rc::new(state);
loop {
tracing::trace!("process_jobs: looping");
crate::sync::cooperate().await;
let res = job_loop(&state, worker_id, queue, callback)
.with_poll_timer("job-loop")
.await;
// add a panic boundary by spawning a task
let res = crate::sync::spawn(
"job-loop",
job_loop(state.clone(), worker_id, queue, callback),
)
.await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
match res {
// clean exit
Ok(Ok(())) => break,
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
// job error
Ok(Err(e)) => {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
continue;
// job panic
Err(_) => {
tracing::warn!("Panic while processing jobs");
}
}
break;
}
}
async fn job_loop<S, F>(
state: &State<S>,
state: Rc<State<S>>,
worker_id: uuid::Uuid,
queue: &'static str,
callback: F,
) -> Result<(), Error>
where
S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
loop {
tracing::trace!("job_loop: looping");
@ -360,14 +374,18 @@ where
let guard = MetricsGuard::guard(worker_id, queue);
let res = heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(state, job),
)
.with_poll_timer("job-and-heartbeat")
let state2 = state.clone();
let res = crate::sync::spawn("job-and-heartbeat", async move {
let state = state2;
heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(&state, job),
)
.await
})
.await;
state
@ -376,7 +394,7 @@ where
.with_poll_timer("job-complete")
.await?;
res?;
res.map_err(|_| UploadError::Canceled)??;
guard.disarm();