Save blurhashes in repo, improve some error responses, simplify extracting aliases
Some checks failed
/ tests (push) Successful in 1m47s
/ clippy (push) Failing after 59s
/ check (aarch64-unknown-linux-musl) (push) Successful in 1m55s
/ check (armv7-unknown-linux-musleabihf) (push) Successful in 1m54s
/ check (x86_64-unknown-linux-musl) (push) Successful in 1m50s

This commit is contained in:
asonix 2024-03-02 13:27:58 -06:00
parent 348f4ce0a3
commit 3ecefcb64e
9 changed files with 210 additions and 160 deletions

View file

@ -123,6 +123,9 @@ pub(crate) enum UploadError {
#[error("No files present in upload")]
NoFiles,
#[error("Media has not been proxied")]
MissingProxy,
#[error("Requested a file that doesn't exist")]
MissingAlias,
@ -186,6 +189,7 @@ impl UploadError {
Self::Semaphore => ErrorCode::PROCESS_SEMAPHORE_CLOSED,
Self::Canceled => ErrorCode::PANIC,
Self::NoFiles => ErrorCode::VALIDATE_NO_FILES,
Self::MissingProxy => ErrorCode::PROXY_NOT_FOUND,
Self::MissingAlias => ErrorCode::ALIAS_NOT_FOUND,
Self::MissingIdentifier => ErrorCode::LOST_FILE,
Self::InvalidToken => ErrorCode::INVALID_DELETE_TOKEN,
@ -256,7 +260,7 @@ impl ResponseError for Error {
Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::Exiftool(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::Process(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND,
Some(UploadError::MissingProxy | UploadError::MissingAlias) => StatusCode::NOT_FOUND,
Some(UploadError::Ffmpeg(e)) if e.is_not_found() => StatusCode::NOT_FOUND,
Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN,
Some(UploadError::Range) => StatusCode::RANGE_NOT_SATISFIABLE,

View file

@ -119,6 +119,9 @@ impl ErrorCode {
pub(crate) const VALIDATE_NO_FILES: ErrorCode = ErrorCode {
code: "validate-no-files",
};
pub(crate) const PROXY_NOT_FOUND: ErrorCode = ErrorCode {
code: "proxy-not-found",
};
pub(crate) const ALIAS_NOT_FOUND: ErrorCode = ErrorCode {
code: "alias-not-found",
};

View file

@ -255,6 +255,14 @@ fn describe_postgres() {
POSTGRES_VARIANTS_REMOVE,
"Timings for removing a variant for a provided hash"
);
metrics::describe_histogram!(
POSTGRES_HASHES_RELATE_BLURHASH,
"Timings for relating a blurhash with a provided hash"
);
metrics::describe_histogram!(
POSTGRES_HASHES_BLURHASH,
"Timings for fetching a blurhash for a provided hash"
);
metrics::describe_histogram!(
POSTGRES_HASHES_RELATE_MOTION_IDENTIFIER,
"Timings for relating a still image identifier for a provided hash representing a video"
@ -438,6 +446,8 @@ pub(crate) const POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER: &str =
pub(crate) const POSTGRES_VARIANTS_IDENTIFIER: &str = "pict-rs.postgres.variants.identifier";
pub(crate) const POSTGRES_VARIANTS_FOR_HASH: &str = "pict-rs.postgres.variants.for-hash";
pub(crate) const POSTGRES_VARIANTS_REMOVE: &str = "pict-rs.postgres.variants.remove";
pub(crate) const POSTGRES_HASHES_RELATE_BLURHASH: &str = "pict-rs.postgres.hashes.relate-blurhash";
pub(crate) const POSTGRES_HASHES_BLURHASH: &str = "pict-rs.postgres.hashes.blurhash";
pub(crate) const POSTGRES_HASHES_RELATE_MOTION_IDENTIFIER: &str =
"pict-rs.postgres.hashes.relate-motion-identifier";
pub(crate) const POSTGRES_HASHES_MOTION_IDENTIFIER: &str =

View file

@ -713,28 +713,15 @@ async fn process_details<S: Store>(
ext: web::Path<String>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match source {
ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => {
Serde::into_inner(alias)
}
ProcessSource::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().json(&serde_json::json!({
"msg": "No images associated with provided proxy url"
})));
};
alias
}
};
let alias = alias_from_query(source.into(), &state).await?;
let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?;
let Some(hash) = state.repo.hash(&alias).await? else {
// Invalid alias
return Ok(HttpResponse::NotFound().json(&serde_json::json!({
"msg": "No images associated with provided alias",
})));
};
let hash = state
.repo
.hash(&alias)
.await?
.ok_or(UploadError::MissingAlias)?;
let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
@ -785,32 +772,7 @@ async fn process<S: Store + 'static>(
state: web::Data<State<S>>,
process_map: web::Data<ProcessMap>,
) -> Result<HttpResponse, Error> {
let alias = match source {
ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => {
Serde::into_inner(alias)
}
ProcessSource::Proxy { proxy } => {
let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? {
alias
} else if !state.config.server.read_only {
let stream = download_stream(proxy.as_str(), &state).await?;
let (alias, _, _) = ingest_inline(stream, &state).await?;
state.repo.relate_url(proxy, alias.clone()).await?;
alias
} else {
return Err(UploadError::ReadOnly.into());
};
if !state.config.server.read_only {
state.repo.accessed_alias(alias.clone()).await?;
}
alias
}
};
let alias = proxy_alias_from_query(source.into(), &state).await?;
let (format, thumbnail_path, thumbnail_args) =
prepare_process(&state.config, operations, ext.as_str())?;
@ -981,20 +943,10 @@ async fn process_backgrounded<S: Store>(
/// Fetch file details
#[tracing::instrument(name = "Fetching query details", skip(state))]
async fn details_query<S: Store + 'static>(
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().json(&serde_json::json!({
"msg": "Provided proxy URL has not been cached",
})));
};
alias
}
};
let alias = alias_from_query(query, &state).await?;
let details = ensure_details(&state, &alias).await?;
@ -1016,33 +968,10 @@ async fn details<S: Store + 'static>(
#[tracing::instrument(name = "Serving file query", skip(state))]
async fn serve_query<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? {
alias
} else if !state.config.server.read_only {
let stream = download_stream(proxy.as_str(), &state).await?;
let (alias, _, _) = ingest_inline(stream, &state).await?;
state.repo.relate_url(proxy, alias.clone()).await?;
alias
} else {
return Err(UploadError::ReadOnly.into());
};
if !state.config.server.read_only {
state.repo.accessed_alias(alias.clone()).await?;
}
alias
}
};
let alias = proxy_alias_from_query(query, &state).await?;
do_serve(range, alias, state).await
}
@ -1092,18 +1021,10 @@ async fn do_serve<S: Store + 'static>(
#[tracing::instrument(name = "Serving query file headers", skip(state))]
async fn serve_query_head<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().finish());
};
alias
}
};
let alias = alias_from_query(query, &state).await?;
do_serve_head(range, alias, state).await
}
@ -1275,12 +1196,13 @@ fn srv_head(
builder
}
async fn blurhash<S: Store + 'static>(
web::Query(alias_query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
#[tracing::instrument(level = "DEBUG", skip(state))]
async fn proxy_alias_from_query<S: Store + 'static>(
alias_query: AliasQuery,
state: &State<S>,
) -> Result<Alias, Error> {
match alias_query {
AliasQuery::Alias { alias } => Ok(Serde::into_inner(alias)),
AliasQuery::Proxy { proxy } => {
let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? {
alias
@ -1300,16 +1222,37 @@ async fn blurhash<S: Store + 'static>(
state.repo.accessed_alias(alias.clone()).await?;
}
alias
Ok(alias)
}
};
}
}
let details = ensure_details(&state, &alias).await?;
let blurhash = blurhash::generate(&state, &alias, &details).await?;
async fn blurhash<S: Store + 'static>(
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = proxy_alias_from_query(query, &state).await?;
let hash = state
.repo
.hash(&alias)
.await?
.ok_or(UploadError::MissingAlias)?;
let blurhash = if let Some(blurhash) = state.repo.blurhash(hash.clone()).await? {
blurhash
} else {
let details = ensure_details(&state, &alias).await?;
let blurhash = blurhash::generate(&state, &alias, &details).await?;
let blurhash: Arc<str> = Arc::from(blurhash);
state.repo.relate_blurhash(hash, blurhash.clone()).await?;
blurhash
};
Ok(HttpResponse::Ok().json(serde_json::json!({
"msg": "ok",
"blurhash": blurhash,
"blurhash": blurhash.as_ref(),
})))
}
@ -1378,6 +1321,17 @@ enum AliasQuery {
Alias { alias: Serde<Alias> },
}
impl From<ProcessSource> for AliasQuery {
fn from(value: ProcessSource) -> Self {
match value {
ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => {
Self::Alias { alias }
}
ProcessSource::Proxy { proxy } => Self::Proxy { proxy },
}
}
}
#[tracing::instrument(name = "Setting 404 Image", skip(state))]
async fn set_not_found<S>(
json: web::Json<AliasQuery>,
@ -1392,15 +1346,16 @@ async fn set_not_found<S>(
AliasQuery::Proxy { .. } => {
return Ok(HttpResponse::BadRequest().json(serde_json::json!({
"msg": "Cannot use proxied media as Not Found image",
"code": "proxy-not-allowed",
})));
}
};
if state.repo.hash(&alias).await?.is_none() {
return Ok(HttpResponse::BadRequest().json(serde_json::json!({
"msg": "No hash associated with provided alias"
})));
}
state
.repo
.hash(&alias)
.await?
.ok_or(UploadError::MissingAlias)?;
state
.repo
@ -1412,32 +1367,44 @@ async fn set_not_found<S>(
})))
}
async fn alias_from_query<S>(alias_query: AliasQuery, state: &State<S>) -> Result<Alias, Error> {
match alias_query {
AliasQuery::Alias { alias } => Ok(Serde::into_inner(alias)),
AliasQuery::Proxy { proxy } => {
let alias = state
.repo
.related(proxy)
.await?
.ok_or(UploadError::MissingProxy)?;
if !state.config.server.read_only {
state.repo.accessed_alias(alias.clone()).await?;
}
Ok(alias)
}
}
}
#[tracing::instrument(name = "Purging file", skip(state))]
async fn purge<S>(
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
if state.config.server.read_only {
return Err(UploadError::ReadOnly.into());
}
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().finish());
};
alias
}
};
let alias = alias_from_query(query, &state).await?;
let aliases = state.repo.aliases_from_alias(&alias).await?;
let Some(hash) = state.repo.hash(&alias).await? else {
return Ok(HttpResponse::BadRequest().json(&serde_json::json!({
"msg": "No images associated with provided alias",
})));
};
let hash = state
.repo
.hash(&alias)
.await?
.ok_or(UploadError::MissingAlias)?;
queue::cleanup_hash(&state.repo, hash).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
@ -1448,22 +1415,14 @@ async fn purge<S>(
#[tracing::instrument(name = "Deleting alias", skip(state))]
async fn delete_alias<S>(
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
if state.config.server.read_only {
return Err(UploadError::ReadOnly.into());
}
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().finish());
};
alias
}
};
let alias = alias_from_query(query, &state).await?;
if let Some(token) = state.repo.delete_token(&alias).await? {
queue::cleanup_alias(&state.repo, alias, token).await?;
@ -1478,18 +1437,10 @@ async fn delete_alias<S>(
#[tracing::instrument(name = "Fetching aliases", skip(state))]
async fn aliases<S>(
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().finish());
};
alias
}
};
let alias = alias_from_query(query, &state).await?;
let aliases = state.repo.aliases_from_alias(&alias).await?;
@ -1501,25 +1452,16 @@ async fn aliases<S>(
#[tracing::instrument(name = "Fetching identifier", skip(state))]
async fn identifier<S>(
web::Query(alias_query): web::Query<AliasQuery>,
web::Query(query): web::Query<AliasQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let alias = match alias_query {
AliasQuery::Alias { alias } => Serde::into_inner(alias),
AliasQuery::Proxy { proxy } => {
let Some(alias) = state.repo.related(proxy).await? else {
return Ok(HttpResponse::NotFound().finish());
};
alias
}
};
let alias = alias_from_query(query, &state).await?;
let Some(identifier) = state.repo.identifier_from_alias(&alias).await? else {
// Invalid alias
return Ok(HttpResponse::NotFound().json(serde_json::json!({
"msg": "No identifiers associated with provided alias"
})));
};
let identifier = state
.repo
.identifier_from_alias(&alias)
.await?
.ok_or(UploadError::MissingAlias)?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok",

View file

@ -658,6 +658,9 @@ pub(crate) trait HashRepo: BaseRepo {
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError>;
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError>;
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError>;
async fn relate_motion_identifier(
&self,
hash: Hash,
@ -739,6 +742,14 @@ where
T::remove_variant(self, hash, variant).await
}
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
T::relate_blurhash(self, hash, blurhash).await
}
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::blurhash(self, hash).await
}
async fn relate_motion_identifier(
&self,
hash: Hash,

View file

@ -930,6 +930,51 @@ impl HashRepo for PostgresRepo {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_blurhash(
&self,
input_hash: Hash,
input_blurhash: Arc<str>,
) -> Result<(), RepoError> {
use schema::hashes::dsl::*;
let mut conn = self.get_connection().await?;
diesel::update(hashes)
.filter(hash.eq(&input_hash))
.set(blurhash.eq(input_blurhash.as_ref()))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_HASHES_RELATE_BLURHASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
async fn blurhash(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
use schema::hashes::dsl::*;
let mut conn = self.get_connection().await?;
let opt = hashes
.select(blurhash)
.filter(hash.eq(&input_hash))
.get_result::<Option<String>>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_HASHES_BLURHASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
.flatten()
.map(Arc::from);
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_motion_identifier(
&self,

View file

@ -0,0 +1,15 @@
use barrel::backend::Pg;
use barrel::{types, Migration};
pub(crate) fn migration() -> String {
let mut m = Migration::new();
m.change_table("hashes", |t| {
t.add_column(
"blurhash",
types::text().size(60).nullable(true).unique(false),
);
});
m.make::<Pg>().to_string()
}

View file

@ -27,6 +27,7 @@ diesel::table! {
identifier -> Text,
motion_identifier -> Nullable<Text>,
created_at -> Timestamp,
blurhash -> Nullable<Text>,
}
}

View file

@ -91,6 +91,7 @@ pub(crate) struct SledRepo {
hash_identifiers: Tree,
hash_variant_identifiers: Tree,
hash_motion_identifiers: Tree,
hash_blurhashes: Tree,
aliases: Tree,
alias_hashes: Tree,
alias_delete_tokens: Tree,
@ -132,6 +133,7 @@ impl SledRepo {
hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?,
hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?,
hash_blurhashes: db.open_tree("pict-rs-hash-blurhashes-tree")?,
aliases: db.open_tree("pict-rs-aliases-tree")?,
alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
@ -1354,6 +1356,23 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
b!(
self.hash_blurhashes,
hash_blurhashes.insert(hash.to_bytes(), blurhash.as_bytes())
);
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
let opt = b!(self.hash_blurhashes, hash_blurhashes.get(hash.to_ivec()));
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_motion_identifier(
&self,