Remove associated Bytes type

This commit is contained in:
asonix 2023-08-14 20:00:00 -05:00
parent da876fd553
commit 26ca3a7195
3 changed files with 23 additions and 34 deletions

View file

@ -213,7 +213,6 @@ async fn process_jobs<R, S, F>(
callback: F,
) where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
@ -274,7 +273,6 @@ async fn job_loop<R, S, F>(
) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
for<'a> F: Fn(&'a R, &'a S, &'a Configuration, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
@ -315,7 +313,6 @@ async fn process_image_jobs<R, S, F>(
callback: F,
) where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
for<'a> F: Fn(
&'a R,
@ -353,7 +350,6 @@ async fn image_job_loop<R, S, F>(
) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
for<'a> F: Fn(
&'a R,

View file

@ -4,7 +4,7 @@ use crate::{
store::{Identifier, StoreError},
};
use futures_util::Stream;
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
use url::Url;
use uuid::Uuid;
@ -131,16 +131,9 @@ where
}
}
pub(crate) trait BaseRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>> + Clone;
}
pub(crate) trait BaseRepo {}
impl<T> BaseRepo for actix_web::web::Data<T>
where
T: BaseRepo,
{
type Bytes = T::Bytes;
}
impl<T> BaseRepo for actix_web::web::Data<T> where T: BaseRepo {}
#[async_trait::async_trait(?Send)]
pub(crate) trait ProxyRepo: BaseRepo {
@ -301,9 +294,9 @@ impl JobId {
#[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo {
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<JobId, RepoError>;
async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result<JobId, RepoError>;
async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError>;
async fn pop(&self, queue: &'static str) -> Result<(JobId, Arc<[u8]>), RepoError>;
async fn heartbeat(&self, queue: &'static str, job_id: JobId) -> Result<(), RepoError>;
@ -315,11 +308,11 @@ impl<T> QueueRepo for actix_web::web::Data<T>
where
T: QueueRepo,
{
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<JobId, RepoError> {
async fn push(&self, queue: &'static str, job: Arc<[u8]>) -> Result<JobId, RepoError> {
T::push(self, queue, job).await
}
async fn pop(&self, queue: &'static str) -> Result<(JobId, Self::Bytes), RepoError> {
async fn pop(&self, queue: &'static str) -> Result<(JobId, Arc<[u8]>), RepoError> {
T::pop(self, queue).await
}
@ -334,8 +327,8 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>;
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError>;
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>;
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
}
@ -344,11 +337,11 @@ impl<T> SettingsRepo for actix_web::web::Data<T>
where
T: SettingsRepo,
{
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> {
T::set(self, key, value).await
}
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
T::get(self, key).await
}

View file

@ -181,9 +181,7 @@ impl SledRepo {
}
}
impl BaseRepo for SledRepo {
type Bytes = IVec;
}
impl BaseRepo for SledRepo {}
#[async_trait::async_trait(?Send)]
impl FullRepo for SledRepo {
@ -661,7 +659,7 @@ fn job_key(queue: &'static str, job_id: JobId) -> Arc<[u8]> {
#[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo {
#[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<JobId, RepoError> {
async fn push(&self, queue_name: &'static str, job: Arc<[u8]>) -> Result<JobId, RepoError> {
let metrics_guard = PushMetricsGuard::guard(queue_name);
let id = JobId::gen();
@ -674,7 +672,7 @@ impl QueueRepo for SledRepo {
(&queue, &job_state).transaction(|(queue, job_state)| {
let state = JobState::pending();
queue.insert(&key[..], &job)?;
queue.insert(&key[..], &job[..])?;
job_state.insert(&key[..], state.as_bytes())?;
Ok(())
@ -706,7 +704,7 @@ impl QueueRepo for SledRepo {
}
#[tracing::instrument(skip(self))]
async fn pop(&self, queue_name: &'static str) -> Result<(JobId, Self::Bytes), RepoError> {
async fn pop(&self, queue_name: &'static str) -> Result<(JobId, Arc<[u8]>), RepoError> {
let metrics_guard = PopMetricsGuard::guard(queue_name);
let now = time::OffsetDateTime::now_utc();
@ -754,9 +752,11 @@ impl QueueRepo for SledRepo {
let job_id = JobId::from_bytes(id_bytes);
let opt = queue.get(&key)?.map(|job_bytes| (job_id, job_bytes));
let opt = queue
.get(&key)?
.map(|job_bytes| (job_id, Arc::from(job_bytes.to_vec())));
return Ok(opt) as Result<Option<(JobId, Self::Bytes)>, SledError>;
return Ok(opt) as Result<Option<(JobId, Arc<[u8]>)>, SledError>;
}
Ok(None)
@ -842,17 +842,17 @@ impl QueueRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(value))]
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
b!(self.settings, settings.insert(key, value));
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> {
b!(self.settings, settings.insert(key, &value[..]));
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
let opt = b!(self.settings, settings.get(key));
Ok(opt)
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
}
#[tracing::instrument(level = "trace", skip(self))]