diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 093d939..a119d97 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,6 +1,6 @@ use crate::{ bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, repo::ArcRepo, - store::Store, stream::LocalBoxStream, + store::Store, stream::LocalBoxStream, sync::DropHandle, }; use actix_web::{ error::BlockingError, @@ -244,149 +244,63 @@ impl Store for ObjectStore { where S: Stream>, { - let mut stream = std::pin::pin!(stream); + match self.start_upload(stream, content_type.clone()).await? { + UploadState::Single(first_chunk) => { + let (req, object_id) = self + .put_object_request(first_chunk.len(), content_type) + .await?; - let first_chunk = read_chunk(&mut stream).await?; + let response = req + .body(Body::wrap_stream(first_chunk)) + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) + .await + .map_err(ObjectError::from)?; - if first_chunk.len() < CHUNK_SIZE { - drop(stream); - let (req, object_id) = self - .put_object_request(first_chunk.len(), content_type) - .await?; - let response = req - .body(Body::wrap_stream(first_chunk)) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) - .await - .map_err(ObjectError::from)?; + if !response.status().is_success() { + return Err(status_error(response, None).await); + } - if !response.status().is_success() { - return Err(status_error(response, None).await); + return Ok(object_id); } + UploadState::Multi(object_id, upload_id, futures) => { + // hack-ish: use async block as Result boundary + let res = async { + let mut etags = Vec::new(); - return Ok(object_id); - } - - let mut first_chunk = Some(first_chunk); - - let (req, object_id) = self.create_multipart_request(content_type).await?; - let response = req - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - let body = response.text().await.map_err(ObjectError::Request)?; - let body = CreateMultipartUpload::parse_response(&body) - .map_err(XmlError::new) - .map_err(ObjectError::Xml)?; - let upload_id = body.upload_id(); - - // hack-ish: use async block as Result boundary - let res = async { - let mut complete = false; - let mut part_number = 0; - let mut futures = Vec::new(); - - while !complete { - tracing::trace!("save_stream: looping"); - - part_number += 1; - - let buf = if let Some(buf) = first_chunk.take() { - buf - } else { - read_chunk(&mut stream).await? - }; - - complete = buf.len() < CHUNK_SIZE; - - let this = self.clone(); - - let object_id2 = object_id.clone(); - let upload_id2 = upload_id.to_string(); - let handle = crate::sync::abort_on_drop(crate::sync::spawn( - "upload-multipart-part", - async move { - let response = this - .create_upload_part_request( - buf.clone(), - &object_id2, - part_number, - &upload_id2, - ) - .await? - .body(Body::wrap_stream(buf)) - .send() - .with_metrics( - crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, - ) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - let etag = response - .headers() - .get("etag") - .ok_or(ObjectError::Etag)? - .to_str() - .map_err(|_| ObjectError::Etag)? - .to_string(); - - // early-drop response to close its tracing spans - drop(response); - - Ok(etag) as Result + for future in futures { + etags.push(future.await.map_err(ObjectError::from)??); } - .instrument(tracing::Span::current()), - )); - futures.push(handle); + let response = self + .send_complete_multipart_request( + &object_id, + &upload_id, + etags.iter().map(|s| s.as_ref()), + ) + .await + .map_err(ObjectError::from)?; + + if !response.status().is_success() { + return Err(status_error(response, None).await); + } + + Ok(()) as Result<(), StoreError> + } + .await; + + if let Err(e) = res { + self.create_abort_multipart_request(&object_id, &upload_id) + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + return Err(e); + } + + Ok(object_id) } - - // early-drop stream to allow the next Part to be polled concurrently - drop(stream); - - let mut etags = Vec::new(); - - for future in futures { - etags.push(future.await.map_err(ObjectError::from)??); - } - - let response = self - .send_complete_multipart_request( - &object_id, - upload_id, - etags.iter().map(|s| s.as_ref()), - ) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - Ok(()) as Result<(), StoreError> } - .await; - - if let Err(e) = res { - self.create_abort_multipart_request(&object_id, upload_id) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - return Err(e); - } - - Ok(object_id) } #[tracing::instrument(skip_all)] @@ -528,6 +442,15 @@ impl Store for ObjectStore { } } +enum UploadState { + Single(BytesStream), + Multi( + Arc, + String, + Vec>>, + ), +} + impl ObjectStore { #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(access_key, secret_key, session_token, repo))] @@ -562,6 +485,128 @@ impl ObjectStore { }) } + #[tracing::instrument(skip_all)] + async fn start_upload( + &self, + stream: S, + content_type: mime::Mime, + ) -> Result + where + S: Stream>, + { + let mut stream = std::pin::pin!(stream); + + let first_chunk = read_chunk(&mut stream).await?; + + if first_chunk.len() < CHUNK_SIZE { + return Ok(UploadState::Single(first_chunk)); + } + + let mut first_chunk = Some(first_chunk); + + let (req, object_id) = self.create_multipart_request(content_type).await?; + let response = req + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + + if !response.status().is_success() { + return Err(status_error(response, None).await); + } + + let body = response.text().await.map_err(ObjectError::Request)?; + let body = CreateMultipartUpload::parse_response(&body) + .map_err(XmlError::new) + .map_err(ObjectError::Xml)?; + let upload_id = body.upload_id(); + + // hack-ish: use async block as Result boundary + let res = async { + let mut complete = false; + let mut part_number = 0; + let mut futures = Vec::new(); + + while !complete { + tracing::trace!("save_stream: looping"); + + part_number += 1; + + let buf = if let Some(buf) = first_chunk.take() { + buf + } else { + read_chunk(&mut stream).await? + }; + + complete = buf.len() < CHUNK_SIZE; + + let this = self.clone(); + + let object_id2 = object_id.clone(); + let upload_id2 = upload_id.to_string(); + let handle = crate::sync::abort_on_drop(crate::sync::spawn( + "upload-multipart-part", + async move { + let response = this + .create_upload_part_request( + buf.clone(), + &object_id2, + part_number, + &upload_id2, + ) + .await? + .body(Body::wrap_stream(buf)) + .send() + .with_metrics( + crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, + ) + .await + .map_err(ObjectError::from)?; + + if !response.status().is_success() { + return Err(status_error(response, None).await); + } + + let etag = response + .headers() + .get("etag") + .ok_or(ObjectError::Etag)? + .to_str() + .map_err(|_| ObjectError::Etag)? + .to_string(); + + // early-drop response to close its tracing spans + drop(response); + + Ok(etag) as Result + } + .instrument(tracing::Span::current()), + )); + + futures.push(handle); + } + + Ok(futures) + } + .await; + + match res { + Ok(futures) => Ok(UploadState::Multi( + object_id, + upload_id.to_string(), + futures, + )), + Err(e) => { + self.create_abort_multipart_request(&object_id, upload_id) + .send() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + Err(e) + } + } + } + async fn head_bucket_request(&self) -> Result { let action = self.bucket.head_bucket(Some(&self.credentials));