diff --git a/README.md b/README.md index 9ed6648..2642f59 100644 --- a/README.md +++ b/README.md @@ -766,10 +766,17 @@ There's a few required configuration options for object storage. I will try to e - secret-key: this is a second secret your cloud provider will give to you in order to access the bucket +Additionally, there's a command-line argument, `--concurrency`, that changes the level of +concurrency for the migration. By default, pict-rs will attempt to migrate 32 hashes at a time, +but for large deployments, it may be worth trying to increase this value. Setting it to 128, 256, +or even 512 could be useful. Note that the bigger this value, the more concurrent connections to +the object storage provider will be made. + The command will look something like this: ```bash $ pict-rs \ migrate-store \ + --concurrency 32 \ filesystem \ -p /path/to/files \ object-storage \ diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 4f1aea2..f850a71 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -249,6 +249,7 @@ impl Args { } Command::MigrateStore(MigrateStore { skip_missing_files, + concurrency, store, }) => { let server = Server::default(); @@ -271,6 +272,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -291,6 +293,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -315,6 +318,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -338,6 +342,7 @@ impl Args { }, operation: Operation::MigrateStore { skip_missing_files, + concurrency, from: from.into(), to: to.into(), }, @@ -450,6 +455,7 @@ pub(crate) enum Operation { Run, MigrateStore { skip_missing_files: bool, + concurrency: usize, from: crate::config::primitives::Store, to: crate::config::primitives::Store, }, @@ -1092,6 +1098,12 @@ struct 
MigrateStore { #[arg(long)] skip_missing_files: bool, + /// How many hashes pict-rs should attempt to migrate at the same time. This does not + /// correspond to a thread count, but instead how many in-flight migrations can happen. + /// Increasing this number may improve throughput + #[arg(long, default_value = "32")] + concurrency: usize, + #[command(subcommand)] store: MigrateStoreFrom, } diff --git a/src/lib.rs b/src/lib.rs index 7fca794..0ff61f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1840,6 +1840,7 @@ async fn migrate_inner( to: config::primitives::Store, skip_missing_files: bool, timeout: u64, + concurrency: usize, ) -> color_eyre::Result<()> where S1: Store + 'static, @@ -1848,7 +1849,7 @@ where config::primitives::Store::Filesystem(config::Filesystem { path }) => { let to = FileStore::build(path.clone(), repo.clone()).await?; - migrate_store(repo, from, to, skip_missing_files, timeout).await? + migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await? } config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { endpoint, @@ -1882,7 +1883,7 @@ where .await? .build(client); - migrate_store(repo, from, to, skip_missing_files, timeout).await? + migrate_store(repo, from, to, skip_missing_files, timeout, concurrency).await? 
} } @@ -1984,6 +1985,7 @@ impl PictRsConfiguration { Operation::Run => (), Operation::MigrateStore { skip_missing_files, + concurrency, from, to, } => { @@ -1999,6 +2001,7 @@ impl PictRsConfiguration { to, skip_missing_files, config.media.process_timeout, + concurrency, ) .await?; } @@ -2043,6 +2046,7 @@ impl PictRsConfiguration { to, skip_missing_files, config.media.process_timeout, + concurrency, ) .await?; } diff --git a/src/migrate_store.rs b/src/migrate_store.rs index b0b0fe1..6c62a66 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -22,6 +22,7 @@ pub(super) async fn migrate_store( to: S2, skip_missing_files: bool, timeout: u64, + concurrency: usize, ) -> Result<(), Error> where S1: Store + Clone + 'static, @@ -48,6 +49,7 @@ where to.clone(), skip_missing_files, timeout, + concurrency, ) .await { @@ -88,6 +90,7 @@ async fn do_migrate_store( to: S2, skip_missing_files: bool, timeout: u64, + concurrency: usize, ) -> Result<(), Error> where S1: Store + 'static, @@ -129,7 +132,7 @@ where while let Some(hash) = stream.next().await { let hash = hash?; - if joinset.len() >= 32 { + if joinset.len() >= concurrency { if let Some(res) = joinset.join_next().await { res.map_err(|_| UploadError::Canceled)??; }