Unite launch-with-store fns
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
asonix 2024-02-03 18:42:34 -06:00
parent c176e4c686
commit 835647d290

View file

@ -49,6 +49,7 @@ use middleware::{Metrics, Payload};
use repo::ArcRepo;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware;
use rustls_channel_resolver::ChannelSender;
use rusty_s3::UrlStyle;
use state::State;
use std::{
@ -59,6 +60,7 @@ use std::{
time::{Duration, SystemTime},
};
use streem::IntoStreamer;
use sync::DropHandle;
use tmp_file::{ArcTmpDir, TmpDir};
use tokio::sync::Semaphore;
use tracing::Instrument;
@ -1664,8 +1666,27 @@ where
crate::sync::spawn("process-worker", queue::process_images(state, process_map));
}
async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
state: State<FileStore>,
fn watch_keys(tls: Tls, sender: ChannelSender) -> DropHandle<()> {
crate::sync::abort_on_drop(crate::sync::spawn("cert-reader", async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
match tls.open_keys().await {
Ok(certified_key) => sender.update(certified_key),
Err(e) => tracing::error!("Failed to open keys {}", format!("{e}")),
}
}
}))
}
async fn launch<
S: Store + Send + 'static,
F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static,
>(
state: State<S>,
extra_config: F,
) -> color_eyre::Result<()> {
let process_map = ProcessMap::new();
@ -1698,19 +1719,7 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
let (tx, rx) = rustls_channel_resolver::channel::<32>(certified_key);
let handle = crate::sync::abort_on_drop(crate::sync::spawn("cert-reader", async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
match tls.open_keys().await {
Ok(certified_key) => tx.update(certified_key),
Err(e) => tracing::error!("Failed to open keys {}", format!("{e}")),
}
}
}));
let handle = watch_keys(tls, tx);
let config = rustls_021::ServerConfig::builder()
.with_safe_defaults()
@ -1732,70 +1741,6 @@ async fn launch_file_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'stat
Ok(())
}
async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static>(
state: State<ObjectStore>,
extra_config: F,
) -> color_eyre::Result<()> {
let process_map = ProcessMap::new();
let address = state.config.server.address;
let tls = Tls::from_config(&state.config);
spawn_cleanup(state.clone());
let server = HttpServer::new(move || {
let extra_config = extra_config.clone();
let state = state.clone();
let process_map = process_map.clone();
spawn_workers(state.clone(), process_map.clone());
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.wrap(Metrics)
.wrap(Payload::new())
.configure(move |sc| {
configure_endpoints(sc, state.clone(), process_map.clone(), extra_config)
})
});
if let Some(tls) = tls {
let certified_key = tls.open_keys().await?;
let (tx, rx) = rustls_channel_resolver::channel::<32>(certified_key);
let handle = crate::sync::abort_on_drop(crate::sync::spawn("cert-reader", async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
match tls.open_keys().await {
Ok(certified_key) => tx.update(certified_key),
Err(e) => tracing::error!("Failed to open keys {}", format!("{e}")),
}
}
}));
let config = rustls_021::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(rx);
server.bind_rustls_021(address, config)?.run().await?;
handle.abort();
let _ = handle.await;
} else {
server.bind(address)?.run().await?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn migrate_inner<S1>(
config: Configuration,
@ -2143,13 +2088,10 @@ impl PictRsConfiguration {
match repo {
Repo::Sled(sled_repo) => {
launch_file_store(state, move |sc| {
sled_extra_config(sc, sled_repo.clone())
})
.await?;
launch(state, move |sc| sled_extra_config(sc, sled_repo.clone())).await?;
}
Repo::Postgres(_) => {
launch_file_store(state, |_| {}).await?;
launch(state, |_| {}).await?;
}
}
}
@ -2209,13 +2151,10 @@ impl PictRsConfiguration {
match repo {
Repo::Sled(sled_repo) => {
launch_object_store(state, move |sc| {
sled_extra_config(sc, sled_repo.clone())
})
.await?;
launch(state, move |sc| sled_extra_config(sc, sled_repo.clone())).await?;
}
Repo::Postgres(_) => {
launch_object_store(state, |_| {}).await?;
launch(state, |_| {}).await?;
}
}
}