use actix_web::web::Bytes; use futures_core::Stream; use std::{fmt::Debug, sync::Arc}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{error_code::ErrorCode, stream::LocalBoxStream}; pub(crate) mod file_store; pub(crate) mod object_store; #[derive(Debug, thiserror::Error)] pub(crate) enum StoreError { #[error("Error in file store")] FileStore(#[source] crate::store::file_store::FileError), #[error("Error in object store")] ObjectStore(#[source] crate::store::object_store::ObjectError), #[error("Error in DB")] Repo(#[from] crate::repo::RepoError), #[error("Error in 0.4 DB")] Repo04(#[from] crate::repo_04::RepoError), #[error("Requested file is not found")] FileNotFound(#[source] std::io::Error), #[error("Requested object is not found")] ObjectNotFound(#[source] crate::store::object_store::ObjectError), } impl StoreError { pub(crate) const fn error_code(&self) -> ErrorCode { match self { Self::FileStore(e) => e.error_code(), Self::ObjectStore(e) => e.error_code(), Self::Repo(e) => e.error_code(), Self::Repo04(_) => ErrorCode::OLD_REPO_ERROR, Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND, } } pub(crate) const fn is_not_found(&self) -> bool { matches!(self, Self::FileNotFound(_)) || matches!(self, Self::ObjectNotFound(_)) } pub(crate) const fn is_disconnected(&self) -> bool { match self { Self::Repo(e) => e.is_disconnected(), _ => false, } } } impl From for StoreError { fn from(value: crate::store::file_store::FileError) -> Self { match value { crate::store::file_store::FileError::Io(e) if e.kind() == std::io::ErrorKind::NotFound => { Self::FileNotFound(e) } e => Self::FileStore(e), } } } impl From for StoreError { fn from(value: crate::store::object_store::ObjectError) -> Self { match value { e @ crate::store::object_store::ObjectError::Status( actix_web::http::StatusCode::NOT_FOUND, _, ) => Self::ObjectNotFound(e), e => Self::ObjectStore(e), } } } #[async_trait::async_trait(?Send)] pub(crate) trait Store: Clone + Debug { async fn health_check(&self) -> Result<(), StoreError>; async fn save_async_read( &self, reader: Reader, content_type: mime::Mime, ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static; async fn save_stream( &self, stream: S, content_type: mime::Mime, ) -> Result, StoreError> where S: Stream> + Unpin + 'static; async fn save_bytes( &self, bytes: Bytes, content_type: mime::Mime, ) -> Result, StoreError>; fn public_url(&self, _: &Arc) -> Option; async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError>; async fn read_into( &self, identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where Writer: AsyncWrite + Unpin; async fn len(&self, identifier: &Arc) -> Result; async fn remove(&self, identifier: &Arc) -> Result<(), StoreError>; } #[async_trait::async_trait(?Send)] impl Store for actix_web::web::Data where T: Store, { async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } async fn save_async_read( &self, reader: Reader, content_type: mime::Mime, ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader, content_type).await } async fn save_stream( &self, stream: S, content_type: mime::Mime, ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { T::save_stream(self, stream, content_type).await } async fn save_bytes( &self, bytes: Bytes, content_type: mime::Mime, ) -> Result, StoreError> { T::save_bytes(self, bytes, content_type).await } fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn read_into( &self, identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } } #[async_trait::async_trait(?Send)] impl Store for Arc where T: Store, { async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } async fn save_async_read( &self, reader: Reader, content_type: mime::Mime, ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader, content_type).await } async fn save_stream( &self, stream: S, content_type: mime::Mime, ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { T::save_stream(self, stream, content_type).await } async fn save_bytes( &self, bytes: Bytes, content_type: mime::Mime, ) -> Result, StoreError> { T::save_bytes(self, bytes, content_type).await } fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn read_into( &self, identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } } #[async_trait::async_trait(?Send)] impl<'a, T> Store for &'a T where T: Store, { async fn health_check(&self) -> Result<(), StoreError> { T::health_check(self).await } async fn save_async_read( &self, reader: Reader, content_type: mime::Mime, ) -> Result, StoreError> where Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader, content_type).await } async fn save_stream( &self, stream: S, content_type: mime::Mime, ) -> Result, StoreError> where S: Stream> + Unpin + 'static, { T::save_stream(self, stream, content_type).await } async fn save_bytes( &self, bytes: Bytes, content_type: mime::Mime, ) -> Result, StoreError> { T::save_bytes(self, bytes, content_type).await } fn public_url(&self, identifier: &Arc) -> Option { T::public_url(self, identifier) } async fn to_stream( &self, identifier: &Arc, from_start: Option, len: Option, ) -> Result>, StoreError> { T::to_stream(self, identifier, from_start, len).await } async fn read_into( &self, identifier: &Arc, writer: &mut Writer, ) -> Result<(), std::io::Error> where Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } async fn len(&self, identifier: &Arc) -> Result { T::len(self, identifier).await } async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { T::remove(self, identifier).await } }