181 lines
5.3 KiB
Rust
181 lines
5.3 KiB
Rust
use actix_web::{App, HttpServer};
|
|
use awc::{Client, Connector};
|
|
use clap::Parser;
|
|
use console_subscriber::ConsoleLayer;
|
|
use opentelemetry::KeyValue;
|
|
use opentelemetry_otlp::WithExportConfig;
|
|
use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
|
|
use pict_rs_aggregator::Tls;
|
|
use rustls::ServerConfig;
|
|
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
|
use tracing::subscriber::set_global_default;
|
|
use tracing_actix_web::TracingLogger;
|
|
use tracing_awc::Tracing;
|
|
use tracing_error::ErrorLayer;
|
|
use tracing_log::LogTracer;
|
|
use tracing_subscriber::{
|
|
filter::Targets, layer::SubscriberExt, registry::LookupSpan, Layer, Registry,
|
|
};
|
|
use url::Url;
|
|
|
|
#[actix_rt::main]
|
|
async fn main() -> color_eyre::Result<()> {
|
|
let config = pict_rs_aggregator::Config::parse();
|
|
|
|
init_logger(
|
|
config.opentelemetry_url(),
|
|
config.console_address(),
|
|
config.console_event_buffer_size(),
|
|
)?;
|
|
|
|
let mut db_path = config.db_path().to_owned();
|
|
db_path.push("sled");
|
|
db_path.push("db-0-34");
|
|
let db = sled::Config::new()
|
|
.path(db_path)
|
|
.cache_capacity(config.sled_cache_capacity())
|
|
.open()?;
|
|
|
|
let bind_address = config.bind_address();
|
|
|
|
let rustls_client_config = config.build_rustls_client_config().await?;
|
|
|
|
let tls = Tls::from_config(&config);
|
|
|
|
let state = pict_rs_aggregator::state(config, "", db)?;
|
|
|
|
let server = HttpServer::new(move || {
|
|
let connector = Connector::new().rustls_021(Arc::new(rustls_client_config.clone()));
|
|
|
|
let client = Client::builder()
|
|
.wrap(Tracing)
|
|
.timeout(Duration::from_secs(30))
|
|
.add_default_header(("User-Agent", "pict_rs_aggregator-v0.5.0"))
|
|
.disable_redirects()
|
|
.connector(connector)
|
|
.finish();
|
|
|
|
App::new()
|
|
.wrap(TracingLogger::default())
|
|
.configure(|cfg| pict_rs_aggregator::configure(cfg, state.clone(), client))
|
|
});
|
|
|
|
if let Some(tls) = tls {
|
|
let key = tls.open_keys().await?;
|
|
|
|
let (tx, rx) = rustls_channel_resolver::channel::<32>(key);
|
|
|
|
let handle = actix_rt::spawn(async move {
|
|
let mut interval = actix_rt::time::interval(Duration::from_secs(30));
|
|
interval.tick().await;
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
|
|
match tls.open_keys().await {
|
|
Ok(key) => tx.update(key),
|
|
Err(e) => tracing::error!("Failed to open keys for TLS {e}"),
|
|
}
|
|
}
|
|
});
|
|
|
|
let server_config = ServerConfig::builder()
|
|
.with_safe_defaults()
|
|
.with_no_client_auth()
|
|
.with_cert_resolver(rx);
|
|
|
|
tracing::info!("Serving pict-rs-aggregator over TLS on {bind_address}");
|
|
server
|
|
.bind_rustls_021(bind_address, server_config)?
|
|
.run()
|
|
.await?;
|
|
|
|
handle.abort();
|
|
let _ = handle.await;
|
|
} else {
|
|
tracing::info!("Serving pict-rs-aggregator on {bind_address}");
|
|
server.bind(bind_address)?.run().await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn init_logger(
|
|
opentelemetry_url: Option<&Url>,
|
|
console_addr: Option<SocketAddr>,
|
|
console_event_buffer_size: Option<usize>,
|
|
) -> color_eyre::Result<()> {
|
|
color_eyre::install()?;
|
|
|
|
LogTracer::init()?;
|
|
|
|
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
|
|
|
|
let targets: Targets = std::env::var("RUST_LOG")
|
|
.unwrap_or_else(|_| "info".into())
|
|
.parse()?;
|
|
|
|
let format_layer = tracing_subscriber::fmt::layer().with_filter(targets.clone());
|
|
|
|
let subscriber = Registry::default()
|
|
.with(format_layer)
|
|
.with(ErrorLayer::default());
|
|
|
|
if let Some(addr) = console_addr {
|
|
let builder = ConsoleLayer::builder().with_default_env().server_addr(addr);
|
|
|
|
let console_layer = if let Some(buffer_size) = console_event_buffer_size {
|
|
builder.event_buffer_capacity(buffer_size).spawn()
|
|
} else {
|
|
builder.spawn()
|
|
};
|
|
|
|
let subscriber = subscriber.with(console_layer);
|
|
|
|
init_subscriber(subscriber, targets, opentelemetry_url)?;
|
|
tracing::info!("Serving tokio-console endpoint on {addr}");
|
|
} else {
|
|
init_subscriber(subscriber, targets, opentelemetry_url)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn init_subscriber<S>(
|
|
subscriber: S,
|
|
targets: Targets,
|
|
opentelemetry_url: Option<&Url>,
|
|
) -> color_eyre::Result<()>
|
|
where
|
|
S: SubscriberExt + Send + Sync,
|
|
for<'a> S: LookupSpan<'a>,
|
|
{
|
|
if let Some(url) = opentelemetry_url {
|
|
let tracer = opentelemetry_otlp::new_pipeline()
|
|
.tracing()
|
|
.with_trace_config(
|
|
opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![
|
|
KeyValue::new("service.name", "pict-rs-aggregator"),
|
|
])),
|
|
)
|
|
.with_exporter(
|
|
opentelemetry_otlp::new_exporter()
|
|
.tonic()
|
|
.with_endpoint(url.as_str()),
|
|
)
|
|
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
|
|
|
|
let otel_layer = tracing_opentelemetry::layer()
|
|
.with_tracer(tracer)
|
|
.with_filter(targets);
|
|
|
|
let subscriber = subscriber.with(otel_layer);
|
|
|
|
set_global_default(subscriber)?;
|
|
} else {
|
|
set_global_default(subscriber)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|