From d862bf81062c3eac503e1bed26dc12b402ae9f35 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 14 Jan 2024 15:56:07 -0500 Subject: [PATCH] Use tokio rather than actix-rt --- Cargo.lock | 44 +++++++++++++++++++++----------------------- Cargo.toml | 5 ++--- src/db.rs | 7 ++++++- src/error.rs | 2 +- src/jobs.rs | 27 ++++++++++++++++----------- src/main.rs | 28 ++++++++-------------------- src/spawner.rs | 2 +- src/telegram.rs | 2 +- 8 files changed, 56 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16dcd31..ebce92f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,7 +388,6 @@ version = "0.3.106-beta.2" dependencies = [ "activitystreams", "activitystreams-ext", - "actix-rt", "actix-web", "actix-webfinger", "ammonia", @@ -567,35 +566,17 @@ dependencies = [ [[package]] name = "background-jobs" version = "0.17.0" -source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" +source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93" dependencies = [ - "background-jobs-actix", "background-jobs-core", "background-jobs-metrics", -] - -[[package]] -name = "background-jobs-actix" -version = "0.17.0" -source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" -dependencies = [ - "actix-rt", - "anyhow", - "async-trait", - "background-jobs-core", - "metrics", - "serde", - "serde_json", - "thiserror", - "tokio", - "tracing", - "uuid", + "background-jobs-tokio", ] [[package]] name = "background-jobs-core" version = "0.17.0" -source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" +source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93" dependencies = [ "anyhow", "async-trait", @@ -612,7 +593,7 @@ dependencies = [ [[package]] name = "background-jobs-metrics" version = "0.17.0" -source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" +source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93" dependencies = [ "async-trait", "background-jobs-core", @@ -622,6 +603,22 @@ dependencies = [ "uuid", ] +[[package]] +name = "background-jobs-tokio" +version = "0.17.0" +source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93" +dependencies = [ + "anyhow", + "async-trait", + "background-jobs-core", + "metrics", + "serde", + "serde_json", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -3790,6 +3787,7 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", diff --git a/Cargo.toml b/Cargo.toml index 20559dc..0355957 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ default = [] [dependencies] anyhow = "1.0" -actix-rt = "2.7.0" actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_21"] } actix-webfinger = { version = "0.5.0", default-features = false } activitystreams = "0.7.0-alpha.25" @@ -79,7 +78,7 @@ tracing-subscriber = { version = "0.3", features = [ "env-filter", "fmt", ] } -tokio = { version = "1", features = ["macros", "sync"] } +tokio = { version = "1", features = ["full", "tracing"] } uuid = { version = "1", features = ["v4", "serde"] } streem = "0.2.0" @@ -87,7 +86,7 @@ streem = "0.2.0" version = "0.17.0" git = "https://git.asonix.dog/asonix/background-jobs" default-features = false -features = ["background-jobs-actix", "background-jobs-metrics", "error-logging"] +features = ["error-logging", "metrics", "tokio"] [dependencies.http-signature-normalization-actix] version = "0.11.0" diff --git a/src/db.rs b/src/db.rs index edf00c1..ba3b7fb 100644 --- a/src/db.rs +++ b/src/db.rs @@ -750,6 +750,11 @@ mod tests { { let db = Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap(); - actix_rt::System::new().block_on((f)(db)); + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on((f)(db)); } } diff --git a/src/error.rs b/src/error.rs index e1461db..f83d931 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,4 @@ use activitystreams::checked::CheckError; -use actix_rt::task::JoinError; use actix_web::{ error::{BlockingError, ResponseError}, http::StatusCode, @@ -7,6 +6,7 @@ use actix_web::{ }; use http_signature_normalization_reqwest::SignError; use std::{convert::Infallible, fmt::Debug, io}; +use tokio::task::JoinError; use tracing_error::SpanTrace; pub(crate) struct Error { diff --git a/src/jobs.rs b/src/jobs.rs index 4e3ed39..7307b3c 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -19,9 +19,10 @@ use crate::{ jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, }; use background_jobs::{ - memory_storage::{ActixTimer, Storage}, + memory_storage::{Storage, TokioTimer}, metrics::MetricsStorage, - Job, QueueHandle, WorkerConfig, + tokio::{QueueHandle, WorkerConfig}, + Job, }; use std::time::Duration; @@ -44,11 +45,15 @@ pub(crate) fn create_workers( actors: ActorCache, media: MediaCache, config: Config, -) -> JobServer { +) -> std::io::Result { + let parallelism = std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(1) as u64; + let deliver_concurrency = config.deliver_concurrency(); let queue_handle = WorkerConfig::new( - MetricsStorage::wrap(Storage::new(ActixTimer)), + MetricsStorage::wrap(Storage::new(TokioTimer)), move |queue_handle| { JobState::new( state.clone(), @@ -71,15 +76,15 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .set_worker_count("maintenance", 2) - .set_worker_count("apub", 2) - .set_worker_count("deliver", deliver_concurrency) - .start(); + .set_worker_count("maintenance", 2 * parallelism) + .set_worker_count("apub", 2 * parallelism) + .set_worker_count("deliver", deliver_concurrency * parallelism) + .start()?; - queue_handle.every(Duration::from_secs(60 * 5), Listeners); - queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline); + queue_handle.every(Duration::from_secs(60 * 5), Listeners)?; + queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?; - JobServer::new(queue_handle) + Ok(JobServer::new(queue_handle)) } #[derive(Clone, Debug)] diff --git a/src/main.rs b/src/main.rs index b286f3e..13fc32c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,6 @@ use std::time::Duration; use activitystreams::iri_string::types::IriString; -use actix_rt::task::JoinHandle; use actix_web::{middleware::Compress, web, App, HttpServer}; use collector::MemoryCollector; #[cfg(feature = "console")] @@ -18,6 +17,7 @@ use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::Resource; use reqwest_middleware::ClientWithMiddleware; use rustls::ServerConfig; +use tokio::task::JoinHandle; use tracing_actix_web::TracingLogger; use tracing_error::ErrorLayer; use tracing_log::LogTracer; @@ -141,7 +141,7 @@ fn build_client( Ok(client_with_middleware) } -#[actix_rt::main] +#[tokio::main] async fn main() -> Result<(), anyhow::Error> { dotenv::dotenv().ok(); @@ -162,7 +162,7 @@ async fn main() -> Result<(), anyhow::Error> { .with_http_listener(bind_addr) .build()?; - actix_rt::spawn(exporter); + tokio::spawn(exporter); let recorder = FanoutBuilder::default() .add_recorder(recorder) .add_recorder(collector.clone()) @@ -179,7 +179,7 @@ async fn main() -> Result<(), anyhow::Error> { let actors = ActorCache::new(db.clone()); let media = MediaCache::new(db.clone()); - server_main(db, actors, media, collector, config).await??; + server_main(db, actors, media, collector, config).await?; tracing::warn!("Application exit"); @@ -187,7 +187,7 @@ async fn main() -> Result<(), anyhow::Error> { } fn client_main(config: Config, args: Args) -> JoinHandle> { - actix_rt::spawn(do_client_main(config, args)) + tokio::spawn(do_client_main(config, args)) } async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { @@ -273,19 +273,9 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> Ok(()) } -fn server_main( - db: Db, - actors: ActorCache, - media: MediaCache, - collector: MemoryCollector, - config: Config, -) -> JoinHandle> { - actix_rt::spawn(do_server_main(db, actors, media, collector, config)) -} - const VERIFY_RATIO: usize = 7; -async fn do_server_main( +async fn server_main( db: Db, actors: ActorCache, media: MediaCache, @@ -327,10 +317,8 @@ async fn do_server_main( let bind_address = config.bind_address(); let sign_spawner2 = sign_spawner.clone(); let verify_spawner2 = verify_spawner.clone(); + let job_server = create_workers(state.clone(), actors.clone(), media.clone(), config.clone())?; let server = HttpServer::new(move || { - let job_server = - create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); - let app = App::new() .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(state.clone())) @@ -339,7 +327,7 @@ async fn do_server_main( )) .app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(config.clone())) - .app_data(web::Data::new(job_server)) + .app_data(web::Data::new(job_server.clone())) .app_data(web::Data::new(media.clone())) .app_data(web::Data::new(collector.clone())) .app_data(web::Data::new(verify_spawner.clone())); diff --git a/src/spawner.rs b/src/spawner.rs index b967e9b..b19aeff 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -36,7 +36,7 @@ where metrics::counter!("relay.spawner.wait-timer.start").increment(1); - let mut interval = actix_rt::time::interval(Duration::from_secs(5)); + let mut interval = tokio::time::interval(Duration::from_secs(5)); // pass the first tick (instant) interval.tick().await; diff --git a/src/telegram.rs b/src/telegram.rs index f3cce43..9a80edc 100644 --- a/src/telegram.rs +++ b/src/telegram.rs @@ -46,7 +46,7 @@ pub(crate) fn start(admin_handle: String, db: Db, token: &str) { let bot = Bot::new(token); let admin_handle = Arc::new(admin_handle); - actix_rt::spawn(async move { + tokio::spawn(async move { let command_handler = teloxide::filter_command::().endpoint( move |bot: Bot, msg: Message, cmd: Command| { let admin_handle = admin_handle.clone();