diff --git a/Cargo.lock b/Cargo.lock index 371f821..b9f9af0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,6 +780,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1354,6 +1367,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "nom" version = "7.1.3" @@ -1635,6 +1657,7 @@ dependencies = [ "config", "console-subscriber", "dashmap", + "flume", "futures-util", "hex", "md-5", @@ -1957,7 +1980,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2270,6 +2293,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "storage-path-generator" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index b923d00..98c3b47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ color-eyre = "0.6" config = "0.13.0" console-subscriber = "0.1" dashmap = "5.1.0" +flume = "0.10.14" futures-util = "0.3.17" hex = "0.4.3" md-5 = "0.10.5" diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index aa5deff..52c4b62 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -4,6 +4,7 @@ use crate::{ }; use actix_web::web; use dashmap::{mapref::entry::Entry, DashMap}; +use flume::{r#async::RecvFut, Receiver, Sender}; use once_cell::sync::Lazy; use std::{ future::Future, @@ -11,21 +12,35 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::sync::oneshot::{Receiver, Sender}; use tracing::Span; -type OutcomeSender = Sender<(Details, web::Bytes)>; +type OutcomeReceiver = Receiver<(Details, web::Bytes)>; type ProcessMapKey = (Vec, PathBuf); -type ProcessMap = DashMap>; +type ProcessMap = DashMap; static PROCESS_MAP: Lazy = Lazy::new(DashMap::new); struct CancelToken { span: Span, key: ProcessMapKey, - receiver: Option>, + state: CancelState, +} + +enum CancelState { + Sender { + sender: Sender<(Details, web::Bytes)>, + }, + Receiver { + receiver: RecvFut<'static, (Details, web::Bytes)>, + }, +} + +impl CancelState { + const fn is_sender(&self) -> bool { + matches!(self, Self::Sender { .. }) + } } pin_project_lite::pin_project! { @@ -44,39 +59,38 @@ where pub(super) fn new(hash: &[u8], path: PathBuf, fut: F) -> Self { let key = (hash.to_vec(), path.clone()); + let (sender, receiver) = flume::bounded(1); + let entry = PROCESS_MAP.entry(key.clone()); - let (receiver, span) = match entry { + let (state, span) = match entry { Entry::Vacant(vacant) => { - vacant.insert(Vec::new()); + vacant.insert(receiver); let span = tracing::info_span!( "Processing image", hash = &tracing::field::debug(&hex::encode(hash)), path = &tracing::field::debug(&path), completed = &tracing::field::Empty, ); - (None, span) + (CancelState::Sender { sender }, span) } - Entry::Occupied(mut occupied) => { - let (tx, rx) = tracing::trace_span!(parent: None, "Create channel") - .in_scope(tokio::sync::oneshot::channel); - - occupied.get_mut().push(tx); + Entry::Occupied(receiver) => { let span = tracing::info_span!( "Waiting for processed image", hash = &tracing::field::debug(&hex::encode(hash)), path = &tracing::field::debug(&path), ); - (Some(rx), span) + ( + CancelState::Receiver { + receiver: receiver.get().clone().into_recv_async(), + }, + span, + ) } }; CancelSafeProcessor { - cancel_token: CancelToken { - span, - key, - receiver, - }, + cancel_token: CancelToken { span, key, state }, fut, } } @@ -92,35 +106,30 @@ where let this = self.as_mut().project(); let span = &this.cancel_token.span; - let receiver = &mut this.cancel_token.receiver; + let state = &mut this.cancel_token.state; let key = &this.cancel_token.key; let fut = this.fut; - span.in_scope(|| { - if let Some(ref mut rx) = receiver { - Pin::new(rx) - .poll(cx) - .map(|res| res.map_err(|_| UploadError::Canceled.into())) - } else { - fut.poll(cx).map(|res| { - let opt = PROCESS_MAP.remove(key); - res.map(|tup| { - if let Some((_, vec)) = opt { - for sender in vec { - let _ = sender.send(tup.clone()); - } - } - tup - }) - }) - } + span.in_scope(|| match state { + CancelState::Sender { sender } => fut.poll(cx).map(|res| { + PROCESS_MAP.remove(key); + + if let Ok(tup) = &res { + let _ = sender.try_send(tup.clone()); + } + + res + }), + CancelState::Receiver { ref mut receiver } => Pin::new(receiver) + .poll(cx) + .map(|res| res.map_err(|_| UploadError::Canceled.into())), }) } } impl Drop for CancelToken { fn drop(&mut self) { - if self.receiver.is_none() { + if self.state.is_sender() { let completed = PROCESS_MAP.remove(&self.key).is_none(); self.span.record("completed", completed); }