From 80044616f92371c34a17474f48939ce22be93e87 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sun, 5 Sep 2021 19:05:58 -0500 Subject: [PATCH] Make CancelSafeProcessor impl more concise, format code --- src/main.rs | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 53f59fd..1fe5c2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,11 +9,18 @@ use awc::Client; use dashmap::{mapref::entry::Entry, DashMap}; use futures_core::stream::Stream; use once_cell::sync::{Lazy, OnceCell}; -use std::{collections::HashSet, future::{Future, ready}, path::PathBuf, time::SystemTime, task::{Context, Poll}, pin::Pin}; +use std::{ + collections::HashSet, + future::{ready, Future}, + path::PathBuf, + pin::Pin, + task::{Context, Poll}, + time::SystemTime, +}; use structopt::StructOpt; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - sync::oneshot::{Sender, Receiver}, + sync::oneshot::{Receiver, Sender}, }; use tracing::{debug, error, info, instrument, Span}; use tracing_subscriber::EnvFilter; @@ -97,7 +104,11 @@ where } }; - CancelSafeProcessor { path, receiver, fut } + CancelSafeProcessor { + path, + receiver, + fut, + } } } @@ -109,25 +120,21 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if let Some(ref mut rx) = self.receiver { - Pin::new(rx).poll(cx).map(|res| res.map_err(|_| UploadError::Canceled)) + Pin::new(rx) + .poll(cx) + .map(|res| res.map_err(|_| UploadError::Canceled)) } else { - match Pin::new(&mut self.fut).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(res) => { - let opt = PROCESS_MAP.remove(&self.path); - match res { - Err(e) => Poll::Ready(Err(e)), - Ok(tup) => { - if let Some((_, vec)) = opt { - for sender in vec { - let _ = sender.send(tup.clone()); - } - } - Poll::Ready(Ok(tup)) + Pin::new(&mut self.fut).poll(cx).map(|res| { + let opt = PROCESS_MAP.remove(&self.path); + res.map(|tup| { + if let Some((_, vec)) = opt { + for sender in vec { + let _ = sender.send(tup.clone()); } } - } - } + tup + }) + }) } } } @@ -533,8 +540,8 @@ async fn process( Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError> }; - let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?; - + let (details, bytes) = + CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?; return Ok(srv_response( HttpResponse::Ok(),