From a6adde874e1a45d50fa2ee694354a3b2f307d7d9 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 20:33:59 -0500 Subject: [PATCH] Implement s3 multipart uploads --- Cargo.lock | 2 + Cargo.toml | 2 + src/store/object_store.rs | 236 ++++++++++++++++++++++++++++++++++---- 3 files changed, 217 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e07d1d5..aa85935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1568,12 +1568,14 @@ dependencies = [ "console-subscriber", "dashmap", "futures-util", + "md-5", "mime", "num_cpus", "once_cell", "opentelemetry", "opentelemetry-otlp", "pin-project-lite", + "quick-xml", "rusty-s3", "serde", "serde_cbor", diff --git a/Cargo.toml b/Cargo.toml index cc58ce3..b747da4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ config = "0.13.0" console-subscriber = "0.1" dashmap = "5.1.0" futures-util = "0.3.17" +md-5 = "0.10.5" mime = "0.3.1" num_cpus = "1.13" once_cell = "1.4.0" @@ -43,6 +44,7 @@ rusty-s3 = "0.3.2" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.2" serde_json = "1.0" +quick-xml = { version = "0.24.1", features = ["serialize"] } sha2 = "0.10.0" sled = { version = "0.34.7" } storage-path-generator = "0.1.0" diff --git a/src/store/object_store.rs b/src/store/object_store.rs index a124808..5a7c0e3 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -9,9 +9,9 @@ use actix_web::{ header::{ByteRangeSpec, Range, CONTENT_LENGTH}, StatusCode, }, - web::Bytes, + web::{Bytes, BytesMut}, }; -use awc::{error::SendRequestError, Client, ClientRequest}; +use awc::{error::SendRequestError, Client, ClientRequest, SendClientRequest}; use futures_util::{Stream, StreamExt, TryStreamExt}; use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle}; use std::{pin::Pin, string::FromUtf8Error, time::Duration}; @@ -23,6 +23,8 @@ use url::Url; mod object_id; pub(crate) use object_id::ObjectId; +const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880); + // - Settings Tree // - last-path -> last generated path @@ -42,11 +44,17 @@ pub(crate) enum ObjectError { #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), + #[error("Failed to parse xml")] + Xml(#[from] quick_xml::de::DeError), + #[error("Invalid length")] Length, - #[error("Invalid status")] - Status(StatusCode), + #[error("Invalid etag response")] + Etag, + + #[error("Invalid status: {0}\n{1}")] + Status(StatusCode, String), } impl From for ObjectError { @@ -72,6 +80,16 @@ pub(crate) struct ObjectStoreConfig { credentials: Credentials, } +#[derive(serde::Deserialize, Debug)] +struct InitiateMultipartUploadResponse { + #[serde(rename = "Bucket")] + _bucket: String, + #[serde(rename = "Key")] + _key: String, + #[serde(rename = "UploadId")] + upload_id: String, +} + impl StoreConfig for ObjectStoreConfig { type Store = ObjectStore; @@ -107,32 +125,105 @@ impl Store for ObjectStore { } #[tracing::instrument(skip(stream))] - async fn save_stream(&self, stream: S) -> Result + async fn save_stream(&self, mut stream: S) -> Result where S: Stream> + Unpin + 'static, { - let (req, object_id) = self.put_object_request().await?; + let (req, object_id) = self.create_multipart_request().await?; + let mut response = req.send().await.map_err(ObjectError::from)?; - let response = req.send_stream(stream).await.map_err(ObjectError::from)?; + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); - if response.status().is_success() { - return Ok(object_id); + return Err(ObjectError::Status(response.status(), body).into()); } - Err(ObjectError::Status(response.status()).into()) + let body = response.body().await?; + let body: InitiateMultipartUploadResponse = + quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?; + let upload_id = &body.upload_id; + + let res = async { + let mut etags = Vec::new(); + let mut complete = false; + let mut part_number = 0; + + while !complete { + part_number += 1; + let mut bytes = BytesMut::with_capacity(CHUNK_SIZE); + + while bytes.len() < CHUNK_SIZE { + if let Some(res) = stream.next().await { + bytes.extend_from_slice(&res?); + } else { + complete = true; + break; + } + } + + let mut response = self + .create_upload_part_request(&bytes, &object_id, part_number, upload_id) + .send_body(bytes) + .await?; + + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + let etag = response + .headers() + .get("etag") + .ok_or(ObjectError::Etag)? + .to_str() + .map_err(|_| ObjectError::Etag)? + .to_string(); + + etags.push(etag); + } + + let mut response = self + .send_complete_multipart_request( + &object_id, + upload_id, + etags.iter().map(|s| s.as_ref()), + ) + .await?; + + if !response.status().is_success() { + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); + } + + Ok(()) as Result<(), Error> + } + .await; + + if let Err(e) = res { + self.create_abort_multipart_request(&object_id, upload_id) + .send() + .await?; + return Err(e); + } + + Ok(object_id) } #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { let (req, object_id) = self.put_object_request().await?; - let response = req.send_body(bytes).await.map_err(ObjectError::from)?; + let mut response = req.send_body(bytes).await.map_err(ObjectError::from)?; if response.status().is_success() { return Ok(object_id); } - Err(ObjectError::Status(response.status()).into()) + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + Err(ObjectError::Status(response.status(), body).into()) } #[tracing::instrument] @@ -142,7 +233,7 @@ impl Store for ObjectStore { from_start: Option, len: Option, ) -> Result { - let response = self + let mut response = self .get_object_request(identifier, from_start, len) .send() .await @@ -152,7 +243,9 @@ impl Store for ObjectStore { return Ok(Box::pin(response.map_err(payload_to_io_error))); } - Err(ObjectError::Status(response.status()).into()) + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + Err(ObjectError::Status(response.status(), body).into()) } #[tracing::instrument(skip(writer))] @@ -171,9 +264,12 @@ impl Store for ObjectStore { .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?; if !response.status().is_success() { + let body = response.body().await.map_err(payload_to_io_error)?; + let body = String::from_utf8_lossy(&body).to_string(); + return Err(std::io::Error::new( std::io::ErrorKind::Other, - ObjectError::Status(response.status()), + ObjectError::Status(response.status(), body), )); } @@ -188,14 +284,16 @@ impl Store for ObjectStore { #[tracing::instrument] async fn len(&self, identifier: &Self::Identifier) -> Result { - let response = self + let mut response = self .head_object_request(identifier) .send() .await .map_err(ObjectError::from)?; if !response.status().is_success() { - return Err(ObjectError::Status(response.status()).into()); + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); } let length = response @@ -212,10 +310,12 @@ impl Store for ObjectStore { #[tracing::instrument] async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { - let response = self.delete_object_request(identifier).send().await?; + let mut response = self.delete_object_request(identifier).send().await?; if !response.status().is_success() { - return Err(ObjectError::Status(response.status()).into()); + let body = String::from_utf8_lossy(&response.body().await?).to_string(); + + return Err(ObjectError::Status(response.status(), body).into()); } Ok(()) @@ -252,12 +352,100 @@ impl ObjectStore { async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> { let path = self.next_file().await?; - let action = self.bucket.put_object(Some(&self.credentials), &path); + let mut action = self.bucket.put_object(Some(&self.credentials), &path); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); Ok((self.build_request(action), ObjectId::from_string(path))) } - fn build_request<'a, A: S3Action<'a>>(&'a self, mut action: A) -> ClientRequest { + async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), Error> { + let path = self.next_file().await?; + + let mut action = self + .bucket + .create_multipart_upload(Some(&self.credentials), &path); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + + Ok((self.build_request(action), ObjectId::from_string(path))) + } + + fn create_upload_part_request( + &self, + bytes: &[u8], + object_id: &ObjectId, + part_number: u16, + upload_id: &str, + ) -> ClientRequest { + use md5::Digest; + + let mut action = self.bucket.upload_part( + Some(&self.credentials), + object_id.as_str(), + part_number, + upload_id, + ); + + let mut hasher = md5::Md5::new(); + hasher.update(bytes); + let hash = hasher.finalize(); + let hash_string = base64::encode(&hash); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + action.headers_mut().insert("content-md5", hash_string); + + self.build_request(action) + } + + fn send_complete_multipart_request<'a, I: Iterator>( + &'a self, + object_id: &'a ObjectId, + upload_id: &'a str, + etags: I, + ) -> SendClientRequest { + let mut action = self.bucket.complete_multipart_upload( + Some(&self.credentials), + object_id.as_str(), + upload_id, + etags, + ); + + action + .headers_mut() + .insert("content-type", "application/octet-stream"); + + let (req, action) = self.build_request_inner(action); + + req.send_body(action.body()) + } + + fn create_abort_multipart_request( + &self, + object_id: &ObjectId, + upload_id: &str, + ) -> ClientRequest { + let action = self.bucket.abort_multipart_upload( + Some(&self.credentials), + object_id.as_str(), + upload_id, + ); + + self.build_request(action) + } + + fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest { + let (req, _) = self.build_request_inner(action); + req + } + + fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) { let method = match A::METHOD { rusty_s3::Method::Head => awc::http::Method::HEAD, rusty_s3::Method::Get => awc::http::Method::GET, @@ -270,10 +458,12 @@ impl ObjectStore { let req = self.client.request(method, url.as_str()); - action + let req = action .headers_mut() .iter() - .fold(req, |req, tup| req.insert_header(tup)) + .fold(req, |req, tup| req.insert_header(tup)); + + (req, action) } fn get_object_request(