pict-rs/src/generate.rs

349 lines
10 KiB
Rust
Raw Normal View History

mod ffmpeg;
mod magick;
use crate::{
details::Details,
error::{Error, UploadError},
formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
future::{WithMetrics, WithPollTimer, WithTimeout},
repo::{Hash, VariantAlreadyExists},
state::State,
store::Store,
};
2024-02-22 22:03:49 +00:00
use std::{
2024-03-30 14:36:31 +00:00
future::Future,
sync::Arc,
time::{Duration, Instant},
};
2023-12-23 02:54:02 +00:00
2022-04-07 17:56:40 +00:00
use tracing::Instrument;
2023-07-22 21:47:59 +00:00
/// Guard that reports metrics for one variant-generation run.
///
/// Creating it through `MetricsGuard::guard` bumps the `GENERATE_START` counter. Dropping it
/// records the elapsed time and bumps `GENERATE_END`, labelling both with whether the guard was
/// disarmed before being dropped.
struct MetricsGuard {
    /// Moment the guard was created; the recorded duration is measured from here.
    start: Instant,
    /// `true` until `disarm` is called. The `Drop` impl reports `completed = !armed`.
    armed: bool,
}
impl MetricsGuard {
fn guard() -> Self {
2024-02-04 20:26:18 +00:00
metrics::counter!(crate::init_metrics::GENERATE_START).increment(1);
2023-07-22 21:47:59 +00:00
Self {
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
2024-02-04 20:26:18 +00:00
metrics::histogram!(crate::init_metrics::GENERATE_DURATION, "completed" => (!self.armed).to_string())
.record(self.start.elapsed().as_secs_f64());
2024-02-04 20:26:18 +00:00
metrics::counter!(crate::init_metrics::GENERATE_END, "completed" => (!self.armed).to_string())
.increment(1);
2023-07-22 21:47:59 +00:00
}
}
2024-03-30 14:36:31 +00:00
#[tracing::instrument(skip(state, original_details, hash))]
pub(crate) async fn generate<S: Store + 'static>(
state: &State<S>,
format: InputProcessableFormat,
variant: String,
2024-03-31 21:23:34 +00:00
variant_args: Vec<String>,
original_details: &Details,
2023-08-14 19:25:19 +00:00
hash: Hash,
2024-02-22 22:02:33 +00:00
) -> Result<(Details, Arc<str>), Error> {
if state.config.server.danger_dummy_mode {
let identifier = state
.repo
2023-11-11 20:22:12 +00:00
.identifier(hash)
.await?
.ok_or(UploadError::MissingIdentifier)?;
2024-02-22 22:02:33 +00:00
Ok((original_details.clone(), identifier))
2023-11-11 20:22:12 +00:00
} else {
2024-03-30 14:36:31 +00:00
let mut attempts = 0;
2024-03-30 19:11:12 +00:00
let tup = loop {
if attempts > 2 {
return Err(UploadError::ProcessTimeout.into());
2024-03-30 14:36:31 +00:00
}
match state
.repo
.claim_variant_processing_rights(hash.clone(), variant.clone())
.await?
{
Ok(()) => {
// process
let process_future = process(
state,
format,
variant.clone(),
2024-03-31 21:23:34 +00:00
variant_args,
2024-03-30 14:36:31 +00:00
original_details,
hash.clone(),
)
.with_poll_timer("process-future");
2024-03-30 19:11:12 +00:00
let res = heartbeat(state, hash.clone(), variant.clone(), process_future)
2024-03-30 14:36:31 +00:00
.with_poll_timer("heartbeat-future")
2024-03-31 21:34:50 +00:00
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
.await
.map_err(|_| Error::from(UploadError::ProcessTimeout));
2024-03-30 19:11:12 +00:00
state
.repo
.notify_variant(hash.clone(), variant.clone())
.await?;
2024-03-31 21:34:50 +00:00
break res???;
2024-03-30 19:11:12 +00:00
}
Err(mut entry) => {
let notified = entry.notified_timeout(Duration::from_secs(20));
if let Some(identifier) = state
2024-03-30 19:11:12 +00:00
.repo
.variant_identifier(hash.clone(), variant.clone())
2024-03-30 19:11:12 +00:00
.await?
{
let details = crate::ensure_details_identifier(state, &identifier).await?;
2024-03-31 21:34:50 +00:00
break (details, identifier);
}
match notified.await {
Ok(()) => tracing::debug!("notified"),
2024-03-31 21:34:50 +00:00
Err(_) => tracing::debug!("timeout"),
2024-03-30 14:36:31 +00:00
}
attempts += 1;
2024-03-30 19:11:12 +00:00
}
2024-03-30 14:36:31 +00:00
}
};
2023-11-11 20:22:12 +00:00
2024-03-30 19:11:12 +00:00
Ok(tup)
2023-11-11 20:22:12 +00:00
}
}
2024-03-30 14:36:31 +00:00
/// Drives `future` to completion while a spawned task calls `variant_heartbeat` for
/// (`hash`, `variant`) on a five-second interval.
///
/// Returns `Ok` with the future's output when the future finishes. If the heartbeat task ends
/// first, the error it produced is returned instead; a task that could not be joined is
/// reported as `UploadError::Canceled`.
async fn heartbeat<S, O>(
    state: &State<S>,
    hash: Hash,
    variant: String,
    future: impl Future<Output = O>,
) -> Result<O, Error> {
    let repo = state.repo.clone();

    // The task only ever finishes by yielding the first heartbeat error.
    let beat = async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(5));

        loop {
            ticker.tick().await;

            if let Err(e) = repo.variant_heartbeat(hash.clone(), variant.clone()).await {
                break Error::from(e);
            }
        }
    };

    // NOTE(review): going by its name, `abort_on_drop` stops the task once this handle is
    // dropped, i.e. when this function returns - confirm in `crate::sync`.
    let handle = crate::sync::abort_on_drop(crate::sync::spawn("heartbeat-task", beat));

    let work = std::pin::pin!(future);

    tokio::select! {
        // Poll the real work before the heartbeat handle on every wake-up.
        biased;

        output = work => Ok(output),
        joined = handle => {
            let heartbeat_error = joined.map_err(|_| UploadError::Canceled)?;

            Err(heartbeat_error)
        }
    }
}
#[tracing::instrument(skip(state, hash))]
async fn process<S: Store + 'static>(
state: &State<S>,
output_format: InputProcessableFormat,
2024-03-30 14:36:31 +00:00
variant: String,
2024-03-31 21:23:34 +00:00
variant_args: Vec<String>,
original_details: &Details,
2023-08-14 19:25:19 +00:00
hash: Hash,
2024-02-22 22:02:33 +00:00
) -> Result<(Details, Arc<str>), Error> {
2023-07-22 21:47:59 +00:00
let guard = MetricsGuard::guard();
2023-09-24 20:32:00 +00:00
let permit = crate::process_semaphore().acquire().await?;
let identifier = input_identifier(state, output_format, hash.clone(), original_details).await?;
let input_details = crate::ensure_details_identifier(state, &identifier).await?;
let input_format = input_details
.internal_format()
2023-08-16 17:36:18 +00:00
.processable_format()
.expect("Already verified format is processable");
let format = input_format.process_to(output_format);
let quality = match format {
ProcessableFormat::Image(format) => state.config.media.image.quality_for(format),
ProcessableFormat::Animation(format) => state.config.media.animation.quality_for(format),
};
let stream = state.store.to_stream(&identifier, None, None).await?;
2023-12-23 17:58:20 +00:00
2024-02-25 01:07:48 +00:00
let bytes =
2024-03-31 21:23:34 +00:00
crate::magick::process_image_command(state, variant_args, input_format, format, quality)
2024-02-25 01:07:48 +00:00
.await?
.drive_with_stream(stream)
.into_bytes_stream()
.instrument(tracing::info_span!(
"Reading processed image to BytesStream"
))
.await?;
drop(permit);
2024-02-22 22:02:33 +00:00
let details = Details::from_bytes_stream(state, bytes.clone()).await?;
let identifier = state
.store
2024-02-28 02:41:25 +00:00
.save_stream(
bytes.into_io_stream(),
details.media_type(),
Some(details.file_extension()),
)
.await?;
2023-08-16 20:12:16 +00:00
2024-03-30 14:36:31 +00:00
let identifier = if let Err(VariantAlreadyExists) = state
.repo
2024-03-30 14:36:31 +00:00
.relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
2023-08-16 20:12:16 +00:00
.await?
{
state.store.remove(&identifier).await?;
2024-03-30 14:36:31 +00:00
state
.repo
.variant_identifier(hash, variant)
.await?
.ok_or(UploadError::MissingIdentifier)?
} else {
state.repo.relate_details(&identifier, &details).await?;
identifier
};
2023-07-22 21:47:59 +00:00
guard.disarm();
2024-02-22 22:02:33 +00:00
Ok((details, identifier)) as Result<(Details, Arc<str>), Error>
}
2024-02-24 04:12:19 +00:00
pub(crate) async fn ensure_motion_identifier<S>(
state: &State<S>,
hash: Hash,
original_details: &Details,
) -> Result<Arc<str>, Error>
where
S: Store + 'static,
{
if let Some(identifier) = state.repo.motion_identifier(hash.clone()).await? {
return Ok(identifier);
};
let identifier = state
.repo
.identifier(hash.clone())
.await?
.ok_or(UploadError::MissingIdentifier)?;
2024-02-28 02:41:25 +00:00
let (reader, media_type, file_extension) =
2024-02-25 01:27:34 +00:00
if let Some(processable_format) = original_details.internal_format().processable_format() {
let thumbnail_format = state.config.media.image.format.unwrap_or(ImageFormat::Webp);
let stream = state.store.to_stream(&identifier, None, None).await?;
let process =
magick::thumbnail_command(state, processable_format, thumbnail_format).await?;
(
process.drive_with_stream(stream),
thumbnail_format.media_type(),
2024-02-28 02:41:25 +00:00
thumbnail_format.file_extension(),
2024-02-25 01:27:34 +00:00
)
} else {
let thumbnail_format = match state.config.media.image.format {
Some(ImageFormat::Webp | ImageFormat::Avif | ImageFormat::Jxl) => {
ffmpeg::ThumbnailFormat::Webp
}
Some(ImageFormat::Png) => ffmpeg::ThumbnailFormat::Png,
Some(ImageFormat::Jpeg) | None => ffmpeg::ThumbnailFormat::Jpeg,
};
let reader = ffmpeg::thumbnail(
state,
identifier,
original_details
.video_format()
.unwrap_or(InternalVideoFormat::Mp4),
thumbnail_format,
)
.await?;
2024-02-24 04:12:19 +00:00
2024-02-28 02:41:25 +00:00
(
reader,
thumbnail_format.media_type(),
thumbnail_format.file_extension(),
)
2024-02-24 04:12:19 +00:00
};
let motion_identifier = reader
2024-02-28 02:41:25 +00:00
.with_stdout(|stdout| async {
state
.store
2024-03-10 04:53:46 +00:00
.save_stream(
tokio_util::io::ReaderStream::with_capacity(stdout, 1024 * 64),
media_type,
Some(file_extension),
)
2024-02-28 02:41:25 +00:00
.await
})
2024-02-24 04:12:19 +00:00
.await??;
state
.repo
.relate_motion_identifier(hash, &motion_identifier)
.await?;
2024-02-25 01:36:29 +00:00
Ok(motion_identifier)
2024-02-24 04:12:19 +00:00
}
#[tracing::instrument(skip_all)]
async fn input_identifier<S>(
state: &State<S>,
output_format: InputProcessableFormat,
hash: Hash,
original_details: &Details,
) -> Result<Arc<str>, Error>
where
S: Store + 'static,
{
let should_thumbnail =
if let Some(input_format) = original_details.internal_format().processable_format() {
let output_format = input_format.process_to(output_format);
input_format.should_thumbnail(output_format)
} else {
// video case
true
};
if should_thumbnail {
2024-02-24 04:12:19 +00:00
return ensure_motion_identifier(state, hash.clone(), original_details).await;
}
state
.repo
.identifier(hash)
.await?
.ok_or(UploadError::MissingIdentifier)
.map_err(From::from)
}