diff --git a/defaults.toml b/defaults.toml
index cd28a2c..fccd46f 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -1,6 +1,7 @@
 [server]
 address = "0.0.0.0:8080"
 worker_id = "pict-rs-1"
+client_pool_size = 100
 
 [tracing.logging]
 format = "normal"
@@ -33,7 +34,6 @@ filters = [
     "thumbnail",
 ]
 skip_validate_imports = false
-cache_duration = 168
 
 [media.gif]
 max_width = 128
diff --git a/pict-rs.toml b/pict-rs.toml
index 4fae677..afc1ff8 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -20,6 +20,19 @@ worker_id = 'pict-rs-1'
 # Not specifying api_key disables internal endpoints
 api_key = 'API_KEY'
 
+## Optional: connection pool size for internal http client
+# environment variable: PICTRS__SERVER__CLIENT_POOL_SIZE
+# default: 100
+#
+# This number is multiplied by the number of cores available to pict-rs. Running on a 2 core machine
+# with the default value will result in 200 pooled connections. Running on a 32 core machine with
+# the default value will result in 3200 pooled connections.
+#
+# This number can be lowered to keep pict-rs within ulimit bounds if you encounter errors related to
+# "Too many open files". Alternatively, increasing the ulimit of your system can solve this problem
+# as well.
+client_pool_size = 100
+
 ## Logging configuration
 [tracing.logging]
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index 6556a58..4d880ec 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -45,6 +45,7 @@ impl Args {
                 address,
                 api_key,
                 worker_id,
+                client_pool_size,
                 media_preprocess_steps,
                 media_skip_validate_imports,
                 media_max_width,
@@ -67,6 +68,7 @@ impl Args {
                 address,
                 api_key,
                 worker_id,
+                client_pool_size,
             };
 
             let gif = if media_gif_max_width.is_none()
                 && media_gif_max_height.is_none()
@@ -281,6 +283,8 @@ struct Server {
     worker_id: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     api_key: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    client_pool_size: Option<usize>,
 }
 
 #[derive(Debug, Default, serde::Serialize)]
@@ -438,6 +442,14 @@ struct Run {
     #[arg(long)]
     worker_id: Option<String>,
 
+    /// Number of connections the internal HTTP client should maintain in its pool
+    ///
+    /// This number defaults to 100, and the total number is multiplied by the number of cores
+    /// available to the program. This means that running on a 2 core system will result in 200
+    /// pooled connections, and running on a 32 core system will result in 3200 pooled connections.
+    #[arg(long)]
+    client_pool_size: Option<usize>,
+
     /// Optional pre-processing steps for uploaded media.
     ///
     /// All still images will be put through these steps before saving
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 6240279..1b67355 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -20,6 +20,7 @@ pub(crate) struct Defaults {
 struct ServerDefaults {
     address: SocketAddr,
     worker_id: String,
+    client_pool_size: usize,
 }
 
 #[derive(Clone, Debug, Default, serde::Serialize)]
@@ -115,6 +116,7 @@ impl Default for ServerDefaults {
         ServerDefaults {
             address: "0.0.0.0:8080".parse().expect("Valid address string"),
             worker_id: String::from("pict-rs-1"),
+            client_pool_size: 100,
         }
     }
 }
diff --git a/src/config/file.rs b/src/config/file.rs
index 7cfd5a1..c2deb6f 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -38,6 +38,9 @@ pub(crate) struct Server {
 
     #[serde(skip_serializing_if = "Option::is_none")]
     pub(crate) api_key: Option<String>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub(crate) client_pool_size: Option<usize>,
 }
 
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
diff --git a/src/lib.rs b/src/lib.rs
index e3eb03f..946656b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,7 +30,7 @@ use actix_web::{
     http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
     web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
 };
-use awc::Client;
+use awc::{Client, Connector};
 use futures_util::{
     stream::{empty, once},
     Stream, StreamExt, TryStreamExt,
@@ -61,14 +61,14 @@ use self::{
     middleware::{Deadline, Internal},
     queue::queue_generate,
     repo::{
-        Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo, UploadId,
-        UploadResult,
+        Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, SettingsRepo,
+        UploadId, UploadResult,
     },
     serde_str::Serde,
     store::{
         file_store::FileStore,
         object_store::{ObjectStore, ObjectStoreConfig},
-        Identifier, Store, StoreConfig,
+        Identifier, Store,
     },
     stream::{StreamLimit, StreamTimeout},
 };
@@ -981,7 +981,14 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
 }
 
 fn build_client() -> awc::Client {
+    let connector = CONFIG
+        .server
+        .client_pool_size
+        .map(|size| Connector::new().limit(size))
+        .unwrap_or_else(Connector::new);
+
     Client::builder()
+        .connector(connector)
         .wrap(Tracing)
         .add_default_header(("User-Agent", "pict-rs v0.4.0-main"))
         .timeout(Duration::from_secs(30))
@@ -996,15 +1003,88 @@ fn next_worker_id() -> String {
     format!("{}-{}", CONFIG.server.worker_id, next_id)
 }
 
-async fn launch<R: FullRepo + 'static, SC: StoreConfig + 'static>(
+fn configure_endpoints<R: FullRepo + 'static, S: Store + 'static>(
+    config: &mut web::ServiceConfig,
     repo: R,
-    store_config: SC,
-) -> color_eyre::Result<()> {
-    repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
-        .await?;
+    store: S,
+    client: Client,
+) {
+    config
+        .app_data(web::Data::new(repo))
+        .app_data(web::Data::new(store))
+        .app_data(web::Data::new(client))
+        .route("/healthz", web::get().to(healthz::<R>))
+        .service(
+            web::scope("/image")
+                .service(
+                    web::resource("")
+                        .guard(guard::Post())
+                        .route(web::post().to(upload::<R, S>)),
+                )
+                .service(
+                    web::scope("/backgrounded")
+                        .service(
+                            web::resource("")
+                                .guard(guard::Post())
+                                .route(web::post().to(upload_backgrounded::<R, S>)),
+                        )
+                        .service(
+                            web::resource("/claim").route(web::get().to(claim_upload::<R, S>)),
+                        ),
+                )
+                .service(web::resource("/download").route(web::get().to(download::<R, S>)))
+                .service(
+                    web::resource("/delete/{delete_token}/{filename}")
+                        .route(web::delete().to(delete::<R>))
+                        .route(web::get().to(delete::<R>)),
+                )
+                .service(
+                    web::resource("/original/{filename}")
+                        .route(web::get().to(serve::<R, S>))
+                        .route(web::head().to(serve_head::<R, S>)),
+                )
+                .service(
+                    web::resource("/process.{ext}")
+                        .route(web::get().to(process::<R, S>))
+                        .route(web::head().to(process_head::<R, S>)),
+                )
+                .service(
+                    web::resource("/process_backgrounded.{ext}")
+                        .route(web::get().to(process_backgrounded::<R, S>)),
+                )
+                .service(
+                    web::scope("/details")
+                        .service(
+                            web::resource("/original/{filename}")
+                                .route(web::get().to(details::<R, S>)),
+                        )
+                        .service(
+                            web::resource("/process.{ext}")
+                                .route(web::get().to(process_details::<R, S>)),
+                        ),
+                ),
+        )
+        .service(
+            web::scope("/internal")
+                .wrap(Internal(
+                    CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
+                ))
+                .service(web::resource("/import").route(web::post().to(import::<R, S>)))
+                .service(web::resource("/variants").route(web::delete().to(clean_variants::<R>)))
+                .service(web::resource("/purge").route(web::post().to(purge::<R>)))
+                .service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
+                .service(web::resource("/identifier").route(web::get().to(identifier::<R, S>))),
+        );
+}
 
+async fn launch_file_store<R: FullRepo + 'static>(
+    repo: R,
+    store: FileStore,
+) -> std::io::Result<()> {
     HttpServer::new(move || {
-        let store = store_config.clone().build();
+        let client = build_client();
+
+        let store = store.clone();
         let repo = repo.clone();
 
         tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@@ -1025,91 +1105,51 @@
         App::new()
             .wrap(TracingLogger::default())
             .wrap(Deadline)
-            .app_data(web::Data::new(repo))
-            .app_data(web::Data::new(store))
-            .app_data(web::Data::new(build_client()))
-            .route("/healthz", web::get().to(healthz::<R>))
-            .service(
-                web::scope("/image")
-                    .service(
-                        web::resource("")
-                            .guard(guard::Post())
-                            .route(web::post().to(upload::<R, SC::Store>)),
-                    )
-                    .service(
-                        web::scope("/backgrounded")
-                            .service(
-                                web::resource("")
-                                    .guard(guard::Post())
-                                    .route(web::post().to(upload_backgrounded::<R, SC::Store>)),
-                            )
-                            .service(
-                                web::resource("/claim")
-                                    .route(web::get().to(claim_upload::<R, SC::Store>)),
-                            ),
-                    )
-                    .service(
-                        web::resource("/download").route(web::get().to(download::<R, SC::Store>)),
-                    )
-                    .service(
-                        web::resource("/delete/{delete_token}/{filename}")
-                            .route(web::delete().to(delete::<R>))
-                            .route(web::get().to(delete::<R>)),
-                    )
-                    .service(
-                        web::resource("/original/{filename}")
-                            .route(web::get().to(serve::<R, SC::Store>))
-                            .route(web::head().to(serve_head::<R, SC::Store>)),
-                    )
-                    .service(
-                        web::resource("/process.{ext}")
-                            .route(web::get().to(process::<R, SC::Store>))
-                            .route(web::head().to(process_head::<R, SC::Store>)),
-                    )
-                    .service(
-                        web::resource("/process_backgrounded.{ext}")
-                            .route(web::get().to(process_backgrounded::<R, SC::Store>)),
-                    )
-                    .service(
-                        web::scope("/details")
-                            .service(
-                                web::resource("/original/{filename}")
-                                    .route(web::get().to(details::<R, SC::Store>)),
-                            )
-                            .service(
-                                web::resource("/process.{ext}")
-                                    .route(web::get().to(process_details::<R, SC::Store>)),
-                            ),
-                    ),
-            )
-            .service(
-                web::scope("/internal")
-                    .wrap(Internal(
-                        CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
-                    ))
-                    .service(web::resource("/import").route(web::post().to(import::<R, SC::Store>)))
-                    .service(
-                        web::resource("/variants").route(web::delete().to(clean_variants::<R>)),
-                    )
-                    .service(web::resource("/purge").route(web::post().to(purge::<R>)))
-                    .service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
-                    .service(
-                        web::resource("/identifier")
-                            .route(web::get().to(identifier::<R, SC::Store>)),
-                    ),
-            )
+            .configure(move |sc| configure_endpoints(sc, repo, store, client))
     })
     .bind(CONFIG.server.address)?
     .run()
-    .await?;
+    .await
+}
 
-    self::tmp_file::remove_tmp_dir().await?;
+async fn launch_object_store<R: FullRepo + 'static>(
+    repo: R,
+    store_config: ObjectStoreConfig,
+) -> std::io::Result<()> {
+    HttpServer::new(move || {
+        let client = build_client();
 
-    Ok(())
+        let store = store_config.clone().build(client.clone());
+        let repo = repo.clone();
+
+        tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
+            actix_rt::spawn(queue::process_cleanup(
+                repo.clone(),
+                store.clone(),
+                next_worker_id(),
+            ))
+        });
+        tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
+            actix_rt::spawn(queue::process_images(
+                repo.clone(),
+                store.clone(),
+                next_worker_id(),
+            ))
+        });
+
+        App::new()
+            .wrap(TracingLogger::default())
+            .wrap(Deadline)
+            .configure(move |sc| configure_endpoints(sc, repo, store, client))
+    })
+    .bind(CONFIG.server.address)?
+    .run()
+    .await
 }
 
 async fn migrate_inner<S1>(
     repo: &Repo,
+    client: Client,
     from: S1,
     to: config::Store,
     skip_missing_files: bool,
@@ -1119,7 +1159,7 @@ where
 {
     match to {
         config::Store::Filesystem(config::Filesystem { path }) => {
-            let to = FileStore::build(path.clone(), repo.clone()).await?.build();
+            let to = FileStore::build(path.clone(), repo.clone()).await?;
 
             match repo {
                 Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
@@ -1149,7 +1189,7 @@ where
                 repo.clone(),
             )
             .await?
-            .build();
+            .build(client);
 
             match repo {
                 Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
@@ -1229,10 +1269,12 @@ pub async fn run() -> color_eyre::Result<()> {
             from,
             to,
         } => {
+            let client = build_client();
+
             match from {
                 config::Store::Filesystem(config::Filesystem { path }) => {
-                    let from = FileStore::build(path.clone(), repo.clone()).await?.build();
-                    migrate_inner(&repo, from, to, skip_missing_files).await?;
+                    let from = FileStore::build(path.clone(), repo.clone()).await?;
+                    migrate_inner(&repo, client, from, to, skip_missing_files).await?;
                 }
                 config::Store::ObjectStorage(config::ObjectStorage {
                     endpoint,
@@ -1258,9 +1300,9 @@ pub async fn run() -> color_eyre::Result<()> {
                         repo.clone(),
                    )
                    .await?
-                    .build();
+                    .build(client.clone());
 
-                    migrate_inner(&repo, from, to, skip_missing_files).await?;
+                    migrate_inner(&repo, client, from, to, skip_missing_files).await?;
                 }
             }
 
@@ -1274,7 +1316,13 @@ pub async fn run() -> color_eyre::Result<()> {
             let store = FileStore::build(path, repo.clone()).await?;
 
             match repo {
-                Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await,
+                Repo::Sled(sled_repo) => {
+                    sled_repo
+                        .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
+                        .await?;
+
+                    launch_file_store(sled_repo, store).await?;
+                }
             }
         }
         config::Store::ObjectStorage(config::ObjectStorage {
@@ -1303,10 +1351,20 @@ pub async fn run() -> color_eyre::Result<()> {
             .await?;
 
             match repo {
-                Repo::Sled(sled_repo) => launch::<_, ObjectStoreConfig>(sled_repo, store).await,
+                Repo::Sled(sled_repo) => {
+                    sled_repo
+                        .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
+                        .await?;
+
+                    launch_object_store(sled_repo, store).await?;
+                }
             }
         }
     }
+
+    self::tmp_file::remove_tmp_dir().await?;
+
+    Ok(())
 }
 
 const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
diff --git a/src/store.rs b/src/store.rs
index a15a04a..1f2307a 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -62,12 +62,6 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
     fn string_repr(&self) -> String;
 }
 
-pub(crate) trait StoreConfig: Send + Sync + Clone {
-    type Store: Store;
-
-    fn build(self) -> Self::Store;
-}
-
 #[async_trait::async_trait(?Send)]
 pub(crate) trait Store: Clone + Debug {
     type Identifier: Identifier + 'static;
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index b186fdf..90be306 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -1,7 +1,7 @@
 use crate::{
     file::File,
     repo::{Repo, SettingsRepo},
-    store::{Store, StoreConfig},
+    store::Store,
 };
 use actix_web::web::Bytes;
 use futures_util::stream::Stream;
@@ -49,14 +49,6 @@ pub(crate) struct FileStore {
     repo: Repo,
 }
 
-impl StoreConfig for FileStore {
-    type Store = FileStore;
-
-    fn build(self) -> Self::Store {
-        self
-    }
-}
-
 #[async_trait::async_trait(?Send)]
 impl Store for FileStore {
     type Identifier = FileId;
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index c4634ff..b849f4a 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -1,7 +1,7 @@
 use crate::{
     bytes_stream::BytesStream,
     repo::{Repo, SettingsRepo},
-    store::{Store, StoreConfig},
+    store::Store,
 };
 use actix_rt::task::JoinError;
 use actix_web::{
@@ -116,16 +116,14 @@ struct InitiateMultipartUploadResponse {
     upload_id: String,
 }
 
-impl StoreConfig for ObjectStoreConfig {
-    type Store = ObjectStore;
-
-    fn build(self) -> Self::Store {
+impl ObjectStoreConfig {
+    pub(crate) fn build(self, client: Client) -> ObjectStore {
         ObjectStore {
             path_gen: self.path_gen,
             repo: self.repo,
             bucket: self.bucket,
             credentials: self.credentials,
-            client: crate::build_client(),
+            client,
         }
     }
 }
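
For reference, here is a minimal sketch of the connector wiring that `build_client` performs after this patch. It is not part of the patch itself: the `pool_size` parameter stands in for the `CONFIG.server.client_pool_size` lookup, the `Tracing` middleware and default headers are omitted, and awc 3.x APIs are assumed.

```rust
use std::time::Duration;

use awc::{Client, Connector};

// Sketch of the patched `build_client`. `pool_size` is a stand-in for
// `CONFIG.server.client_pool_size`; awc's `Connector::limit` caps the number
// of pooled connections held by this single client instance (awc's own
// default is 100 when no limit is set).
fn build_client(pool_size: Option<usize>) -> Client {
    let connector = pool_size
        .map(|size| Connector::new().limit(size))
        .unwrap_or_else(Connector::new);

    Client::builder()
        .connector(connector)
        .timeout(Duration::from_secs(30))
        .finish()
}
```

Note that `HttpServer::new` runs its factory closure once per worker, and actix-web starts one worker per available core by default; since both `launch_file_store` and `launch_object_store` call `build_client()` inside that closure, each worker holds its own pool. That is why the configuration comments above describe the effective total as `client_pool_size` multiplied by the core count.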