Enable specifying concurrency for 0.4 to 0.5 migration

This commit is contained in:
asonix 2023-10-03 16:27:19 -05:00
parent 75551dc3a1
commit f223d477c7
4 changed files with 53 additions and 2 deletions

View file

@ -53,6 +53,7 @@ impl Args {
address,
api_key,
client_timeout,
upgrade_concurrency,
metrics_prometheus_address,
media_preprocess_steps,
media_external_validation,
@ -113,6 +114,10 @@ impl Args {
timeout: client_timeout,
};
let upgrade = Upgrade {
concurrency: upgrade_concurrency,
};
let metrics = Metrics {
prometheus_address: metrics_prometheus_address,
};
@ -200,6 +205,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -218,6 +224,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -234,6 +241,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -254,6 +262,7 @@ impl Args {
}) => {
let server = Server::default();
let client = Client::default();
let upgrade = Upgrade::default();
let media = Media::default();
let metrics = Metrics::default();
@ -263,6 +272,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -284,6 +294,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -309,6 +320,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -333,6 +345,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -356,6 +369,7 @@ impl Args {
Command::MigrateRepo(MigrateRepo { repo }) => {
let server = Server::default();
let client = Client::default();
let upgrade = Upgrade::default();
let media = Media::default();
let metrics = Metrics::default();
@ -365,6 +379,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -383,6 +398,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -403,6 +419,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -421,6 +438,7 @@ impl Args {
config_format: ConfigFormat {
server,
client,
upgrade,
old_repo,
tracing,
metrics,
@ -470,6 +488,7 @@ pub(crate) enum Operation {
pub(super) struct ConfigFormat {
server: Server,
client: Client,
upgrade: Upgrade,
#[serde(skip_serializing_if = "Option::is_none")]
old_repo: Option<OldSled>,
tracing: Tracing,
@ -501,6 +520,13 @@ struct Client {
timeout: Option<u64>,
}
/// Command-line overrides for the `upgrade` section of the configuration.
///
/// Built from the `--upgrade-concurrency` argument of the `run` command and
/// serialized as the `upgrade` field of `ConfigFormat`. Commands that do not
/// accept the argument use `Upgrade::default()`, which leaves every field unset.
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct Upgrade {
// How many hashes to migrate from 0.4 to 0.5 concurrently. When `None`, the
// key is omitted from the serialized output entirely rather than written as
// a null value.
#[serde(skip_serializing_if = "Option::is_none")]
concurrency: Option<usize>,
}
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct Tracing {
@ -870,6 +896,12 @@ struct Run {
#[arg(long)]
client_timeout: Option<u64>,
/// How many hashes pict-rs should try to migrate from 0.4 to 0.5 concurrently
///
/// This number defaults to 32, but can be increased for better throughput
#[arg(long)]
upgrade_concurrency: Option<usize>,
/// Whether to enable the prometheus scrape endpoint
#[arg(long)]
metrics_prometheus_address: Option<SocketAddr>,

View file

@ -9,6 +9,7 @@ use std::{net::SocketAddr, path::PathBuf};
pub(crate) struct Defaults {
server: ServerDefaults,
client: ClientDefaults,
upgrade: UpgradeDefaults,
tracing: TracingDefaults,
media: MediaDefaults,
repo: RepoDefaults,
@ -29,6 +30,12 @@ struct ClientDefaults {
timeout: u64,
}
/// Default values for the `upgrade` section of the configuration.
///
/// Stored as the `upgrade` field of `Defaults`. `Default` is implemented by
/// hand for this type rather than derived, so the default concurrency is the
/// value chosen in that impl instead of `usize::default()`.
#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct UpgradeDefaults {
// Number of hashes migrated from 0.4 to 0.5 concurrently.
concurrency: usize,
}
#[derive(Clone, Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
struct TracingDefaults {
@ -185,6 +192,12 @@ impl Default for ClientDefaults {
}
}
impl Default for UpgradeDefaults {
fn default() -> Self {
UpgradeDefaults { concurrency: 32 }
}
}
impl Default for LoggingDefaults {
fn default() -> Self {
LoggingDefaults {

View file

@ -13,6 +13,8 @@ pub(crate) struct ConfigFile {
pub(crate) client: Client,
pub(crate) upgrade: Upgrade,
pub(crate) tracing: Tracing,
#[serde(default)]
@ -119,6 +121,11 @@ pub(crate) struct Client {
pub(crate) timeout: u64,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Upgrade {
pub(crate) concurrency: usize,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Tracing {

View file

@ -15,7 +15,6 @@ use crate::{
store::Store,
};
const MIGRATE_CONCURRENCY: usize = 32;
const GENERATOR_KEY: &str = "last-path";
#[tracing::instrument(skip_all)]
@ -110,7 +109,7 @@ pub(crate) async fn migrate_04<S: Store + 'static>(
tracing::warn!("Failed to read hash, skipping");
}
while set.len() >= MIGRATE_CONCURRENCY {
while set.len() >= config.upgrade.concurrency {
if set.join_next().await.is_some() {
index += 1;