From 77a400c7ca8f82f4d3ff21adf226d4bf8f9e8c4a Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sat, 2 Apr 2022 17:41:00 -0500 Subject: [PATCH] Implement UploadRepo --- src/repo.rs | 5 ---- src/repo/sled.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/src/repo.rs b/src/repo.rs index 934576e1..7137b653 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -431,11 +431,6 @@ impl UploadId { pub(crate) fn as_bytes(&self) -> &[u8] { &self.id.as_bytes()[..] } - - pub(crate) fn from_bytes(&self, bytes: &[u8]) -> Option { - let id = Uuid::from_slice(bytes).ok()?; - Some(Self { id }) - } } impl From for UploadId { diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 262fa814..99850b8e 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,9 +1,10 @@ use crate::{ - error::Error, + error::{Error, UploadError}, repo::{ Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, }, + serde_str::Serde, stream::from_iterator, }; use futures_util::Stream; @@ -55,6 +56,7 @@ pub(crate) struct SledRepo { queue: Tree, in_progress_queue: Tree, queue_notifier: Arc>>>, + uploads: Tree, db: Db, } @@ -74,6 +76,7 @@ impl SledRepo { queue: db.open_tree("pict-rs-queue-tree")?, in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, queue_notifier: Arc::new(RwLock::new(HashMap::new())), + uploads: db.open_tree("pict-rs-uploads-tree")?, db, }) } @@ -85,18 +88,68 @@ impl BaseRepo for SledRepo { impl FullRepo for SledRepo {} +#[derive(serde::Deserialize, serde::Serialize)] +enum InnerUploadResult { + Success { + alias: Serde, + token: Serde, + }, + Failure { + message: String, + }, +} + +impl From for InnerUploadResult { + fn from(u: UploadResult) -> Self { + match u { + UploadResult::Success { alias, token } => InnerUploadResult::Success { + alias: Serde::new(alias), + token: Serde::new(token), + }, + UploadResult::Failure { message } => InnerUploadResult::Failure { message }, + } + } +} + +impl From for UploadResult { + fn from(i: InnerUploadResult) -> Self { + match i { + InnerUploadResult::Success { alias, token } => UploadResult::Success { + alias: Serde::into_inner(alias), + token: Serde::into_inner(token), + }, + InnerUploadResult::Failure { message } => UploadResult::Failure { message }, + } + } +} + #[async_trait::async_trait(?Send)] impl UploadRepo for SledRepo { async fn wait(&self, upload_id: UploadId) -> Result { - unimplemented!("DO THIS") + let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes()); + + while let Some(event) = (&mut subscriber).await { + if let sled::Event::Insert { value, .. } = event { + let result: InnerUploadResult = serde_json::from_slice(&value)?; + return Ok(result.into()); + } + } + + Err(UploadError::Canceled.into()) } async fn claim(&self, upload_id: UploadId) -> Result<(), Error> { - unimplemented!("DO THIS") + b!(self.uploads, uploads.remove(upload_id.as_bytes())); + Ok(()) } async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> { - unimplemented!("DO THIS") + let result: InnerUploadResult = result.into(); + let result = serde_json::to_vec(&result)?; + + b!(self.uploads, uploads.insert(upload_id.as_bytes(), result)); + + Ok(()) } }