From bf3c47e4579bbae4fb54e4b2b78b02b7f87e0499 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 22:07:06 -0500 Subject: [PATCH] Improve concurrency for upload streams --- src/store/object_store.rs | 143 ++++++++++++++++++++++++++++---------- 1 file changed, 107 insertions(+), 36 deletions(-) diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 5a7c0e3..9fe0425 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -3,8 +3,9 @@ use crate::{ repo::{Repo, SettingsRepo}, store::{Store, StoreConfig}, }; +use actix_rt::task::JoinError; use actix_web::{ - error::PayloadError, + error::{BlockingError, PayloadError}, http::{ header::{ByteRangeSpec, Range, CONTENT_LENGTH}, StatusCode, @@ -18,6 +19,7 @@ use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::io::ReaderStream; +use tracing::Instrument; use url::Url; mod object_id; @@ -53,6 +55,9 @@ pub(crate) enum ObjectError { #[error("Invalid etag response")] Etag, + #[error("Task cancelled")] + Cancelled, + #[error("Invalid status: {0}\n{1}")] Status(StatusCode, String), } @@ -63,6 +68,18 @@ impl From for ObjectError { } } +impl From for ObjectError { + fn from(_: JoinError) -> Self { + Self::Cancelled + } +} + +impl From for ObjectError { + fn from(_: BlockingError) -> Self { + Self::Cancelled + } +} + #[derive(Clone)] pub(crate) struct ObjectStore { path_gen: Generator, @@ -111,6 +128,34 @@ fn payload_to_io_error(e: PayloadError) -> std::io::Error { } } +#[tracing::instrument(skip(stream))] +async fn read_chunk(stream: &mut S) -> std::io::Result +where + S: Stream> + Unpin + 'static, +{ + let mut buf = Vec::new(); + let mut total_len = 0; + + while total_len < CHUNK_SIZE { + if let Some(res) = stream.next().await { + let bytes = res?; + total_len += bytes.len(); + buf.push(bytes); + } else { + break; + } + } + + let bytes = buf + .iter() + .fold(BytesMut::with_capacity(total_len), |mut acc, item| { + acc.extend_from_slice(item); + acc + }); + + Ok(bytes.freeze()) +} + #[async_trait::async_trait(?Send)] impl Store for ObjectStore { type Identifier = ObjectId; @@ -144,43 +189,61 @@ impl Store for ObjectStore { let upload_id = &body.upload_id; let res = async { - let mut etags = Vec::new(); let mut complete = false; let mut part_number = 0; + let mut futures = Vec::new(); while !complete { part_number += 1; - let mut bytes = BytesMut::with_capacity(CHUNK_SIZE); - while bytes.len() < CHUNK_SIZE { - if let Some(res) = stream.next().await { - bytes.extend_from_slice(&res?); - } else { - complete = true; - break; + let bytes = read_chunk(&mut stream).await?; + complete = bytes.len() < CHUNK_SIZE; + + let this = self.clone(); + + let object_id2 = object_id.clone(); + let upload_id2 = upload_id.clone(); + let handle = actix_rt::spawn( + async move { + let mut response = this + .create_upload_part_request( + bytes.clone(), + &object_id2, + part_number, + &upload_id2, + ) + .await? + .send_body(bytes) + .await?; + + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + let etag = response + .headers() + .get("etag") + .ok_or(ObjectError::Etag)? + .to_str() + .map_err(|_| ObjectError::Etag)? + .to_string(); + + drop(response); + + Ok(etag) as Result } - } + .instrument(tracing::info_span!("Upload Part")), + ); - let mut response = self - .create_upload_part_request(&bytes, &object_id, part_number, upload_id) - .send_body(bytes) - .await?; + futures.push(handle); + } - if !response.status().is_success() { - let body = String::from_utf8_lossy(&response.body().await?).to_string(); + let mut etags = Vec::new(); - return Err(ObjectError::Status(response.status(), body).into()); - } - - let etag = response - .headers() - .get("etag") - .ok_or(ObjectError::Etag)? - .to_str() - .map_err(|_| ObjectError::Etag)? - .to_string(); - - etags.push(etag); + for future in futures { + etags.push(future.await.map_err(ObjectError::from)??); } let mut response = self @@ -375,13 +438,13 @@ impl ObjectStore { Ok((self.build_request(action), ObjectId::from_string(path))) } - fn create_upload_part_request( + async fn create_upload_part_request( &self, - bytes: &[u8], + bytes: Bytes, object_id: &ObjectId, part_number: u16, upload_id: &str, - ) -> ClientRequest { + ) -> Result { use md5::Digest; let mut action = self.bucket.upload_part( @@ -391,17 +454,25 @@ impl ObjectStore { upload_id, ); - let mut hasher = md5::Md5::new(); - hasher.update(bytes); - let hash = hasher.finalize(); - let hash_string = base64::encode(&hash); + let hashing_span = tracing::info_span!("Hashing request body"); + let hash_string = actix_web::web::block(move || { + let guard = hashing_span.enter(); + let mut hasher = md5::Md5::new(); + hasher.update(&bytes); + let hash = hasher.finalize(); + let hash_string = base64::encode(&hash); + drop(guard); + hash_string + }) + .await + .map_err(ObjectError::from)?; action .headers_mut() .insert("content-type", "application/octet-stream"); action.headers_mut().insert("content-md5", hash_string); - self.build_request(action) + Ok(self.build_request(action)) } fn send_complete_multipart_request<'a, I: Iterator>(