Simplify the cancel-safe processor
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
asonix 2023-07-22 10:44:43 -05:00
parent 6459c4f50a
commit 8d35c2449d
3 changed files with 81 additions and 39 deletions

34
Cargo.lock generated
View file

@ -780,6 +780,19 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -1354,6 +1367,15 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@ -1635,6 +1657,7 @@ dependencies = [
"config", "config",
"console-subscriber", "console-subscriber",
"dashmap", "dashmap",
"flume",
"futures-util", "futures-util",
"hex", "hex",
"md-5", "md-5",
@ -1957,7 +1980,7 @@ dependencies = [
"cc", "cc",
"libc", "libc",
"once_cell", "once_cell",
"spin", "spin 0.5.2",
"untrusted", "untrusted",
"web-sys", "web-sys",
"winapi", "winapi",
@ -2270,6 +2293,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]] [[package]]
name = "storage-path-generator" name = "storage-path-generator"
version = "0.1.1" version = "0.1.1"

View file

@ -26,6 +26,7 @@ color-eyre = "0.6"
config = "0.13.0" config = "0.13.0"
console-subscriber = "0.1" console-subscriber = "0.1"
dashmap = "5.1.0" dashmap = "5.1.0"
flume = "0.10.14"
futures-util = "0.3.17" futures-util = "0.3.17"
hex = "0.4.3" hex = "0.4.3"
md-5 = "0.10.5" md-5 = "0.10.5"

View file

@ -4,6 +4,7 @@ use crate::{
}; };
use actix_web::web; use actix_web::web;
use dashmap::{mapref::entry::Entry, DashMap}; use dashmap::{mapref::entry::Entry, DashMap};
use flume::{r#async::RecvFut, Receiver, Sender};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{ use std::{
future::Future, future::Future,
@ -11,21 +12,35 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::sync::oneshot::{Receiver, Sender};
use tracing::Span; use tracing::Span;
type OutcomeSender = Sender<(Details, web::Bytes)>; type OutcomeReceiver = Receiver<(Details, web::Bytes)>;
type ProcessMapKey = (Vec<u8>, PathBuf); type ProcessMapKey = (Vec<u8>, PathBuf);
type ProcessMap = DashMap<ProcessMapKey, Vec<OutcomeSender>>; type ProcessMap = DashMap<ProcessMapKey, OutcomeReceiver>;
static PROCESS_MAP: Lazy<ProcessMap> = Lazy::new(DashMap::new); static PROCESS_MAP: Lazy<ProcessMap> = Lazy::new(DashMap::new);
struct CancelToken { struct CancelToken {
span: Span, span: Span,
key: ProcessMapKey, key: ProcessMapKey,
receiver: Option<Receiver<(Details, web::Bytes)>>, state: CancelState,
}
enum CancelState {
Sender {
sender: Sender<(Details, web::Bytes)>,
},
Receiver {
receiver: RecvFut<'static, (Details, web::Bytes)>,
},
}
impl CancelState {
const fn is_sender(&self) -> bool {
matches!(self, Self::Sender { .. })
}
} }
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
@ -44,39 +59,38 @@ where
pub(super) fn new(hash: &[u8], path: PathBuf, fut: F) -> Self { pub(super) fn new(hash: &[u8], path: PathBuf, fut: F) -> Self {
let key = (hash.to_vec(), path.clone()); let key = (hash.to_vec(), path.clone());
let (sender, receiver) = flume::bounded(1);
let entry = PROCESS_MAP.entry(key.clone()); let entry = PROCESS_MAP.entry(key.clone());
let (receiver, span) = match entry { let (state, span) = match entry {
Entry::Vacant(vacant) => { Entry::Vacant(vacant) => {
vacant.insert(Vec::new()); vacant.insert(receiver);
let span = tracing::info_span!( let span = tracing::info_span!(
"Processing image", "Processing image",
hash = &tracing::field::debug(&hex::encode(hash)), hash = &tracing::field::debug(&hex::encode(hash)),
path = &tracing::field::debug(&path), path = &tracing::field::debug(&path),
completed = &tracing::field::Empty, completed = &tracing::field::Empty,
); );
(None, span) (CancelState::Sender { sender }, span)
} }
Entry::Occupied(mut occupied) => { Entry::Occupied(receiver) => {
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(tokio::sync::oneshot::channel);
occupied.get_mut().push(tx);
let span = tracing::info_span!( let span = tracing::info_span!(
"Waiting for processed image", "Waiting for processed image",
hash = &tracing::field::debug(&hex::encode(hash)), hash = &tracing::field::debug(&hex::encode(hash)),
path = &tracing::field::debug(&path), path = &tracing::field::debug(&path),
); );
(Some(rx), span) (
CancelState::Receiver {
receiver: receiver.get().clone().into_recv_async(),
},
span,
)
} }
}; };
CancelSafeProcessor { CancelSafeProcessor {
cancel_token: CancelToken { cancel_token: CancelToken { span, key, state },
span,
key,
receiver,
},
fut, fut,
} }
} }
@ -92,35 +106,30 @@ where
let this = self.as_mut().project(); let this = self.as_mut().project();
let span = &this.cancel_token.span; let span = &this.cancel_token.span;
let receiver = &mut this.cancel_token.receiver; let state = &mut this.cancel_token.state;
let key = &this.cancel_token.key; let key = &this.cancel_token.key;
let fut = this.fut; let fut = this.fut;
span.in_scope(|| { span.in_scope(|| match state {
if let Some(ref mut rx) = receiver { CancelState::Sender { sender } => fut.poll(cx).map(|res| {
Pin::new(rx) PROCESS_MAP.remove(key);
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into())) if let Ok(tup) = &res {
} else { let _ = sender.try_send(tup.clone());
fut.poll(cx).map(|res| { }
let opt = PROCESS_MAP.remove(key);
res.map(|tup| { res
if let Some((_, vec)) = opt { }),
for sender in vec { CancelState::Receiver { ref mut receiver } => Pin::new(receiver)
let _ = sender.send(tup.clone()); .poll(cx)
} .map(|res| res.map_err(|_| UploadError::Canceled.into())),
}
tup
})
})
}
}) })
} }
} }
impl Drop for CancelToken { impl Drop for CancelToken {
fn drop(&mut self) { fn drop(&mut self) {
if self.receiver.is_none() { if self.state.is_sender() {
let completed = PROCESS_MAP.remove(&self.key).is_none(); let completed = PROCESS_MAP.remove(&self.key).is_none();
self.span.record("completed", completed); self.span.record("completed", completed);
} }