diff --git a/src/main.rs b/src/main.rs index bedd4b9..2598ca2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use activitystreams::iri_string::types::IriString; use actix_web::{web, App, HttpServer}; +use collector::MemoryCollector; #[cfg(feature = "console")] use console_subscriber::ConsoleLayer; use opentelemetry::{sdk::Resource, KeyValue}; @@ -92,84 +93,116 @@ fn init_subscriber( Ok(()) } -#[actix_rt::main] -async fn main() -> Result<(), anyhow::Error> { - actix_rt::spawn(do_main()).await??; - tracing::warn!("Application exit"); - Ok(()) -} - -async fn do_main() -> Result<(), anyhow::Error> { +fn main() -> Result<(), anyhow::Error> { dotenv::dotenv().ok(); let config = Config::build()?; init_subscriber(Config::software_name(), config.opentelemetry_url())?; - let collector = collector::MemoryCollector::new(); + let collector = MemoryCollector::new(); collector.install()?; let args = Args::new(); if args.any() { - let client = requests::build_client(&config.user_agent()); - - if !args.blocks().is_empty() || !args.allowed().is_empty() { - if args.undo() { - admin::client::unblock(&client, &config, args.blocks().to_vec()).await?; - admin::client::disallow(&client, &config, args.allowed().to_vec()).await?; - } else { - admin::client::block(&client, &config, args.blocks().to_vec()).await?; - admin::client::allow(&client, &config, args.allowed().to_vec()).await?; - } - println!("Updated lists"); - } - - if args.list() { - let (blocked, allowed, connected) = tokio::try_join!( - admin::client::blocked(&client, &config), - admin::client::allowed(&client, &config), - admin::client::connected(&client, &config) - )?; - - let mut report = String::from("Report:\n"); - if !allowed.allowed_domains.is_empty() { - report += "\nAllowed\n\t"; - report += &allowed.allowed_domains.join("\n\t"); - } - if !blocked.blocked_domains.is_empty() { - report += "\n\nBlocked\n\t"; - report += &blocked.blocked_domains.join("\n\t"); - } - if !connected.connected_actors.is_empty() { - report += "\n\nConnected\n\t"; - report += &connected.connected_actors.join("\n\t"); - } - report += "\n"; - println!("{report}"); - } - - if args.stats() { - let stats = admin::client::stats(&client, &config).await?; - stats.present(); - } - - return Ok(()); + return client_main(config, args); } let db = Db::build(&config)?; - let media = MediaCache::new(db.clone()); - let state = State::build(db.clone()).await?; let actors = ActorCache::new(db.clone()); + let media = MediaCache::new(db.clone()); + server_main(db, actors, media, collector, config)?; + + tracing::warn!("Application exit"); + + Ok(()) +} + +#[actix_rt::main] +async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { + actix_rt::spawn(do_client_main(config, args)).await? +} + +async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { + let client = requests::build_client(&config.user_agent()); + + if !args.blocks().is_empty() || !args.allowed().is_empty() { + if args.undo() { + admin::client::unblock(&client, &config, args.blocks().to_vec()).await?; + admin::client::disallow(&client, &config, args.allowed().to_vec()).await?; + } else { + admin::client::block(&client, &config, args.blocks().to_vec()).await?; + admin::client::allow(&client, &config, args.allowed().to_vec()).await?; + } + println!("Updated lists"); + } + + if args.list() { + let (blocked, allowed, connected) = tokio::try_join!( + admin::client::blocked(&client, &config), + admin::client::allowed(&client, &config), + admin::client::connected(&client, &config) + )?; + + let mut report = String::from("Report:\n"); + if !allowed.allowed_domains.is_empty() { + report += "\nAllowed\n\t"; + report += &allowed.allowed_domains.join("\n\t"); + } + if !blocked.blocked_domains.is_empty() { + report += "\n\nBlocked\n\t"; + report += &blocked.blocked_domains.join("\n\t"); + } + if !connected.connected_actors.is_empty() { + report += "\n\nConnected\n\t"; + report += &connected.connected_actors.join("\n\t"); + } + report += "\n"; + println!("{report}"); + } + + if args.stats() { + let stats = admin::client::stats(&client, &config).await?; + stats.present(); + } + + return Ok(()); +} + +#[actix_rt::main] +async fn server_main( + db: Db, + actors: ActorCache, + media: MediaCache, + collector: MemoryCollector, + config: Config, +) -> Result<(), anyhow::Error> { + actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await? +} + +async fn do_server_main( + db: Db, + actors: ActorCache, + media: MediaCache, + collector: MemoryCollector, + config: Config, +) -> Result<(), anyhow::Error> { + tracing::warn!("Creating state"); + let state = State::build(db.clone()).await?; + + tracing::warn!("Creating workers"); let (manager, job_server) = create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); if let Some((token, admin_handle)) = config.telegram_info() { + tracing::warn!("Creating telegram handler"); telegram::start(admin_handle.to_owned(), db.clone(), token); } let bind_address = config.bind_address(); + tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1); HttpServer::new(move || { let app = App::new() .app_data(web::Data::new(db.clone()))