Use custom threadpool for client signatures

This commit is contained in:
asonix 2023-07-26 18:03:21 -05:00
parent 8ff4961ded
commit d97cc4e5a4
7 changed files with 189 additions and 23 deletions

58
Cargo.lock generated
View file

@ -409,6 +409,7 @@ dependencies = [
"console-subscriber",
"dashmap",
"dotenv",
"flume",
"futures-util",
"http-signature-normalization-actix",
"lru",
@ -1165,6 +1166,19 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.8",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1315,8 +1329,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -1444,9 +1460,9 @@ dependencies = [
[[package]]
name = "http-signature-normalization-actix"
version = "0.8.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dc95d9ca3b4e2f93a97e5ccf9f26992c69a272e0abad8807180f0a9e9b59e31"
checksum = "218124b6b0c6ef27526493f50faf00b7cf8a3840bb1d5268f6ee8eef753b8225"
dependencies = [
"actix-http",
"actix-rt",
@ -1677,7 +1693,7 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin",
"spin 0.5.2",
]
[[package]]
@ -1943,6 +1959,15 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "never"
version = "0.1.0"
@ -2685,7 +2710,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -2848,9 +2873,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.101.1"
version = "0.101.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e"
checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59"
dependencies = [
"ring",
"untrusted",
@ -2892,18 +2917,18 @@ checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
[[package]]
name = "serde"
version = "1.0.175"
version = "1.0.176"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d25439cd7397d044e2748a6fe2432b5e85db703d6d097bd014b3c0ad1ebff0b"
checksum = "76dc28c9523c5d70816e393136b86d48909cfb27cecaa902d338c19ed47164dc"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.175"
version = "1.0.176"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4"
checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f"
dependencies = [
"proc-macro2",
"quote",
@ -2912,9 +2937,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.103"
version = "1.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b"
checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c"
dependencies = [
"itoa",
"ryu",
@ -3063,6 +3088,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.7.2"

View file

@ -40,6 +40,7 @@ config = "0.13.0"
console-subscriber = { version = "0.1", optional = true }
dashmap = "5.1.0"
dotenv = "0.15.0"
flume = "0.10.14"
futures-util = "0.3.17"
lru = "0.11.0"
metrics = "0.21.0"
@ -88,7 +89,7 @@ default-features = false
features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix]
version = "0.8.0"
version = "0.9.1"
default-features = false
features = ["client", "server", "sha-2"]

View file

@ -40,7 +40,7 @@ impl State {
self.node_cache.clone()
}
pub(crate) fn requests(&self, config: &Config) -> Requests {
pub(crate) fn requests(&self, config: &Config, spawner: crate::requests::Spawner) -> Requests {
Requests::new(
config.generate_url(UrlKind::MainKey).to_string(),
self.private_key.clone(),
@ -49,6 +49,7 @@ impl State {
self.last_online.clone(),
config.client_pool_size(),
config.client_timeout(),
spawner,
)
}

View file

@ -242,3 +242,9 @@ impl From<rsa::errors::Error> for ErrorKind {
ErrorKind::Rsa(e)
}
}
impl From<http_signature_normalization_actix::Canceled> for ErrorKind {
fn from(_: http_signature_normalization_actix::Canceled) -> Self {
Self::Canceled
}
}

View file

@ -17,7 +17,7 @@ use crate::{
data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind},
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
requests::Requests,
requests::{Requests, Spawner},
};
use background_jobs::{
memory_storage::{ActixTimer, Storage},
@ -44,6 +44,7 @@ pub(crate) fn create_workers(
actors: ActorCache,
media: MediaCache,
config: Config,
spawner: Spawner,
) -> JobServer {
let deliver_concurrency = config.deliver_concurrency();
@ -54,6 +55,7 @@ pub(crate) fn create_workers(
JobServer::new(queue_handle),
media.clone(),
config.clone(),
spawner.clone(),
)
})
.register::<Deliver>()
@ -110,9 +112,10 @@ impl JobState {
job_server: JobServer,
media: MediaCache,
config: Config,
spawner: Spawner,
) -> Self {
JobState {
requests: state.requests(&config),
requests: state.requests(&config, spawner),
node_cache: state.node_cache(),
actors,
config,

View file

@ -33,6 +33,8 @@ mod requests;
mod routes;
mod telegram;
use crate::requests::Spawner;
use self::{
args::Args,
config::Config,
@ -257,12 +259,19 @@ async fn do_server_main(
let keys = config.open_keys()?;
let spawner = Spawner::build()?;
let bind_address = config.bind_address();
let server = HttpServer::new(move || {
let requests = state.requests(&config);
let requests = state.requests(&config, spawner.clone());
let job_server =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
let job_server = create_workers(
state.clone(),
actors.clone(),
media.clone(),
config.clone(),
spawner.clone(),
);
let app = App::new()
.app_data(web::Data::new(db.clone()))

View file

@ -7,7 +7,7 @@ use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse, Connector};
use base64::{engine::general_purpose::STANDARD, Engine};
use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*;
use http_signature_normalization_actix::{prelude::*, Canceled, Spawn};
use rand::thread_rng;
use rsa::{
pkcs1v15::SigningKey,
@ -16,7 +16,12 @@ use rsa::{
RsaPrivateKey,
};
use std::{
sync::Arc,
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::JoinHandle,
time::{Duration, SystemTime},
};
use tracing_awc::Tracing;
@ -145,7 +150,7 @@ pub(crate) struct Requests {
key_id: String,
user_agent: String,
private_key: RsaPrivateKey,
config: Config,
config: Config<Spawner>,
breakers: Breakers,
last_online: Arc<LastOnline>,
}
@ -192,6 +197,7 @@ impl Requests {
last_online: Arc<LastOnline>,
pool_size: usize,
timeout_seconds: u64,
spawner: Spawner,
) -> Self {
Requests {
pool_size,
@ -199,7 +205,7 @@ impl Requests {
key_id,
user_agent,
private_key,
config: Config::default().mastodon_compat(),
config: Config::new().mastodon_compat().spawner(spawner),
breakers,
last_online,
}
@ -415,3 +421,109 @@ impl Signer {
Ok(STANDARD.encode(signature.to_bytes().as_ref()))
}
}
fn signature_thread(
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
shutdown: flume::Receiver<()>,
) {
let stopping = AtomicBool::new(false);
while !stopping.load(Ordering::Acquire) {
flume::Selector::new()
.recv(&receiver, |res| match res {
Ok(f) => {
let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
(f)();
}));
if let Err(e) = res {
tracing::warn!("Signature fn panicked: {e:?}");
}
}
Err(_) => {
tracing::warn!("Receive error, stopping");
stopping.store(true, Ordering::Release)
}
})
.recv(&shutdown, |_| {
tracing::warn!("Stopping");
stopping.store(true, Ordering::Release)
})
.wait();
}
}
#[derive(Clone, Debug)]
pub(crate) struct Spawner {
sender: flume::Sender<Box<dyn FnOnce() + Send>>,
threads: Option<Arc<Vec<JoinHandle<()>>>>,
shutdown: flume::Sender<()>,
}
impl Spawner {
pub(crate) fn build() -> std::io::Result<Self> {
let threads = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
let (sender, receiver) = flume::bounded(8);
let (shutdown, shutdown_rx) = flume::bounded(threads);
let threads = (0..threads)
.map(|i| {
let receiver = receiver.clone();
let shutdown_rx = shutdown_rx.clone();
std::thread::Builder::new()
.name(format!("signature-thread-{i}"))
.spawn(move || {
signature_thread(receiver, shutdown_rx);
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Spawner {
sender,
threads: Some(Arc::new(threads)),
shutdown,
})
}
}
impl Drop for Spawner {
fn drop(&mut self) {
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
for _ in &threads {
let _ = self.shutdown.send(());
}
for thread in threads {
let _ = thread.join();
}
}
}
}
impl Spawn for Spawner {
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>;
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
let sender = self.sender.clone();
Box::pin(async move {
let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.send((func)()).is_err() {
tracing::warn!("Requestor hung up");
}
}))
.await;
rx.recv_async().await.map_err(|_| Canceled)
})
}
}