From 6cdae7b318dd2e385e36ed2b303bceb56e25c5b5 Mon Sep 17 00:00:00 2001
From: "Aode (Lion)"
Date: Tue, 5 Apr 2022 20:29:30 -0500
Subject: [PATCH] Add 'cache' endpoint for ingesting ephemeral media

By default, cached media only sticks around for 7 days. However, the
timeout is reset every time the media is accessed, so only
rarely-accessed cached media will be flushed from the cache.

This '7 days' value is configurable through the commandline run option
--media-cache-duration, and in the pict-rs.toml file as cache_duration
under [media].
---
 Cargo.lock                |  17 ++++++
 Cargo.toml                |   3 +-
 defaults.toml             |   1 +
 pict-rs.toml              |   5 ++
 src/config/commandline.rs |   8 +++
 src/config/defaults.rs    |  11 ++--
 src/config/file.rs        |   2 +
 src/error.rs              |   3 +
 src/main.rs               |  73 +++++++++++++++++++----
 src/queue/cleanup.rs      |   5 ++
 src/repo.rs               |  26 +++++++-
 src/repo/sled.rs          | 122 ++++++++++++++++++++++++++++++++++++--
 src/repo/sled/datetime.rs |  58 ++++++++++++++++++
 13 files changed, 313 insertions(+), 21 deletions(-)
 create mode 100644 src/repo/sled/datetime.rs

diff --git a/Cargo.lock b/Cargo.lock
index 6458c45..55b794d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -997,6 +997,12 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "half"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
+
 [[package]]
 name = "hashbrown"
 version = "0.11.2"
@@ -1690,6 +1696,7 @@ dependencies = [
  "reqwest",
  "rust-s3",
  "serde",
+ "serde_cbor",
  "serde_json",
  "sha2 0.10.2",
  "sled",
@@ -2134,6 +2141,16 @@ dependencies = [
  "xml-rs",
 ]
 
+[[package]]
+name = "serde_cbor"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
+dependencies = [
+ "half",
+ "serde",
+]
+
 [[package]]
 name = "serde_derive"
 version = "1.0.136"
diff --git a/Cargo.toml b/Cargo.toml
index 63b2f5d..4ed3baf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,12 +48,13 @@ rust-s3 = { version = "0.29.0", default-features = false, features = [
   "with-reqwest",
 ], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
 serde = { version = "1.0", features = ["derive"] }
+serde_cbor = "0.11.2"
 serde_json = "1.0"
 sha2 = "0.10.0"
 sled = { version = "0.34.7" }
 storage-path-generator = "0.1.0"
 thiserror = "1.0"
-time = { version = "0.3.0", features = ["serde"] }
+time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
 tokio = { version = "1", features = ["full", "tracing"] }
 tokio-uring = { version = "0.3", optional = true, features = ["bytes"] }
 tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
diff --git a/defaults.toml b/defaults.toml
index c43d4f2..639764c 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -29,6 +29,7 @@ filters = [
     'thumbnail',
 ]
 skip_validate_imports = false
+cache_duration = 168
 
 [repo]
 type = 'sled'
diff --git a/pict-rs.toml b/pict-rs.toml
index 5d3a6dc..b1b9311 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -154,6 +154,11 @@ filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
 # Set this to true if you want to avoid processing imported media
 skip_validate_imports = false
 
+## Optional: The duration, in hours, to keep media ingested through the "cache" endpoint
+# environment variable: PICTRS_MEDIA__CACHE_DURATION
+# default: 168 (1 week)
+cache_duration = 168
+
 
 ## Database configuration
 [repo]
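
A note on units before the code changes: `cache_duration` counts hours, so the default of 168 is one week. A minimal sketch of the eviction-cutoff arithmetic, using the `time` crate this patch already depends on (the constant and variable names are illustrative, not part of the patch):

```rust
use time::{Duration, OffsetDateTime};

// 168 hours = 7 days; mirrors the default above.
const CACHE_DURATION_HOURS: i64 = 168;

fn main() {
    let now = OffsetDateTime::now_utc();
    // Media whose last-access stamp is older than this cutoff is
    // eligible for eviction from the cache.
    let cutoff = now - Duration::hours(CACHE_DURATION_HOURS);
    println!("evict cache entries last accessed before {cutoff}");
}
```
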
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index c335cbb..b4aac13 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -53,6 +53,7 @@ impl Args {
                 media_enable_silent_video,
                 media_filters,
                 media_format,
+                media_cache_duration,
                 store,
             }) => {
                 let server = Server {
@@ -69,6 +70,7 @@
                     enable_silent_video: media_enable_silent_video,
                     filters: media_filters,
                     format: media_format,
+                    cache_duration: media_cache_duration,
                 };
 
                 let operation = Operation::Run;
@@ -313,6 +315,8 @@ struct Media {
     format: Option<ImageFormat>,
     #[serde(skip_serializing_if = "Option::is_none")]
     skip_validate_imports: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    cache_duration: Option<i64>,
 }
 
 /// Run the pict-rs application
@@ -407,6 +411,10 @@ struct Run {
     #[clap(long)]
     media_format: Option<ImageFormat>,
 
+    /// How long, in hours, to keep media ingested through the "cache" endpoint
+    #[clap(long)]
+    media_cache_duration: Option<i64>,
+
     #[clap(subcommand)]
     store: Option<RunStore>,
 }
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index bcedb0d..9cf4180 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -68,6 +68,7 @@ struct MediaDefaults {
     enable_silent_video: bool,
     filters: Vec<String>,
     skip_validate_imports: bool,
+    cache_duration: i64,
 }
 
 #[derive(Clone, Debug, serde::Serialize)]
@@ -151,13 +152,15 @@ impl Default for MediaDefaults {
            max_file_size: 40,
            enable_silent_video: true,
            filters: vec![
-                "identity".into(),
-                "thumbnail".into(),
-                "resize".into(),
-                "crop".into(),
                 "blur".into(),
+                "crop".into(),
+                "identity".into(),
+                "resize".into(),
+                "thumbnail".into(),
             ],
             skip_validate_imports: false,
+            // one week (in hours)
+            cache_duration: 24 * 7,
         }
     }
 }
diff --git a/src/config/file.rs b/src/config/file.rs
index d49edc2..1de0ef4 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -102,6 +102,8 @@ pub(crate) struct Media {
     pub(crate) format: Option<ImageFormat>,
 
     pub(crate) skip_validate_imports: bool,
+
+    pub(crate) cache_duration: i64,
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
diff --git a/src/error.rs b/src/error.rs
index 52a1c70..db99cb4 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -114,6 +114,9 @@ pub(crate) enum UploadError {
     #[error("Error in json")]
     Json(#[from] serde_json::Error),
 
+    #[error("Error in cbor")]
+    Cbor(#[from] serde_cbor::Error),
+
     #[error("Range header not satisfiable")]
     Range,
 
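
For context on the new `Cbor` error variant: the sled repo later in this patch serializes its reverse-index buckets with `serde_cbor`, and those calls are where this error originates. A minimal round-trip of that pattern, assuming serde's derive feature and mirroring the `Bucket` shape defined below:

```rust
use std::collections::HashSet;

// Mirrors the Bucket struct added to src/repo/sled.rs in this patch.
#[derive(serde::Serialize, serde::Deserialize)]
struct Bucket {
    inner: HashSet<Vec<u8>>,
}

fn main() -> Result<(), serde_cbor::Error> {
    let mut bucket = Bucket {
        inner: HashSet::new(),
    };
    bucket.inner.insert(b"some-image-hash".to_vec());

    // Either call can fail, surfacing as UploadError::Cbor.
    let bytes = serde_cbor::to_vec(&bucket)?;
    let decoded: Bucket = serde_cbor::from_slice(&bytes)?;

    assert_eq!(decoded.inner.len(), 1);
    Ok(())
}
```
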
diff --git a/src/main.rs b/src/main.rs
index 6c2d6a5..13de2c2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -270,6 +270,55 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
     })))
 }
 
+/// cache an image from a URL
+#[instrument(name = "Caching file", skip(client, repo))]
+async fn cache<R: FullRepo + 'static, S: Store + 'static>(
+    client: web::Data<Client>,
+    repo: web::Data<R>,
+    store: web::Data<S>,
+    query: web::Query<UrlQuery>,
+) -> Result<HttpResponse, Error> {
+    let res = client.get(&query.url).send().await?;
+
+    if !res.status().is_success() {
+        return Err(UploadError::Download(res.status()).into());
+    }
+
+    let stream = res
+        .map_err(Error::from)
+        .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
+
+    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
+
+    let alias = session.alias().expect("alias should exist").to_owned();
+    let delete_token = session.delete_token().await?;
+
+    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
+
+    let details = repo.details(&identifier).await?;
+
+    let details = if let Some(details) = details {
+        details
+    } else {
+        let hint = details_hint(&alias);
+        let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
+        repo.relate_details(&identifier, &new_details).await?;
+        new_details
+    };
+
+    repo.mark_cached(&alias).await?;
+
+    session.disarm();
+
+    Ok(HttpResponse::Created().json(&serde_json::json!({
+        "msg": "ok",
+        "files": [{
+            "file": alias.to_string(),
+            "delete_token": delete_token.to_string(),
+            "details": details,
+        }]
+    })))
+}
 /// Delete aliases and files
 #[instrument(name = "Deleting file", skip(repo))]
 async fn delete<R: FullRepo>(
@@ -359,6 +408,8 @@ async fn process<R: FullRepo, S: Store + 'static>(
 ) -> Result<HttpResponse, Error> {
     let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
 
+    repo.check_cached(&alias).await?;
+
     let path_string = thumbnail_path.to_string_lossy().to_string();
     let hash = repo.hash(&alias).await?;
     let identifier_opt = repo
@@ -450,12 +501,11 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
 /// Fetch file details
 #[instrument(name = "Fetching details", skip(repo))]
 async fn details<R: FullRepo, S: Store + 'static>(
-    alias: web::Path<String>,
+    alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
-    let alias = Alias::from_existing(&alias);
 
     let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
@@ -477,12 +527,14 @@
 #[instrument(name = "Serving file", skip(repo))]
 async fn serve<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
-    alias: web::Path<String>,
+    alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
-    let alias = Alias::from_existing(&alias);
+
+    repo.check_cached(&alias).await?;
+
     let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
     let details = repo.details(&identifier).await?;
@@ -581,7 +633,7 @@ where
 
 #[derive(Debug, serde::Deserialize)]
 struct AliasQuery {
-    alias: String,
+    alias: Serde<Alias>,
 }
 
 #[instrument(name = "Purging file", skip(repo))]
@@ -589,13 +641,11 @@ async fn purge<R: FullRepo>(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
-    let alias = Alias::from_existing(&query.alias);
+    let alias = query.into_inner().alias;
     let aliases = repo.aliases_from_alias(&alias).await?;
 
-    for alias in aliases.iter() {
-        let token = repo.delete_token(alias).await?;
-        queue::cleanup_alias(&**repo, alias.clone(), token).await?;
-    }
+    let hash = repo.hash(&alias).await?;
+    queue::cleanup_hash(&**repo, hash).await?;
 
     Ok(HttpResponse::Ok().json(&serde_json::json!({
         "msg": "ok",
@@ -608,7 +658,7 @@ async fn aliases<R: FullRepo>(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
-    let alias = Alias::from_existing(&query.alias);
+    let alias = query.into_inner().alias;
     let aliases = repo.aliases_from_alias(&alias).await?;
 
     Ok(HttpResponse::Ok().json(&serde_json::json!({
@@ -778,6 +828,7 @@ async fn launch<R: FullRepo + 'static, S: Store + 'static>(
                     ),
             )
             .service(web::resource("/download").route(web::get().to(download::<R, S>)))
+            .service(web::resource("/cache").route(web::get().to(cache::<R, S>)))
             .service(
                 web::resource("/delete/{delete_token}/{filename}")
                     .route(web::delete().to(delete::<R>))
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index 0496695..013d000 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -85,6 +85,11 @@ where
     let aliases = repo.aliases(hash.clone()).await?;
 
     if !aliases.is_empty() {
+        for alias in aliases {
+            let token = repo.delete_token(&alias).await?;
+            crate::queue::cleanup_alias(repo, alias, token).await?;
+        }
+
         // Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned
         return Ok(());
     }
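
The new `/cache` route behaves like `/download`, but additionally marks the resulting alias as cached so it can expire. A hypothetical client call against a locally running instance; the host, port, and the use of `reqwest` with its `json` feature are assumptions for illustration, not part of this patch:

```rust
// GET /cache?url=... and read back the created alias and delete token.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let res = reqwest::Client::new()
        .get("http://localhost:8080/cache")
        .query(&[("url", "https://example.com/image.png")])
        .send()
        .await?;

    // On success the handler above replies 201 Created with
    // { "msg": "ok", "files": [{ "file", "delete_token", "details" }] }.
    let body: serde_json::Value = res.json().await?;
    println!("cached as {}", body["files"][0]["file"]);
    Ok(())
}
```
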
diff --git a/src/repo.rs b/src/repo.rs
index f356a97..4cac295 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -44,7 +44,8 @@ pub(crate) enum UploadResult {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait FullRepo:
-    UploadRepo
+    CachedRepo
+    + UploadRepo
     + SettingsRepo
     + IdentifierRepo
     + AliasRepo
@@ -81,12 +82,35 @@ pub(crate) trait FullRepo:
             None => Ok(None),
         }
     }
+
+    async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
+        let hash = self.hash(alias).await?;
+        CachedRepo::create(self, hash).await
+    }
+
+    async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
+        let hash = self.hash(alias).await?;
+        let hashes = CachedRepo::update(self, hash).await?;
+
+        for hash in hashes {
+            crate::queue::cleanup_hash(self, hash).await?;
+        }
+
+        Ok(())
+    }
 }
 
 pub(crate) trait BaseRepo {
     type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
 }
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait CachedRepo: BaseRepo {
+    async fn create(&self, hash: Self::Bytes) -> Result<(), Error>;
+
+    async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error>;
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait UploadRepo: BaseRepo {
     async fn create(&self, upload_id: UploadId) -> Result<(), Error>;
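
To make the `CachedRepo` contract concrete: `create` stamps a hash with the current time, `update` re-stamps one hash and reports every hash whose stamp has aged past the cutoff, and `check_cached` then queues a `cleanup_hash` job per reported hash. A toy, synchronous, in-memory model of that contract; the names and types here are illustrative, while the real trait is async and keyed by `Self::Bytes`:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct MemoryCache {
    stamps: HashMap<Vec<u8>, Instant>,
    max_age: Duration,
}

impl MemoryCache {
    // Analog of CachedRepo::create: record first access.
    fn create(&mut self, hash: Vec<u8>) {
        self.stamps.insert(hash, Instant::now());
    }

    // Analog of CachedRepo::update: refresh one stamp, then report every
    // hash that has gone unaccessed for longer than max_age.
    fn update(&mut self, hash: Vec<u8>) -> Vec<Vec<u8>> {
        self.stamps.insert(hash, Instant::now());

        let expired: Vec<Vec<u8>> = self
            .stamps
            .iter()
            .filter(|(_, stamp)| stamp.elapsed() > self.max_age)
            .map(|(hash, _)| hash.clone())
            .collect();

        for hash in &expired {
            self.stamps.remove(hash);
        }

        expired
    }
}

fn main() {
    let mut cache = MemoryCache {
        stamps: HashMap::new(),
        max_age: Duration::from_secs(7 * 24 * 60 * 60),
    };

    cache.create(b"hash-of-some-media".to_vec());

    // Accessing media re-stamps it, so frequently served media never
    // expires; nothing is a week old yet, so nothing is reported.
    assert!(cache.update(b"hash-of-some-media".to_vec()).is_empty());
}
```
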
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 7955309..3269a7d 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,21 +1,26 @@
 use crate::{
     error::{Error, UploadError},
     repo::{
-        Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
-        Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
+        Alias, AliasRepo, AlreadyExists, BaseRepo, CachedRepo, DeleteToken, Details, FullRepo,
+        HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo,
+        UploadResult,
     },
     serde_str::Serde,
     stream::from_iterator,
 };
 use futures_util::Stream;
-use sled::{Db, IVec, Tree};
+use sled::{CompareAndSwapError, Db, IVec, Tree};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     pin::Pin,
     sync::{Arc, RwLock},
 };
 use tokio::sync::Notify;
 
+mod datetime;
+
+use datetime::DateTime;
+
 macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
         let $ident = $self.$ident.clone();
@@ -57,6 +62,8 @@ pub(crate) struct SledRepo {
     in_progress_queue: Tree,
     queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
     uploads: Tree,
+    cache: Tree,
+    cache_inverse: Tree,
     db: Db,
 }
 
@@ -77,6 +84,8 @@ impl SledRepo {
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
             queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             uploads: db.open_tree("pict-rs-uploads-tree")?,
+            cache: db.open_tree("pict-rs-cache-tree")?,
+            cache_inverse: db.open_tree("pict-rs-cache-inverse-tree")?,
             db,
         })
     }
@@ -123,6 +132,111 @@ impl From<InnerUploadResult> for UploadResult {
     }
 }
 
+#[derive(serde::Serialize, serde::Deserialize)]
+struct Bucket {
+    inner: HashSet<Vec<u8>>,
+}
+
+#[async_trait::async_trait(?Send)]
+impl CachedRepo for SledRepo {
+    async fn create(&self, hash: Self::Bytes) -> Result<(), Error> {
+        let now = DateTime::now();
+        let bytes = serde_json::to_vec(&now)?;
+
+        let cache_inverse = self.cache_inverse.clone();
+        b!(self.cache, {
+            cache.insert(hash.clone(), bytes.clone())?;
+
+            let mut old = cache_inverse.get(bytes.clone())?;
+            loop {
+                let new: Option<Vec<u8>> = if let Some(old) = old.as_ref() {
+                    let mut bucket = serde_cbor::from_slice::<Bucket>(old)?;
+                    bucket.inner.insert(hash.as_ref().to_vec());
+                    let vec = serde_cbor::to_vec(&bucket)?;
+                    Some(vec)
+                } else {
+                    let mut bucket = Bucket {
+                        inner: HashSet::new(),
+                    };
+                    bucket.inner.insert(hash.as_ref().to_vec());
+                    let vec = serde_cbor::to_vec(&bucket)?;
+                    Some(vec)
+                };
+
+                let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?;
+
+                if let Err(CompareAndSwapError { current, .. }) = res {
+                    old = current;
+                } else {
+                    break;
+                }
+            }
+
+            Ok(()) as Result<(), Error>
+        });
+
+        Ok(())
+    }
+
+    async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error> {
+        let now = DateTime::now();
+        let bytes = serde_json::to_vec(&now)?;
+
+        let to_clean = now.min_cache_date();
+        let to_clean_bytes = serde_json::to_vec(&to_clean)?;
+
+        let cache_inverse = self.cache_inverse.clone();
+        let hashes = b!(self.cache, {
+            let prev_value = cache
+                .fetch_and_update(hash.clone(), |prev_value| prev_value.map(|_| bytes.clone()))?;
+
+            if let Some(prev_value) = prev_value {
+                let mut old = cache_inverse.get(prev_value.clone())?;
+                loop {
+                    let new = if let Some(bucket_bytes) = old.as_ref() {
+                        if let Ok(mut bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
+                            bucket.inner.remove(hash.as_ref());
+                            let bucket_bytes = serde_cbor::to_vec(&bucket)?;
+                            Some(bucket_bytes)
+                        } else {
+                            None
+                        }
+                    } else {
+                        None
+                    };
+
+                    if let Err(CompareAndSwapError { current, .. }) =
+                        cache_inverse.compare_and_swap(prev_value.clone(), old, new)?
+                    {
+                        old = current;
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            let mut hashes: Vec<IVec> = Vec::new();
+
+            for (date_bytes, bucket_bytes) in
+                cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
+            {
+                if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
+                    for hash in bucket.inner {
+                        // Best effort cleanup
+                        let _ = cache.remove(&hash);
+                        hashes.push(hash.into());
+                    }
+                }
+
+                cache_inverse.remove(date_bytes)?;
+            }
+
+            Ok(hashes) as Result<_, Error>
+        });
+
+        Ok(hashes)
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 impl UploadRepo for SledRepo {
     async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
diff --git a/src/repo/sled/datetime.rs b/src/repo/sled/datetime.rs
new file mode 100644
index 0000000..0c75ea0
--- /dev/null
+++ b/src/repo/sled/datetime.rs
@@ -0,0 +1,58 @@
+use std::ops::Deref;
+use time::{Duration, OffsetDateTime};
+
+use crate::CONFIG;
+
+const SECONDS: i64 = 1;
+const MINUTES: i64 = 60 * SECONDS;
+const HOURS: i64 = 60 * MINUTES;
+
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize)]
+pub(super) struct DateTime {
+    #[serde(with = "time::serde::rfc3339")]
+    inner_date: OffsetDateTime,
+}
+
+impl DateTime {
+    pub(super) fn now() -> Self {
+        DateTime {
+            inner_date: OffsetDateTime::now_utc(),
+        }
+    }
+
+    pub(super) fn min_cache_date(&self) -> Self {
+        let cache_duration = Duration::new(CONFIG.media.cache_duration * HOURS, 0);
+
+        Self {
+            inner_date: self
+                .checked_sub(cache_duration)
+                .expect("Should never be older than Jan 7, 1970"),
+        }
+    }
+}
+
+impl std::fmt::Display for DateTime {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.inner_date.fmt(f)
+    }
+}
+
+impl AsRef<OffsetDateTime> for DateTime {
+    fn as_ref(&self) -> &OffsetDateTime {
+        &self.inner_date
+    }
+}
+
+impl Deref for DateTime {
+    type Target = OffsetDateTime;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner_date
+    }
+}
+
+impl From<OffsetDateTime> for DateTime {
+    fn from(inner_date: OffsetDateTime) -> Self {
+        Self { inner_date }
+    }
+}
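
Both `create` and `update` above lean on the same sled idiom: read the current value, derive a replacement, and `compare_and_swap`, retrying from the reported `current` value whenever another writer wins the race. A standalone sketch of that retry loop against a temporary sled database; the counter key and string encoding are illustrative, not taken from this patch:

```rust
use sled::{CompareAndSwapError, IVec};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sled::Config::new().temporary(true).open()?;
    let tree = db.open_tree("cas-demo")?;

    let mut old: Option<IVec> = tree.get("counter")?;
    loop {
        // Derive the replacement from whatever value we last observed.
        let n: u64 = old
            .as_ref()
            .and_then(|v| std::str::from_utf8(v).ok()?.parse().ok())
            .unwrap_or(0);
        let new = (n + 1).to_string().into_bytes();

        match tree.compare_and_swap("counter", old, Some(new))? {
            Ok(()) => break,
            // Another writer raced us; retry against the value it left.
            Err(CompareAndSwapError { current, .. }) => old = current,
        }
    }

    assert_eq!(tree.get("counter")?.unwrap().to_vec(), b"1".to_vec());
    Ok(())
}
```
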