pub mod apub; mod contact; mod deliver; mod deliver_many; mod instance; mod nodeinfo; mod process_listeners; mod record_last_online; pub(crate) use self::{ contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo, }; use crate::{ config::Config, data::{ActorCache, MediaCache, State}, error::{Error, ErrorKind}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, }; use background_jobs::{ memory_storage::{Storage, TokioTimer}, metrics::MetricsStorage, tokio::{QueueHandle, WorkerConfig}, Job, }; use std::time::Duration; fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { let mut object = &activity["object"]["type"]; if object.is_null() { object = &activity["object"]["id"]; } if object.is_null() { object = &activity["object"]; } object } pub(crate) fn build_storage() -> MetricsStorage> { MetricsStorage::wrap(Storage::new(TokioTimer)) } pub(crate) fn create_workers( storage: MetricsStorage>, state: State, actors: ActorCache, media: MediaCache, config: Config, ) -> std::io::Result { let deliver_concurrency = config.deliver_concurrency(); let queue_handle = WorkerConfig::new(storage, move |queue_handle| { JobState::new( state.clone(), actors.clone(), JobServer::new(queue_handle), media.clone(), config.clone(), ) }) .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .set_worker_count("maintenance", 2) .set_worker_count("apub", 2) .set_worker_count("deliver", deliver_concurrency) .start()?; queue_handle.every(Duration::from_secs(60 * 5), Listeners)?; queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?; Ok(JobServer::new(queue_handle)) } #[derive(Clone, Debug)] pub(crate) struct JobState { state: State, actors: ActorCache, config: Config, media: MediaCache, job_server: JobServer, } #[derive(Clone)] pub(crate) struct JobServer { remote: QueueHandle, } impl std::fmt::Debug for JobServer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("JobServer") .field("queue_handle", &"QueueHandle") .finish() } } impl JobState { fn new( state: State, actors: ActorCache, job_server: JobServer, media: MediaCache, config: Config, ) -> Self { JobState { state, actors, config, media, job_server, } } } impl JobServer { fn new(remote_handle: QueueHandle) -> Self { JobServer { remote: remote_handle, } } pub(crate) async fn queue(&self, job: J) -> Result<(), Error> where J: Job, { self.remote .queue(job) .await .map_err(ErrorKind::Queue) .map_err(Into::into) } } struct Boolish { inner: bool, } impl std::ops::Deref for Boolish { type Target = bool; fn deref(&self) -> &Self::Target { &self.inner } } impl<'de> serde::Deserialize<'de> for Boolish { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { #[derive(serde::Deserialize)] #[serde(untagged)] enum BoolThing { Bool(bool), String(String), } let thing: BoolThing = serde::Deserialize::deserialize(deserializer)?; match thing { BoolThing::Bool(inner) => Ok(Boolish { inner }), BoolThing::String(s) if s.to_lowercase() == "false" => Ok(Boolish { inner: false }), BoolThing::String(_) => Ok(Boolish { inner: true }), } } } #[cfg(test)] mod tests { use super::Boolish; #[test] fn boolish_works() { const CASES: &[(&str, bool)] = &[ ("false", false), ("\"false\"", false), ("\"FALSE\"", false), ("true", true), ("\"true\"", true), ("\"anything else\"", true), ]; for (case, output) in CASES { let b: Boolish = serde_json::from_str(case).unwrap(); assert_eq!(*b, *output); } } }