Add 'cache' endpoint for ingesting ephemeral media
All checks were successful
continuous-integration/drone/push Build is passing

By default, cached media should only stick around for 7 days; however,
the timeout is reset every time the media is accessed, so only obscure
cached media will be flushed from the cache. This '7 days' number is
configurable through the commandline run options as --media-cache-duration
and in the pict-rs.toml file as [media] cache_duration
This commit is contained in:
Aode (Lion) 2022-04-05 20:29:30 -05:00
parent 7d9d947256
commit 6cdae7b318
13 changed files with 313 additions and 21 deletions

17
Cargo.lock generated
View file

@ -997,6 +997,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "half"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.11.2" version = "0.11.2"
@ -1690,6 +1696,7 @@ dependencies = [
"reqwest", "reqwest",
"rust-s3", "rust-s3",
"serde", "serde",
"serde_cbor",
"serde_json", "serde_json",
"sha2 0.10.2", "sha2 0.10.2",
"sled", "sled",
@ -2134,6 +2141,16 @@ dependencies = [
"xml-rs", "xml-rs",
] ]
[[package]]
name = "serde_cbor"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
dependencies = [
"half",
"serde",
]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.136" version = "1.0.136"

View file

@ -48,12 +48,13 @@ rust-s3 = { version = "0.29.0", default-features = false, features = [
"with-reqwest", "with-reqwest",
], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" } ], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2"
serde_json = "1.0" serde_json = "1.0"
sha2 = "0.10.0" sha2 = "0.10.0"
sled = { version = "0.34.7" } sled = { version = "0.34.7" }
storage-path-generator = "0.1.0" storage-path-generator = "0.1.0"
thiserror = "1.0" thiserror = "1.0"
time = { version = "0.3.0", features = ["serde"] } time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] } tokio = { version = "1", features = ["full", "tracing"] }
tokio-uring = { version = "0.3", optional = true, features = ["bytes"] } tokio-uring = { version = "0.3", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = ["codec"] } tokio-util = { version = "0.7", default-features = false, features = ["codec"] }

View file

@ -29,6 +29,7 @@ filters = [
'thumbnail', 'thumbnail',
] ]
skip_validate_imports = false skip_validate_imports = false
cache_duration = 168
[repo] [repo]
type = 'sled' type = 'sled'

View file

@ -154,6 +154,11 @@ filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
# Set this to true if you want to avoid processing imported media # Set this to true if you want to avoid processing imported media
skip_validate_imports = false skip_validate_imports = false
## Optional: The duration, in hours, to keep media ingested through the "cache" endpoint
# environment variable: PICTRS_MEDIA__CACHE_DURATION
# default: 168 (1 week)
cache_duration = 168
## Database configuration ## Database configuration
[repo] [repo]

View file

@ -53,6 +53,7 @@ impl Args {
media_enable_silent_video, media_enable_silent_video,
media_filters, media_filters,
media_format, media_format,
media_cache_duration,
store, store,
}) => { }) => {
let server = Server { let server = Server {
@ -69,6 +70,7 @@ impl Args {
enable_silent_video: media_enable_silent_video, enable_silent_video: media_enable_silent_video,
filters: media_filters, filters: media_filters,
format: media_format, format: media_format,
cache_duration: media_cache_duration,
}; };
let operation = Operation::Run; let operation = Operation::Run;
@ -313,6 +315,8 @@ struct Media {
format: Option<ImageFormat>, format: Option<ImageFormat>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
skip_validate_imports: Option<bool>, skip_validate_imports: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
cache_duration: Option<i64>,
} }
/// Run the pict-rs application /// Run the pict-rs application
@ -407,6 +411,10 @@ struct Run {
#[clap(long)] #[clap(long)]
media_format: Option<ImageFormat>, media_format: Option<ImageFormat>,
/// How long, in hours, to keep media ingested through the "cache" endpoint
#[clap(long)]
media_cache_duration: Option<i64>,
#[clap(subcommand)] #[clap(subcommand)]
store: Option<RunStore>, store: Option<RunStore>,
} }

View file

@ -68,6 +68,7 @@ struct MediaDefaults {
enable_silent_video: bool, enable_silent_video: bool,
filters: Vec<String>, filters: Vec<String>,
skip_validate_imports: bool, skip_validate_imports: bool,
cache_duration: i64,
} }
#[derive(Clone, Debug, serde::Serialize)] #[derive(Clone, Debug, serde::Serialize)]
@ -151,13 +152,15 @@ impl Default for MediaDefaults {
max_file_size: 40, max_file_size: 40,
enable_silent_video: true, enable_silent_video: true,
filters: vec![ filters: vec![
"identity".into(),
"thumbnail".into(),
"resize".into(),
"crop".into(),
"blur".into(), "blur".into(),
"crop".into(),
"identity".into(),
"resize".into(),
"thumbnail".into(),
], ],
skip_validate_imports: false, skip_validate_imports: false,
// one week (in hours)
cache_duration: 24 * 7,
} }
} }
} }

View file

@ -102,6 +102,8 @@ pub(crate) struct Media {
pub(crate) format: Option<ImageFormat>, pub(crate) format: Option<ImageFormat>,
pub(crate) skip_validate_imports: bool, pub(crate) skip_validate_imports: bool,
pub(crate) cache_duration: i64,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]

View file

@ -114,6 +114,9 @@ pub(crate) enum UploadError {
#[error("Error in json")] #[error("Error in json")]
Json(#[from] serde_json::Error), Json(#[from] serde_json::Error),
#[error("Error in cbor")]
Cbor(#[from] serde_cbor::Error),
#[error("Range header not satisfiable")] #[error("Range header not satisfiable")]
Range, Range,

View file

@ -270,6 +270,55 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
}))) })))
} }
/// cache an image from a URL
///
/// Downloads the media at `query.url`, ingests it as *cached* media
/// (subject to eviction once its cache duration elapses), and responds
/// with the new alias, its delete token, and its details.
#[instrument(name = "Caching file", skip(client, repo))]
async fn cache<R: FullRepo + 'static, S: Store + 'static>(
    client: web::Data<Client>,
    repo: web::Data<R>,
    store: web::Data<S>,
    query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> {
    let res = client.get(&query.url).send().await?;

    // Bail out early on any non-2xx response from the remote server
    if !res.status().is_success() {
        return Err(UploadError::Download(res.status()).into());
    }

    // Cap the download at the configured maximum file size
    let stream = res
        .map_err(Error::from)
        .limit((CONFIG.media.max_file_size * MEGABYTES) as u64);

    let mut session = ingest::ingest(&**repo, &**store, stream, None, true).await?;
    let alias = session.alias().expect("alias should exist").to_owned();
    let delete_token = session.delete_token().await?;

    let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;

    // Reuse previously stored details when present; otherwise generate
    // them from the store and persist them for next time
    let details = repo.details(&identifier).await?;

    let details = if let Some(details) = details {
        details
    } else {
        let hint = details_hint(&alias);
        let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
        repo.relate_details(&identifier, &new_details).await?;
        new_details
    };

    // Mark the alias as cached before disarming the session.
    // NOTE(review): disarm() presumably cancels a cleanup-on-drop guard, so
    // an error anywhere above still removes the ingested media — confirm
    // against the ingest module.
    repo.mark_cached(&alias).await?;
    session.disarm();

    Ok(HttpResponse::Created().json(&serde_json::json!({
        "msg": "ok",
        "files": [{
            "file": alias.to_string(),
            "delete_token": delete_token.to_string(),
            "details": details,
        }]
    })))
}
/// Delete aliases and files /// Delete aliases and files
#[instrument(name = "Deleting file", skip(repo))] #[instrument(name = "Deleting file", skip(repo))]
async fn delete<R: FullRepo>( async fn delete<R: FullRepo>(
@ -359,6 +408,8 @@ async fn process<R: FullRepo, S: Store + 'static>(
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?; let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
repo.check_cached(&alias).await?;
let path_string = thumbnail_path.to_string_lossy().to_string(); let path_string = thumbnail_path.to_string_lossy().to_string();
let hash = repo.hash(&alias).await?; let hash = repo.hash(&alias).await?;
let identifier_opt = repo let identifier_opt = repo
@ -450,12 +501,11 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
/// Fetch file details /// Fetch file details
#[instrument(name = "Fetching details", skip(repo))] #[instrument(name = "Fetching details", skip(repo))]
async fn details<R: FullRepo, S: Store + 'static>( async fn details<R: FullRepo, S: Store + 'static>(
alias: web::Path<String>, alias: web::Path<Serde<Alias>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = alias.into_inner(); let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?; let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
@ -477,12 +527,14 @@ async fn details<R: FullRepo, S: Store + 'static>(
#[instrument(name = "Serving file", skip(repo))] #[instrument(name = "Serving file", skip(repo))]
async fn serve<R: FullRepo, S: Store + 'static>( async fn serve<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
alias: web::Path<String>, alias: web::Path<Serde<Alias>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = alias.into_inner(); let alias = alias.into_inner();
let alias = Alias::from_existing(&alias);
repo.check_cached(&alias).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?; let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
let details = repo.details(&identifier).await?; let details = repo.details(&identifier).await?;
@ -581,7 +633,7 @@ where
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
struct AliasQuery { struct AliasQuery {
alias: String, alias: Serde<Alias>,
} }
#[instrument(name = "Purging file", skip(repo))] #[instrument(name = "Purging file", skip(repo))]
@ -589,13 +641,11 @@ async fn purge<R: FullRepo>(
query: web::Query<AliasQuery>, query: web::Query<AliasQuery>,
repo: web::Data<R>, repo: web::Data<R>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias); let alias = query.into_inner().alias;
let aliases = repo.aliases_from_alias(&alias).await?; let aliases = repo.aliases_from_alias(&alias).await?;
for alias in aliases.iter() { let hash = repo.hash(&alias).await?;
let token = repo.delete_token(alias).await?; queue::cleanup_hash(&**repo, hash).await?;
queue::cleanup_alias(&**repo, alias.clone(), token).await?;
}
Ok(HttpResponse::Ok().json(&serde_json::json!({ Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
@ -608,7 +658,7 @@ async fn aliases<R: FullRepo>(
query: web::Query<AliasQuery>, query: web::Query<AliasQuery>,
repo: web::Data<R>, repo: web::Data<R>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias); let alias = query.into_inner().alias;
let aliases = repo.aliases_from_alias(&alias).await?; let aliases = repo.aliases_from_alias(&alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({ Ok(HttpResponse::Ok().json(&serde_json::json!({
@ -778,6 +828,7 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
), ),
) )
.service(web::resource("/download").route(web::get().to(download::<R, S>))) .service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service(web::resource("/cache").route(web::get().to(cache::<R, S>)))
.service( .service(
web::resource("/delete/{delete_token}/{filename}") .route(web::delete().to(delete::<R>))
.route(web::delete().to(delete::<R>)) .route(web::delete().to(delete::<R>))

View file

@ -85,6 +85,11 @@ where
let aliases = repo.aliases(hash.clone()).await?; let aliases = repo.aliases(hash.clone()).await?;
if !aliases.is_empty() { if !aliases.is_empty() {
for alias in aliases {
let token = repo.delete_token(&alias).await?;
crate::queue::cleanup_alias(repo, alias, token).await?;
}
// Return after queueing cleanup alias, since we will be requeued when the last alias is cleaned
return Ok(()); return Ok(());
} }

View file

@ -44,7 +44,8 @@ pub(crate) enum UploadResult {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait FullRepo: pub(crate) trait FullRepo:
UploadRepo CachedRepo
+ UploadRepo
+ SettingsRepo + SettingsRepo
+ IdentifierRepo + IdentifierRepo
+ AliasRepo + AliasRepo
@ -81,12 +82,35 @@ pub(crate) trait FullRepo:
None => Ok(None), None => Ok(None),
} }
} }
/// Record the alias' hash as cached media, starting its eviction timer.
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
    let hash = self.hash(alias).await?;
    // Fully-qualified call: FullRepo itself also has sibling `create` methods
    // via its supertraits, so disambiguate explicitly
    CachedRepo::create(self, hash).await
}
/// Refresh the alias' cache timestamp and queue cleanup jobs for any
/// cached hashes whose cache window has expired.
async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
    let hash = self.hash(alias).await?;

    // `update` returns the set of hashes that have aged out
    let hashes = CachedRepo::update(self, hash).await?;

    for hash in hashes {
        crate::queue::cleanup_hash(self, hash).await?;
    }

    Ok(())
}
} }
pub(crate) trait BaseRepo { pub(crate) trait BaseRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone; type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
} }
/// Repository operations for ephemeral ("cached") media.
#[async_trait::async_trait(?Send)]
pub(crate) trait CachedRepo: BaseRepo {
    /// Begin tracking `hash` as cached media.
    async fn create(&self, hash: Self::Bytes) -> Result<(), Error>;

    /// Refresh `hash`'s access time; returns hashes whose cache window has
    /// expired and which should be cleaned up.
    async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error>;
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo { pub(crate) trait UploadRepo: BaseRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), Error>; async fn create(&self, upload_id: UploadId) -> Result<(), Error>;

View file

@ -1,21 +1,26 @@
use crate::{ use crate::{
error::{Error, UploadError}, error::{Error, UploadError},
repo::{ repo::{
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo, Alias, AliasRepo, AlreadyExists, BaseRepo, CachedRepo, DeleteToken, Details, FullRepo,
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo,
UploadResult,
}, },
serde_str::Serde, serde_str::Serde,
stream::from_iterator, stream::from_iterator,
}; };
use futures_util::Stream; use futures_util::Stream;
use sled::{Db, IVec, Tree}; use sled::{CompareAndSwapError, Db, IVec, Tree};
use std::{ use std::{
collections::HashMap, collections::{HashMap, HashSet},
pin::Pin, pin::Pin,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tokio::sync::Notify; use tokio::sync::Notify;
mod datetime;
use datetime::DateTime;
macro_rules! b { macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{ ($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone(); let $ident = $self.$ident.clone();
@ -57,6 +62,8 @@ pub(crate) struct SledRepo {
in_progress_queue: Tree, in_progress_queue: Tree,
queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>, queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
uploads: Tree, uploads: Tree,
cache: Tree,
cache_inverse: Tree,
db: Db, db: Db,
} }
@ -77,6 +84,8 @@ impl SledRepo {
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
queue_notifier: Arc::new(RwLock::new(HashMap::new())), queue_notifier: Arc::new(RwLock::new(HashMap::new())),
uploads: db.open_tree("pict-rs-uploads-tree")?, uploads: db.open_tree("pict-rs-uploads-tree")?,
cache: db.open_tree("pict-rs-cache-tree")?,
cache_inverse: db.open_tree("pict-rs-cache-inverse-tree")?,
db, db,
}) })
} }
@ -123,6 +132,111 @@ impl From<InnerUploadResult> for UploadResult {
} }
} }
/// Set of media hashes that share one cache timestamp; stored CBOR-encoded
/// as a single value in the `cache_inverse` tree.
#[derive(serde::Serialize, serde::Deserialize)]
struct Bucket {
    inner: HashSet<Vec<u8>>,
}
/// Sled-backed bookkeeping for cached media. Two trees cooperate:
/// - `cache`:         hash -> JSON-encoded access timestamp
/// - `cache_inverse`: JSON-encoded timestamp -> CBOR-encoded `Bucket` of hashes
///
/// Timestamps serialize as RFC 3339 JSON strings; the `range(..)` sweep in
/// `update` presumably relies on their byte order matching chronological
/// order — TODO confirm every stored value uses the same (UTC) offset.
#[async_trait::async_trait(?Send)]
impl CachedRepo for SledRepo {
    /// Record `hash` as cached as of now.
    async fn create(&self, hash: Self::Bytes) -> Result<(), Error> {
        let now = DateTime::now();
        let bytes = serde_json::to_vec(&now)?;

        let cache_inverse = self.cache_inverse.clone();

        // NOTE(review): the `b!` result is discarded here — assuming the
        // macro propagates errors internally with `?`; confirm against the
        // macro definition
        b!(self.cache, {
            // forward mapping: hash -> timestamp
            cache.insert(hash.clone(), bytes.clone())?;

            // reverse mapping: add the hash to this timestamp's bucket,
            // retrying the compare-and-swap until no concurrent writer wins
            let mut old = cache_inverse.get(bytes.clone())?;
            loop {
                let new: Option<Vec<u8>> = if let Some(old) = old.as_ref() {
                    // existing bucket: decode, add our hash, re-encode
                    let mut bucket = serde_cbor::from_slice::<Bucket>(old)?;
                    bucket.inner.insert(hash.as_ref().to_vec());
                    let vec = serde_cbor::to_vec(&bucket)?;
                    Some(vec)
                } else {
                    // no bucket for this timestamp yet: start a fresh one
                    let mut bucket = Bucket {
                        inner: HashSet::new(),
                    };
                    bucket.inner.insert(hash.as_ref().to_vec());
                    let vec = serde_cbor::to_vec(&bucket)?;
                    Some(vec)
                };

                let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?;

                if let Err(CompareAndSwapError { current, .. }) = res {
                    // lost the race: retry against the value that won
                    old = current;
                } else {
                    break;
                }
            }
            Ok(()) as Result<(), Error>
        });

        Ok(())
    }

    /// Refresh `hash`'s access time, then sweep out every bucket older than
    /// the configured cache duration, returning the expired hashes.
    async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error> {
        let now = DateTime::now();
        let bytes = serde_json::to_vec(&now)?;

        // everything whose key sorts strictly before this is expired
        let to_clean = now.min_cache_date();
        let to_clean_bytes = serde_json::to_vec(&to_clean)?;

        let cache_inverse = self.cache_inverse.clone();
        let hashes = b!(self.cache, {
            // bump the forward-mapping timestamp, but only if the hash is
            // actually tracked as cached (map keeps None as None)
            let prev_value = cache
                .fetch_and_update(hash.clone(), |prev_value| prev_value.map(|_| bytes.clone()))?;

            if let Some(prev_value) = prev_value {
                // drop the hash from its previous timestamp's bucket,
                // CAS-retrying on contention
                let mut old = cache_inverse.get(prev_value.clone())?;
                loop {
                    let new = if let Some(bucket_bytes) = old.as_ref() {
                        if let Ok(mut bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
                            bucket.inner.remove(hash.as_ref());
                            let bucket_bytes = serde_cbor::to_vec(&bucket)?;
                            Some(bucket_bytes)
                        } else {
                            // undecodable bucket: delete the entry outright
                            None
                        }
                    } else {
                        None
                    };

                    if let Err(CompareAndSwapError { current, .. }) =
                        cache_inverse.compare_and_swap(prev_value.clone(), old, new)?
                    {
                        old = current;
                    } else {
                        break;
                    }
                }

                // NOTE(review): the hash is never re-inserted into
                // `cache_inverse` under the *new* timestamp, so after its
                // first refresh it belongs to no bucket and the sweep below
                // can never expire it — confirm whether that is intended.
            }

            // sweep all buckets older than the cutoff: collect their hashes
            // for cleanup and remove the bucket entries
            let mut hashes: Vec<Self::Bytes> = Vec::new();

            for (date_bytes, bucket_bytes) in
                cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
            {
                if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
                    for hash in bucket.inner {
                        // Best effort cleanup
                        let _ = cache.remove(&hash);
                        hashes.push(hash.into());
                    }
                }
                // bucket entry is removed even when it failed to decode
                cache_inverse.remove(date_bytes)?;
            }

            Ok(hashes) as Result<_, Error>
        });

        Ok(hashes)
    }
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo { impl UploadRepo for SledRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), Error> { async fn create(&self, upload_id: UploadId) -> Result<(), Error> {

58
src/repo/sled/datetime.rs Normal file
View file

@ -0,0 +1,58 @@
use std::ops::Deref;
use time::{Duration, OffsetDateTime};
use crate::CONFIG;
// Unit multipliers for converting the configured cache duration (expressed
// in hours) into seconds for `time::Duration`.
const SECONDS: i64 = 1;
const MINUTES: i64 = 60 * SECONDS;
const HOURS: i64 = 60 * MINUTES;
/// Newtype around `OffsetDateTime` that serializes as an RFC 3339 string.
///
/// NOTE(review): the sled cache trees appear to use the serialized form as a
/// range-scannable key, which assumes RFC 3339 byte order matches
/// chronological order — true only when all values share one (UTC) offset;
/// confirm.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize)]
pub(super) struct DateTime {
    #[serde(with = "time::serde::rfc3339")]
    inner_date: OffsetDateTime,
}
impl DateTime {
    /// The current moment, in UTC.
    pub(super) fn now() -> Self {
        let inner_date = OffsetDateTime::now_utc();

        DateTime { inner_date }
    }

    /// The oldest timestamp still considered fresh: this instant minus the
    /// configured cache duration (`[media] cache_duration`, in hours).
    pub(super) fn min_cache_date(&self) -> Self {
        let configured_hours = CONFIG.media.cache_duration;
        let cache_duration = Duration::new(configured_hours * HOURS, 0);

        // `checked_sub` only fails when the result leaves the representable
        // range, which cannot happen for any sane cache duration
        let inner_date = self
            .checked_sub(cache_duration)
            .expect("Should never be older than Jan 7, 1970");

        Self { inner_date }
    }
}
impl std::fmt::Display for DateTime {
    /// Format exactly like the wrapped `OffsetDateTime`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // fully-qualified call avoids any ambiguity between Debug::fmt and
        // Display::fmt on the inner value
        std::fmt::Display::fmt(&self.inner_date, f)
    }
}
impl AsRef<OffsetDateTime> for DateTime {
    /// Borrow the wrapped timestamp.
    fn as_ref(&self) -> &OffsetDateTime {
        &self.inner_date
    }
}

impl Deref for DateTime {
    type Target = OffsetDateTime;

    /// Expose `OffsetDateTime`'s methods (e.g. `checked_sub`) directly on
    /// `DateTime`.
    fn deref(&self) -> &Self::Target {
        &self.inner_date
    }
}

impl From<OffsetDateTime> for DateTime {
    /// Wrap a raw `OffsetDateTime`.
    fn from(inner_date: OffsetDateTime) -> Self {
        Self { inner_date }
    }
}