diff --git a/Cargo.lock b/Cargo.lock
index 6458c45..55b794d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -997,6 +997,12 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "half"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
+
 [[package]]
 name = "hashbrown"
 version = "0.11.2"
@@ -1690,6 +1696,7 @@ dependencies = [
  "reqwest",
  "rust-s3",
  "serde",
+ "serde_cbor",
  "serde_json",
  "sha2 0.10.2",
  "sled",
@@ -2134,6 +2141,16 @@ dependencies = [
  "xml-rs",
 ]
 
+[[package]]
+name = "serde_cbor"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
+dependencies = [
+ "half",
+ "serde",
+]
+
 [[package]]
 name = "serde_derive"
 version = "1.0.136"
diff --git a/Cargo.toml b/Cargo.toml
index 63b2f5d..4ed3baf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,12 +48,13 @@ rust-s3 = { version = "0.29.0", default-features = false, features = [
     "with-reqwest",
 ], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
 serde = { version = "1.0", features = ["derive"] }
+serde_cbor = "0.11.2"
 serde_json = "1.0"
 sha2 = "0.10.0"
 sled = { version = "0.34.7" }
 storage-path-generator = "0.1.0"
 thiserror = "1.0"
-time = { version = "0.3.0", features = ["serde"] }
+time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
 tokio = { version = "1", features = ["full", "tracing"] }
 tokio-uring = { version = "0.3", optional = true, features = ["bytes"] }
 tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
diff --git a/defaults.toml b/defaults.toml
index c43d4f2..639764c 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -29,6 +29,7 @@ filters = [
     'thumbnail',
 ]
 skip_validate_imports = false
+cache_duration = 168
 
 [repo]
 type = 'sled'
diff --git a/pict-rs.toml b/pict-rs.toml
index 5d3a6dc..b1b9311 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -154,6 +154,11 @@ filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
 # Set this to true if you want to avoid processing imported media
 skip_validate_imports = false
 
+## Optional: The duration, in hours, to keep media ingested through the "cache" endpoint
+# environment variable: PICTRS_MEDIA__CACHE_DURATION
+# default: 168 (1 week)
+cache_duration = 168
+
 
 ## Database configuration
 [repo]
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index c335cbb..b4aac13 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -53,6 +53,7 @@ impl Args {
                 media_enable_silent_video,
                 media_filters,
                 media_format,
+                media_cache_duration,
                 store,
             }) => {
                 let server = Server {
@@ -69,6 +70,7 @@ impl Args {
                     enable_silent_video: media_enable_silent_video,
                     filters: media_filters,
                     format: media_format,
+                    cache_duration: media_cache_duration,
                 };
 
                 let operation = Operation::Run;
@@ -313,6 +315,8 @@ struct Media {
     format: Option<ImageFormat>,
     #[serde(skip_serializing_if = "Option::is_none")]
     skip_validate_imports: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    cache_duration: Option<i64>,
 }
 
 /// Run the pict-rs application
@@ -407,6 +411,10 @@ struct Run {
     #[clap(long)]
     media_format: Option<ImageFormat>,
 
+    /// How long, in hours, to keep media ingested through the "cached" endpoint
+    #[clap(long)]
+    media_cache_duration: Option<i64>,
+
     #[clap(subcommand)]
     store: Option<RunStore>,
 }
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index bcedb0d..9cf4180 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -68,6 +68,7 @@ struct MediaDefaults {
     enable_silent_video: bool,
     filters: Vec<String>,
     skip_validate_imports: bool,
+    cache_duration: i64,
 }
 
 #[derive(Clone, Debug, serde::Serialize)]
@@ -151,13 +152,15 @@ impl Default for MediaDefaults {
             max_file_size: 40,
             enable_silent_video: true,
             filters: vec![
-                "identity".into(),
-                "thumbnail".into(),
-                "resize".into(),
-                "crop".into(),
                 "blur".into(),
+                "crop".into(),
+                "identity".into(),
+                "resize".into(),
+                "thumbnail".into(),
             ],
             skip_validate_imports: false,
+            // one week (in hours)
+            cache_duration: 24 * 7,
         }
     }
 }
diff --git a/src/config/file.rs b/src/config/file.rs
index d49edc2..1de0ef4 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -102,6 +102,8 @@ pub(crate) struct Media {
     pub(crate) format: Option<ImageFormat>,
 
     pub(crate) skip_validate_imports: bool,
+
+    pub(crate) cache_duration: i64,
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
diff --git a/src/error.rs b/src/error.rs
index 52a1c70..db99cb4 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -114,6 +114,9 @@ pub(crate) enum UploadError {
     #[error("Error in json")]
     Json(#[from] serde_json::Error),
 
+    #[error("Error in cbor")]
+    Cbor(#[from] serde_cbor::Error),
+
     #[error("Range header not satisfiable")]
     Range,
 
diff --git a/src/main.rs b/src/main.rs
index 6c2d6a5..13de2c2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -270,6 +270,55 @@ async fn download(
     })))
 }
 
+/// cache an image from a URL
+#[instrument(name = "Caching file", skip(client, repo))]
+async fn cache<R: FullRepo + 'static, S: Store + 'static>(
+    client: web::Data<Client>,
+    repo: web::Data<R>,
+    store: web::Data<S>,
+    query: web::Query<UrlQuery>,
+) -> Result<HttpResponse, Error> {
+    let res = client.get(&query.url).send().await?;
+
+    if !res.status().is_success() {
+        return Err(UploadError::Download(res.status()).into());
+    }
+
+    let stream = res
+        .map_err(Error::from)
+        .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
+
+    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
+
+    let alias = session.alias().expect("alias should exist").to_owned();
+    let delete_token = session.delete_token().await?;
+
+    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
+
+    let details = repo.details(&identifier).await?;
+
+    let details = if let Some(details) = details {
+        details
+    } else {
+        let hint = details_hint(&alias);
+        let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
+        repo.relate_details(&identifier, &new_details).await?;
+        new_details
+    };
+
+    repo.mark_cached(&alias).await?;
+
+    session.disarm();
+    Ok(HttpResponse::Created().json(&serde_json::json!({
+        "msg": "ok",
+        "files": [{
+            "file": alias.to_string(),
+            "delete_token": delete_token.to_string(),
+            "details": details,
+        }]
+    })))
+}
+
 /// Delete aliases and files
 #[instrument(name = "Deleting file", skip(repo))]
 async fn delete<R: FullRepo>(
@@ -359,6 +408,8 @@ async fn process(
 ) -> Result<HttpResponse, Error> {
     let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
 
+    repo.check_cached(&alias).await?;
+
     let path_string = thumbnail_path.to_string_lossy().to_string();
     let hash = repo.hash(&alias).await?;
     let identifier_opt = repo
@@ -450,12 +501,11 @@ async fn process_backgrounded(
 /// Fetch file details
 #[instrument(name = "Fetching details", skip(repo))]
 async fn details<R: FullRepo, S: Store + 'static>(
-    alias: web::Path<String>,
+    alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
-    let alias = Alias::from_existing(&alias);
 
     let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
@@ -477,12 +527,14 @@
 #[instrument(name = "Serving file", skip(repo))]
 async fn serve<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
-    alias: web::Path<String>,
+    alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
     store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = alias.into_inner();
-    let alias = Alias::from_existing(&alias);
+
+    repo.check_cached(&alias).await?;
+
     let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
 
     let details = repo.details(&identifier).await?;
@@ -581,7 +633,7 @@ where
 
 #[derive(Debug, serde::Deserialize)]
 struct AliasQuery {
-    alias: String,
+    alias: Serde<Alias>,
 }
 
 #[instrument(name = "Purging file", skip(repo))]
@@ -589,13 +641,11 @@ async fn purge(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
-    let alias = Alias::from_existing(&query.alias);
+    let alias = query.into_inner().alias;
     let aliases = repo.aliases_from_alias(&alias).await?;
 
-    for alias in aliases.iter() {
-        let token = repo.delete_token(alias).await?;
-        queue::cleanup_alias(&**repo, alias.clone(), token).await?;
-    }
+    let hash = repo.hash(&alias).await?;
+    queue::cleanup_hash(&**repo, hash).await?;
 
     Ok(HttpResponse::Ok().json(&serde_json::json!({
         "msg": "ok",
@@ -608,7 +658,7 @@ async fn aliases(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
 ) -> Result<HttpResponse, Error> {
-    let alias = Alias::from_existing(&query.alias);
+    let alias = query.into_inner().alias;
     let aliases = repo.aliases_from_alias(&alias).await?;
 
     Ok(HttpResponse::Ok().json(&serde_json::json!({
@@ -778,6 +828,7 @@ async fn launch(
                     ),
             )
             .service(web::resource("/download").route(web::get().to(download::<R, S>)))
+            .service(web::resource("/cache").route(web::get().to(cache::<R, S>)))
             .service(
                 web::resource("/delete/{delete_token}/{filename}")
                     .route(web::delete().to(delete::<R>))
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index 0496695..013d000 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -85,6 +85,11 @@ where
     let aliases = repo.aliases(hash.clone()).await?;
 
     if !aliases.is_empty() {
+        for alias in aliases {
+            let token = repo.delete_token(&alias).await?;
+            crate::queue::cleanup_alias(repo, alias, token).await?;
+        }
+        // Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned
         return Ok(());
     }
 
diff --git a/src/repo.rs b/src/repo.rs
index f356a97..4cac295 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -44,7 +44,8 @@ pub(crate) enum UploadResult {
 
 #[async_trait::async_trait(?Send)]
 pub(crate) trait FullRepo:
-    UploadRepo
+    CachedRepo
+    + UploadRepo
     + SettingsRepo
     + IdentifierRepo
     + AliasRepo
@@ -81,12 +82,35 @@ pub(crate) trait FullRepo:
             None => Ok(None),
         }
     }
+
+    async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
+        let hash = self.hash(alias).await?;
+        CachedRepo::create(self, hash).await
+    }
+
+    async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
+        let hash = self.hash(alias).await?;
+        let hashes = CachedRepo::update(self, hash).await?;
+
+        for hash in hashes {
+            crate::queue::cleanup_hash(self, hash).await?;
+        }
+
+        Ok(())
+    }
 }
 
 pub(crate) trait BaseRepo {
     type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
 }
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait CachedRepo: BaseRepo {
+    async fn create(&self, hash: Self::Bytes) -> Result<(), Error>;
+
+    async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error>;
+}
+
 #[async_trait::async_trait(?Send)]
 pub(crate) trait UploadRepo: BaseRepo {
     async fn create(&self, upload_id: UploadId) -> Result<(), Error>;
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 7955309..3269a7d 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,21 +1,26 @@
 use crate::{
     error::{Error, UploadError},
     repo::{
-        Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
-        Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
+        Alias, AliasRepo, AlreadyExists, BaseRepo, CachedRepo, DeleteToken, Details, FullRepo,
+        HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo,
+        UploadResult,
     },
     serde_str::Serde,
     stream::from_iterator,
 };
 use futures_util::Stream;
-use sled::{Db, IVec, Tree};
+use sled::{CompareAndSwapError, Db, IVec, Tree};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     pin::Pin,
     sync::{Arc, RwLock},
 };
 use tokio::sync::Notify;
 
+mod datetime;
+
+use datetime::DateTime;
+
 macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
         let $ident = $self.$ident.clone();
@@ -57,6 +62,8 @@ pub(crate) struct SledRepo {
    in_progress_queue: Tree,
     queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
     uploads: Tree,
+    cache: Tree,
+    cache_inverse: Tree,
     db: Db,
 }
 
@@ -77,6 +84,8 @@ impl SledRepo {
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
             queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             uploads: db.open_tree("pict-rs-uploads-tree")?,
+            cache: db.open_tree("pict-rs-cache-tree")?,
+            cache_inverse: db.open_tree("pict-rs-cache-inverse-tree")?,
             db,
         })
     }
@@ -123,6 +132,111 @@ impl From for UploadResult {
     }
 }
 
+#[derive(serde::Serialize, serde::Deserialize)]
+struct Bucket {
+    inner: HashSet<Vec<u8>>,
+}
+
+#[async_trait::async_trait(?Send)]
+impl CachedRepo for SledRepo {
+    async fn create(&self, hash: Self::Bytes) -> Result<(), Error> {
+        let now = DateTime::now();
+        let bytes = serde_json::to_vec(&now)?;
+
+        let cache_inverse = self.cache_inverse.clone();
+        b!(self.cache, {
+            cache.insert(hash.clone(), bytes.clone())?;
+
+            let mut old = cache_inverse.get(bytes.clone())?;
+            loop {
+                let new: Option<Vec<u8>> = if let Some(old) = old.as_ref() {
+                    let mut bucket = serde_cbor::from_slice::<Bucket>(old)?;
+                    bucket.inner.insert(hash.as_ref().to_vec());
+                    let vec = serde_cbor::to_vec(&bucket)?;
+                    Some(vec)
+                } else {
+                    let mut bucket = Bucket {
+                        inner: HashSet::new(),
+                    };
+                    bucket.inner.insert(hash.as_ref().to_vec());
+                    let vec = serde_cbor::to_vec(&bucket)?;
+                    Some(vec)
+                };
+
+                let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?;
+
+                if let Err(CompareAndSwapError { current, .. }) = res {
+                    old = current;
+                } else {
+                    break;
+                }
+            }
+
+            Ok(()) as Result<(), Error>
+        });
+        Ok(())
+    }
+
+    async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error> {
+        let now = DateTime::now();
+        let bytes = serde_json::to_vec(&now)?;
+
+        let to_clean = now.min_cache_date();
+        let to_clean_bytes = serde_json::to_vec(&to_clean)?;
+
+        let cache_inverse = self.cache_inverse.clone();
+        let hashes = b!(self.cache, {
+            let prev_value = cache
+                .fetch_and_update(hash.clone(), |prev_value| prev_value.map(|_| bytes.clone()))?;
+
+            if let Some(prev_value) = prev_value {
+                let mut old = cache_inverse.get(prev_value.clone())?;
+                loop {
+                    let new = if let Some(bucket_bytes) = old.as_ref() {
+                        if let Ok(mut bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
+                            bucket.inner.remove(hash.as_ref());
+                            let bucket_bytes = serde_cbor::to_vec(&bucket)?;
+                            Some(bucket_bytes)
+                        } else {
+                            None
+                        }
+                    } else {
+                        None
+                    };
+
+                    if let Err(CompareAndSwapError { current, .. }) =
+                        cache_inverse.compare_and_swap(prev_value.clone(), old, new)?
+                    {
+                        old = current;
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            let mut hashes: Vec<Self::Bytes> = Vec::new();
+
+            for (date_bytes, bucket_bytes) in
+                cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
+            {
+                if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
+                    for hash in bucket.inner {
+                        // Best effort cleanup
+                        let _ = cache.remove(&hash);
+                        hashes.push(hash.into());
+                    }
+                }
+
+                cache_inverse.remove(date_bytes)?;
+            }
+
+            Ok(hashes) as Result<_, Error>
+        });
+
+        Ok(hashes)
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 impl UploadRepo for SledRepo {
     async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
diff --git a/src/repo/sled/datetime.rs b/src/repo/sled/datetime.rs
new file mode 100644
index 0000000..0c75ea0
--- /dev/null
+++ b/src/repo/sled/datetime.rs
@@ -0,0 +1,58 @@
+use std::ops::Deref;
+use time::{Duration, OffsetDateTime};
+
+use crate::CONFIG;
+
+const SECONDS: i64 = 1;
+const MINUTES: i64 = 60 * SECONDS;
+const HOURS: i64 = 60 * MINUTES;
+
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize)]
+pub(super) struct DateTime {
+    #[serde(with = "time::serde::rfc3339")]
+    inner_date: OffsetDateTime,
+}
+
+impl DateTime {
+    pub(super) fn now() -> Self {
+        DateTime {
+            inner_date: OffsetDateTime::now_utc(),
+        }
+    }
+
+    pub(super) fn min_cache_date(&self) -> Self {
+        let cache_duration = Duration::new(CONFIG.media.cache_duration * HOURS, 0);
+
+        Self {
+            inner_date: self
+                .checked_sub(cache_duration)
+                .expect("Should never be older than Jan 7, 1970"),
+        }
+    }
+}
+
+impl std::fmt::Display for DateTime {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.inner_date.fmt(f)
+    }
+}
+
+impl AsRef<OffsetDateTime> for DateTime {
+    fn as_ref(&self) -> &OffsetDateTime {
+        &self.inner_date
+    }
+}
+
+impl Deref for DateTime {
+    type Target = OffsetDateTime;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner_date
+    }
+}
+
+impl From<OffsetDateTime> for DateTime {
+    fn from(inner_date: OffsetDateTime) -> Self {
+        Self { inner_date }
+    }
+}
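
Reviewer note on the sled-backed cache (not part of the patch): the `cache` tree maps a file hash to the serialized timestamp of its last access, while `cache_inverse` maps that timestamp to a CBOR-encoded `Bucket` of hashes, so `update` can sweep everything older than `min_cache_date()` with `range(..to_clean_bytes)`. Because several requests can race on the same timestamp key, the bucket is rewritten through sled's `compare_and_swap`, retrying from the returned `current` value until the write sticks. The sketch below isolates that retry pattern against a plain `sled::Tree`; the `add_to_bucket` helper, the tree name, and the `main` wiring are illustrative assumptions, not code from the patch.

```rust
use std::collections::HashSet;

use sled::{CompareAndSwapError, Tree};

#[derive(serde::Serialize, serde::Deserialize, Default)]
struct Bucket {
    inner: HashSet<Vec<u8>>,
}

// Hypothetical helper mirroring the bucket update in CachedRepo::create:
// insert `hash` into the bucket stored under `key`, retrying the
// compare-and-swap until no concurrent writer has changed the value.
fn add_to_bucket(tree: &Tree, key: &[u8], hash: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    let mut old = tree.get(key)?;

    loop {
        // Decode the current bucket (or start an empty one), add the hash,
        // and re-encode it with CBOR.
        let mut bucket = match old.as_ref() {
            Some(bytes) => serde_cbor::from_slice::<Bucket>(bytes)?,
            None => Bucket::default(),
        };
        bucket.inner.insert(hash.to_vec());
        let new = serde_cbor::to_vec(&bucket)?;

        // compare_and_swap stores `new` only if the value is still `old`;
        // on conflict it hands back the current value and we retry from that.
        match tree.compare_and_swap(key, old, Some(new))? {
            Ok(()) => return Ok(()),
            Err(CompareAndSwapError { current, .. }) => old = current,
        }
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sled::Config::new().temporary(true).open()?;
    let tree = db.open_tree("cache-inverse-example")?;

    // Two inserts under the same (illustrative) timestamp key land in one bucket.
    add_to_bucket(&tree, b"2022-01-01T00:00:00Z", b"hash-a")?;
    add_to_bucket(&tree, b"2022-01-01T00:00:00Z", b"hash-b")?;

    Ok(())
}
```

Under this scheme a lost race simply re-reads the winner's bucket and re-applies its own insert, so no hash is dropped even when two caching requests touch the same timestamp key at once.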