More migration work for rusty-s3
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
asonix 2022-09-24 14:18:49 -05:00
parent ab7fd9aaf7
commit ff1771e016
6 changed files with 116 additions and 48 deletions

View file

@ -543,13 +543,30 @@ struct Filesystem {
#[derive(Clone, Debug, Parser, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct ObjectStorage {
/// The base endpoint for the object storage
///
/// Examples:
/// - `http://localhost:9000`
/// - `https://s3.dualstack.eu-west-1.amazonaws.com`
#[clap(short, long)]
endpoint: Url,
/// Determines whether to use path style or virtualhost style for accessing objects
///
/// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object}
/// When false, objects will be fetched from {bucket_name}.{endpoint}/{object}
#[clap(short, long)]
use_path_style: bool,
/// The bucket in which to store media
#[clap(short, long)]
bucket_name: Option<String>,
/// The region the bucket is located in
///
/// For minio deployments, this can just be 'minio'
#[clap(short, long)]
region: Option<Serde<s3::Region>>,
region: Option<String>,
/// The Access Key for the user accessing the bucket
#[clap(short, long)]

View file

@ -1,8 +1,8 @@
use crate::magick::ValidInputType;
use crate::serde_str::Serde;
use clap::ArgEnum;
use std::{fmt::Display, path::PathBuf, str::FromStr};
use tracing::Level;
use url::Url;
#[derive(
Clone,
@ -63,13 +63,28 @@ pub(crate) struct Filesystem {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, clap::Parser)]
#[serde(rename_all = "snake_case")]
pub(crate) struct ObjectStorage {
/// The base endpoint for the object storage
///
/// Examples:
/// - `http://localhost:9000`
/// - `https://s3.dualstack.eu-west-1.amazonaws.com`
#[clap(short, long)]
pub(crate) endpoint: Url,
/// Determines whether to use path style or virtualhost style for accessing objects
///
/// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object}
/// When false, objects will be fetched from {bucket_name}.{endpoint}/{object}
#[clap(short, long)]
pub(crate) use_path_style: bool,
/// The bucket in which to store media
#[clap(short, long)]
pub(crate) bucket_name: String,
/// The region the bucket is located in
#[clap(short, long)]
pub(crate) region: Serde<s3::Region>,
pub(crate) region: String,
/// The Access Key for the user accessing the bucket
#[clap(short, long)]
@ -219,7 +234,8 @@ impl Display for LogFormat {
#[cfg(test)]
mod tests {
use super::{Serde, Targets};
use super::Targets;
use crate::serde_str::Serde;
#[test]
fn builds_info_targets() {

View file

@ -10,6 +10,7 @@ use futures_util::{
Stream, StreamExt, TryStreamExt,
};
use once_cell::sync::Lazy;
use rusty_s3::UrlStyle;
use std::{
future::ready,
path::PathBuf,
@ -970,15 +971,15 @@ fn next_worker_id() -> String {
format!("{}-{}", CONFIG.server.worker_id, next_id)
}
async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
async fn launch<R: FullRepo + 'static, S: Store + 'static>(
repo: R,
store: S,
store: S::Config,
) -> color_eyre::Result<()> {
repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
HttpServer::new(move || {
let store = store.clone();
let store = S::init(store.clone());
let repo = repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@ -1081,6 +1082,8 @@ where
match to {
config::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?;
let to = FileStore::init(to);
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
}
@ -1088,25 +1091,30 @@ where
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name,
url_style,
use_path_style,
region,
access_key,
secret_key,
session_token,
}) => {
let to = ObjectStore::build(
endpoint,
endpoint.clone(),
bucket_name,
url_style,
if *use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
region.as_ref(),
Some(access_key.clone()),
Some(secret_key.clone()),
session_token.clone(),
repo.clone(),
build_client(),
)
.await?;
let to = ObjectStore::init(to);
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
}
@ -1129,29 +1137,34 @@ async fn main() -> color_eyre::Result<()> {
match from {
config::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?;
let from = FileStore::init(from);
migrate_inner(&repo, from, &to).await?;
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name,
url_style,
use_path_style,
region,
access_key,
secret_key,
session_token,
}) => {
let from = ObjectStore::build(
endpoint,
endpoint.clone(),
&bucket_name,
url_style,
if *use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
Serde::into_inner(region),
Some(access_key),
Some(secret_key),
session_token,
repo.clone(),
build_client(),
)
.await?;
let from = ObjectStore::init(from);
migrate_inner(&repo, from, &to).await?;
}
@ -1167,33 +1180,36 @@ async fn main() -> color_eyre::Result<()> {
let store = FileStore::build(path, repo.clone()).await?;
match repo {
Repo::Sled(sled_repo) => launch(sled_repo, store).await,
Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await,
}
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
bucket_name,
url_style,
use_path_style,
region,
access_key,
secret_key,
session_token,
}) => {
let store = ObjectStore::build(
endpoint,
endpoint.clone(),
&bucket_name,
url_style,
if *use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
Serde::into_inner(region),
Some(access_key),
Some(secret_key),
session_token,
repo.clone(),
build_client(),
)
.await?;
match repo {
Repo::Sled(sled_repo) => launch(sled_repo, store).await,
Repo::Sled(sled_repo) => launch::<_, ObjectStore>(sled_repo, store).await,
}
}
}

View file

@ -16,10 +16,13 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Send + Sync + Clone + Debug {
pub(crate) trait Store: Clone + Debug {
type Config: Send + Sync + Clone;
type Identifier: Identifier + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + 'static;
fn init(config: Self::Config) -> Self;
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin;

View file

@ -49,9 +49,14 @@ pub(crate) struct FileStore {
#[async_trait::async_trait(?Send)]
impl Store for FileStore {
type Config = Self;
type Identifier = FileId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
fn init(config: Self::Config) -> Self {
config
}
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where

View file

@ -12,15 +12,11 @@ use actix_web::{
};
use awc::{Client, ClientRequest};
use futures_util::{Stream, TryStreamExt};
use rusty_s3::{
actions::{PutObject, S3Action},
Bucket, Credentials, UrlStyle,
};
use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle};
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
use storage_path_generator::{Generator, Path};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use tracing::Instrument;
use url::Url;
mod object_id;
@ -55,24 +51,41 @@ pub(crate) struct ObjectStore {
client: Client,
}
#[derive(Clone)]
pub(crate) struct ObjectStoreConfig {
path_gen: Generator,
repo: Repo,
bucket: Bucket,
credentials: Credentials,
}
#[async_trait::async_trait(?Send)]
impl Store for ObjectStore {
type Config = ObjectStoreConfig;
type Identifier = ObjectId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
fn init(config: Self::Config) -> Self {
ObjectStore {
path_gen: config.path_gen,
repo: config.repo,
bucket: config.bucket,
credentials: config.credentials,
client: crate::build_client(),
}
}
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
{
let response = self
.put_object_request()
.await?
.send_stream(ReaderStream::new(reader))
.await?;
let (req, object_id) = self.put_object_request().await?;
let response = req.send_stream(ReaderStream::new(reader)).await?;
if response.status().is_success() {
return Ok(ObjectId::from_string(path));
return Ok(object_id);
}
Err(ObjectError::Status(response.status()).into())
@ -80,12 +93,12 @@ impl Store for ObjectStore {
#[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let req = self.put_object_request().await?;
let (req, object_id) = self.put_object_request().await?;
let response = req.send_body(bytes).await?;
if response.status().is_success() {
return Ok(ObjectId::from_string(path));
return Ok(object_id);
}
Err(ObjectError::Status(response.status()).into())
@ -103,7 +116,7 @@ impl Store for ObjectStore {
.send()
.await?;
if response.status.is_success() {
if response.status().is_success() {
return Ok(Box::pin(response));
}
@ -120,11 +133,11 @@ impl Store for ObjectStore {
Writer: AsyncWrite + Send + Unpin,
{
let response = self
.get_object_request(identifier, from_start, len)
.get_object_request(identifier, None, None)
.send()
.await?;
if !response.status.is_success() {
if !response.status().is_success() {
return Err(ObjectError::Status(response.status()).into());
}
@ -141,7 +154,7 @@ impl Store for ObjectStore {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
let response = self.head_object_request(identifier).send().await?;
if !response.status.is_success() {
if !response.status().is_success() {
return Err(ObjectError::Status(response.status()).into());
}
@ -161,7 +174,7 @@ impl Store for ObjectStore {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let response = self.delete_object_request(identifier).send().await?;
if !response.status.is_success() {
if !response.status().is_success() {
return Err(ObjectError::Status(response.status()).into());
}
@ -180,29 +193,27 @@ impl ObjectStore {
secret_key: Option<String>,
session_token: Option<String>,
repo: Repo,
client: reqwest::Client,
) -> Result<ObjectStore, Error> {
) -> Result<ObjectStoreConfig, Error> {
let path_gen = init_generator(&repo).await?;
Ok(ObjectStore {
Ok(ObjectStoreConfig {
path_gen,
repo,
bucket: Bucket::new(endpoint, url_style, bucket_name, region)
.map_err(ObjectError::from)?,
credentials: Credentials::new_with_token(access_key, secret_key, session_token),
client,
})
}
async fn put_object_request(&self) -> Result<ClientRequest, Error> {
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
let path = self.next_file().await?;
let action = self.bucket.put_object(Some(&self.credentials), &path);
Ok(self.build_request(action))
Ok((self.build_request(action), ObjectId::from_string(path)))
}
fn build_request<A: S3Action>(&self, action: A) -> ClientRequest {
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
let method = match A::METHOD {
rusty_s3::Method::Head => awc::http::Method::HEAD,
rusty_s3::Method::Get => awc::http::Method::GET,
@ -243,7 +254,7 @@ impl ObjectStore {
};
if let Some(range) = range {
req.insert_header(Range::Bytes(vec![range])).send().await?;
req.insert_header(Range::Bytes(vec![range]))
} else {
req
}