use actix_web::{App, HttpServer}; use awc::{Client, Connector}; use clap::Parser; use console_subscriber::ConsoleLayer; use opentelemetry::KeyValue; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource}; use pict_rs_aggregator::Tls; use rustls::ServerConfig; use std::{net::SocketAddr, sync::Arc, time::Duration}; use tracing::subscriber::set_global_default; use tracing_actix_web::TracingLogger; use tracing_awc::Tracing; use tracing_error::ErrorLayer; use tracing_log::LogTracer; use tracing_subscriber::{ filter::Targets, layer::SubscriberExt, registry::LookupSpan, Layer, Registry, }; use url::Url; #[actix_rt::main] async fn main() -> color_eyre::Result<()> { let config = pict_rs_aggregator::Config::parse(); init_logger( config.opentelemetry_url(), config.console_address(), config.console_event_buffer_size(), )?; let mut db_path = config.db_path().to_owned(); db_path.push("sled"); db_path.push("db-0-34"); let db = sled::Config::new() .path(db_path) .cache_capacity(config.sled_cache_capacity()) .open()?; let bind_address = config.bind_address(); let rustls_client_config = config.build_rustls_client_config().await?; let tls = Tls::from_config(&config); let state = pict_rs_aggregator::state(config, "", db)?; let server = HttpServer::new(move || { let connector = Connector::new().rustls_021(Arc::new(rustls_client_config.clone())); let client = Client::builder() .wrap(Tracing) .timeout(Duration::from_secs(30)) .add_default_header(("User-Agent", "pict_rs_aggregator-v0.5.0")) .disable_redirects() .connector(connector) .finish(); App::new() .wrap(TracingLogger::default()) .configure(|cfg| pict_rs_aggregator::configure(cfg, state.clone(), client)) }); if let Some(tls) = tls { let key = tls.open_keys().await?; let (tx, rx) = rustls_channel_resolver::channel::<32>(key); let handle = actix_rt::spawn(async move { let mut interval = actix_rt::time::interval(Duration::from_secs(30)); interval.tick().await; loop { interval.tick().await; match tls.open_keys().await { Ok(key) => tx.update(key), Err(e) => tracing::error!("Failed to open keys for TLS {e}"), } } }); let server_config = ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() .with_cert_resolver(rx); tracing::info!("Serving pict-rs-aggregator over TLS on {bind_address}"); server .bind_rustls_021(bind_address, server_config)? .run() .await?; handle.abort(); let _ = handle.await; } else { tracing::info!("Serving pict-rs-aggregator on {bind_address}"); server.bind(bind_address)?.run().await?; } Ok(()) } fn init_logger( opentelemetry_url: Option<&Url>, console_addr: Option, console_event_buffer_size: Option, ) -> color_eyre::Result<()> { color_eyre::install()?; LogTracer::init()?; opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); let targets: Targets = std::env::var("RUST_LOG") .unwrap_or_else(|_| "info".into()) .parse()?; let format_layer = tracing_subscriber::fmt::layer().with_filter(targets.clone()); let subscriber = Registry::default() .with(format_layer) .with(ErrorLayer::default()); if let Some(addr) = console_addr { let builder = ConsoleLayer::builder().with_default_env().server_addr(addr); let console_layer = if let Some(buffer_size) = console_event_buffer_size { builder.event_buffer_capacity(buffer_size).spawn() } else { builder.spawn() }; let subscriber = subscriber.with(console_layer); init_subscriber(subscriber, targets, opentelemetry_url)?; tracing::info!("Serving tokio-console endpoint on {addr}"); } else { init_subscriber(subscriber, targets, opentelemetry_url)?; } Ok(()) } fn init_subscriber( subscriber: S, targets: Targets, opentelemetry_url: Option<&Url>, ) -> color_eyre::Result<()> where S: SubscriberExt + Send + Sync, for<'a> S: LookupSpan<'a>, { if let Some(url) = opentelemetry_url { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_trace_config( opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![ KeyValue::new("service.name", "pict-rs-aggregator"), ])), ) .with_exporter( opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(url.as_str()), ) .install_batch(opentelemetry_sdk::runtime::Tokio)?; let otel_layer = tracing_opentelemetry::layer() .with_tracer(tracer) .with_filter(targets); let subscriber = subscriber.with(otel_layer); set_global_default(subscriber)?; } else { set_global_default(subscriber)?; } Ok(()) }