Compare commits
3 commits
d41fca5b6c
...
4bb3bad703
Author | SHA1 | Date | |
---|---|---|---|
asonix | 4bb3bad703 | ||
asonix | 4021458be8 | ||
asonix | eca3697410 |
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -1819,7 +1819,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pict-rs"
|
name = "pict-rs"
|
||||||
version = "0.5.11"
|
version = "0.5.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-form-data",
|
"actix-form-data",
|
||||||
"actix-web",
|
"actix-web",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "pict-rs"
|
name = "pict-rs"
|
||||||
description = "A simple image hosting service"
|
description = "A simple image hosting service"
|
||||||
version = "0.5.11"
|
version = "0.5.12"
|
||||||
authors = ["asonix <asonix@asonix.dog>"]
|
authors = ["asonix <asonix@asonix.dog>"]
|
||||||
license = "AGPL-3.0"
|
license = "AGPL-3.0"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
|
|
||||||
rustPlatform.buildRustPackage {
|
rustPlatform.buildRustPackage {
|
||||||
pname = "pict-rs";
|
pname = "pict-rs";
|
||||||
version = "0.5.11";
|
version = "0.5.12";
|
||||||
src = ./.;
|
src = ./.;
|
||||||
|
|
||||||
cargoLock = {
|
cargoLock = {
|
||||||
|
|
46
releases/0.5.12.md
Normal file
46
releases/0.5.12.md
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
# pict-rs 0.5.12
|
||||||
|
|
||||||
|
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
|
||||||
|
animations, and videos, as well as providing basic image processing functionality.
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
pict-rs 0.5.12 is a bugfix release to resolve two issues that, when compounded, would cause pict-rs
|
||||||
|
to fail to process media.
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
|
||||||
|
- [Panic Handling in Background Jobs](#panic-handling-in-background-jobs)
|
||||||
|
- [BytesStream Divide-by-Zero](#bytesstream-divide-by-zero)
|
||||||
|
|
||||||
|
|
||||||
|
## Upgrade Notes
|
||||||
|
|
||||||
|
There are no significant differences from 0.5.11. Upgrading should be as simple as pulling a new
|
||||||
|
version of pict-rs.
|
||||||
|
|
||||||
|
|
||||||
|
## Descriptions
|
||||||
|
|
||||||
|
### Panic Handling in Background Jobs
|
||||||
|
|
||||||
|
pict-rs makes an effort to never use explicitly panicking code, but since there's no static way to
|
||||||
|
guarantee that a given function won't panic, pict-rs needs to be able to deal with that. pict-rs
|
||||||
|
0.5.12 now wraps invocations of jobs in spawned tasks, which can catch and report panics that happen
|
||||||
|
in background jobs.
|
||||||
|
|
||||||
|
Previously, a panic in a background job would bring down that thread's job processor, which resulted
|
||||||
|
in future jobs never being processed. Now job processing should properly continue after panics
|
||||||
|
occur.
|
||||||
|
|
||||||
|
|
||||||
|
### BytesStream Divide-by-Zero
|
||||||
|
|
||||||
|
Part of my rework of BytesStream recently included adding debug logs around how many bytes chunks
|
||||||
|
were in a given stream, and their average length. Unfortunately, if there were no bytes in the
|
||||||
|
stream, this would cause the "average chunk length" calculation to divide by 0. In previous versions
|
||||||
|
of pict-rs, this would generally result in a failed request for processed media, but in pict-rs
|
||||||
|
0.5.11 this would end up killing the background jobs processor.
|
||||||
|
|
||||||
|
This specific panic has been fixed by ensuring we divide by the number of chunks or 1, whichever is
|
||||||
|
greater.
|
|
@ -35,7 +35,7 @@ impl BytesStream {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
"BytesStream with {} chunks, avg length {}",
|
"BytesStream with {} chunks, avg length {}",
|
||||||
bs.chunks_len(),
|
bs.chunks_len(),
|
||||||
bs.len() / bs.chunks_len()
|
bs.len() / bs.chunks_len().max(1)
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(bs)
|
Ok(bs)
|
||||||
|
|
76
src/queue.rs
76
src/queue.rs
|
@ -11,9 +11,11 @@ use crate::{
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
|
rc::Rc,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
use tokio::task::JoinError;
|
||||||
use tracing::Instrument;
|
use tracing::Instrument;
|
||||||
|
|
||||||
pub(crate) mod cleanup;
|
pub(crate) mod cleanup;
|
||||||
|
@ -297,54 +299,66 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn job_result(result: &JobResult) -> crate::repo::JobResult {
|
fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
|
||||||
match result {
|
match result {
|
||||||
Ok(()) => crate::repo::JobResult::Success,
|
Ok(Ok(())) => crate::repo::JobResult::Success,
|
||||||
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
|
Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
|
||||||
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
|
Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
|
||||||
|
Err(_) => crate::repo::JobResult::Aborted,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
|
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
|
||||||
where
|
where
|
||||||
S: Store,
|
S: Store + 'static,
|
||||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
|
||||||
{
|
{
|
||||||
let worker_id = uuid::Uuid::new_v4();
|
let worker_id = uuid::Uuid::new_v4();
|
||||||
|
let state = Rc::new(state);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tracing::trace!("process_jobs: looping");
|
tracing::trace!("process_jobs: looping");
|
||||||
|
|
||||||
crate::sync::cooperate().await;
|
crate::sync::cooperate().await;
|
||||||
|
|
||||||
let res = job_loop(&state, worker_id, queue, callback)
|
// add a panic boundary by spawning a task
|
||||||
.with_poll_timer("job-loop")
|
let res = crate::sync::spawn(
|
||||||
.await;
|
"job-loop",
|
||||||
|
job_loop(state.clone(), worker_id, queue, callback),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
if let Err(e) = res {
|
match res {
|
||||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
// clean exit
|
||||||
tracing::warn!("{}", format!("{e:?}"));
|
Ok(Ok(())) => break,
|
||||||
|
|
||||||
if e.is_disconnected() {
|
// job error
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
Ok(Err(e)) => {
|
||||||
|
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||||
|
tracing::warn!("{}", format!("{e:?}"));
|
||||||
|
|
||||||
|
if e.is_disconnected() {
|
||||||
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
continue;
|
// job panic
|
||||||
|
Err(_) => {
|
||||||
|
tracing::warn!("Panic while processing jobs");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn job_loop<S, F>(
|
async fn job_loop<S, F>(
|
||||||
state: &State<S>,
|
state: Rc<State<S>>,
|
||||||
worker_id: uuid::Uuid,
|
worker_id: uuid::Uuid,
|
||||||
queue: &'static str,
|
queue: &'static str,
|
||||||
callback: F,
|
callback: F,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
S: Store,
|
S: Store + 'static,
|
||||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
|
||||||
{
|
{
|
||||||
loop {
|
loop {
|
||||||
tracing::trace!("job_loop: looping");
|
tracing::trace!("job_loop: looping");
|
||||||
|
@ -360,14 +374,18 @@ where
|
||||||
|
|
||||||
let guard = MetricsGuard::guard(worker_id, queue);
|
let guard = MetricsGuard::guard(worker_id, queue);
|
||||||
|
|
||||||
let res = heartbeat(
|
let state2 = state.clone();
|
||||||
&state.repo,
|
let res = crate::sync::spawn("job-and-heartbeat", async move {
|
||||||
queue,
|
let state = state2;
|
||||||
worker_id,
|
heartbeat(
|
||||||
job_id,
|
&state.repo,
|
||||||
(callback)(state, job),
|
queue,
|
||||||
)
|
worker_id,
|
||||||
.with_poll_timer("job-and-heartbeat")
|
job_id,
|
||||||
|
(callback)(&state, job),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
state
|
state
|
||||||
|
@ -376,7 +394,7 @@ where
|
||||||
.with_poll_timer("job-complete")
|
.with_poll_timer("job-complete")
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
res?;
|
res.map_err(|_| UploadError::Canceled)??;
|
||||||
|
|
||||||
guard.disarm();
|
guard.disarm();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue