pub mod apub; mod contact; mod deliver; mod deliver_many; mod instance; mod nodeinfo; mod process_listeners; mod record_last_online; pub(crate) use self::{ contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo, }; use crate::{ config::Config, data::{ActorCache, MediaCache, NodeCache, State}, error::{Error, ErrorKind}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, requests::Requests, }; use background_jobs::{ memory_storage::{ActixTimer, Storage}, Job, Manager, QueueHandle, WorkerConfig, }; use std::{convert::TryFrom, num::NonZeroUsize, time::Duration}; fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { let mut object = &activity["object"]["type"]; if object.is_null() { object = &activity["object"]["id"]; } if object.is_null() { object = &activity["object"]; } object } pub(crate) fn create_workers( state: State, actors: ActorCache, media: MediaCache, config: Config, ) -> (Manager, JobServer) { let parallelism = std::thread::available_parallelism() .unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero")); let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( state.clone(), actors.clone(), JobServer::new(queue_handle), media.clone(), config.clone(), ) }) .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .register::() .set_worker_count("maintenance", 2) .set_worker_count("apub", 2) .set_worker_count("deliver", 8) .start_with_threads(parallelism); shared.every(Duration::from_secs(60 * 5), Listeners); shared.every(Duration::from_secs(60 * 10), RecordLastOnline); let job_server = JobServer::new(shared.queue_handle().clone()); (shared, job_server) } #[derive(Clone, Debug)] pub(crate) struct JobState { requests: Requests, state: State, actors: ActorCache, config: Config, media: MediaCache, node_cache: NodeCache, job_server: JobServer, } #[derive(Clone)] pub(crate) struct JobServer { remote: QueueHandle, } impl std::fmt::Debug for JobServer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("JobServer") .field("queue_handle", &"QueueHandle") .finish() } } impl JobState { fn new( state: State, actors: ActorCache, job_server: JobServer, media: MediaCache, config: Config, ) -> Self { JobState { requests: state.requests(&config), node_cache: state.node_cache(), actors, config, media, state, job_server, } } } impl JobServer { fn new(remote_handle: QueueHandle) -> Self { JobServer { remote: remote_handle, } } pub(crate) async fn queue(&self, job: J) -> Result<(), Error> where J: Job, { self.remote .queue(job) .await .map_err(ErrorKind::Queue) .map_err(Into::into) } } struct Boolish { inner: bool, } impl std::ops::Deref for Boolish { type Target = bool; fn deref(&self) -> &Self::Target { &self.inner } } impl<'de> serde::Deserialize<'de> for Boolish { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { #[derive(serde::Deserialize)] #[serde(untagged)] enum BoolThing { Bool(bool), String(String), } let thing: BoolThing = serde::Deserialize::deserialize(deserializer)?; match thing { BoolThing::Bool(inner) => Ok(Boolish { inner }), BoolThing::String(s) if s.to_lowercase() == "false" => Ok(Boolish { inner: false }), BoolThing::String(_) => Ok(Boolish { inner: true }), } } } #[cfg(test)] mod tests { use super::Boolish; #[test] fn boolish_works() { const CASES: &[(&str, bool)] = &[ ("false", false), ("\"false\"", false), ("\"FALSE\"", false), ("true", true), ("\"true\"", true), ("\"anything else\"", true), ]; for (case, output) in CASES { let b: Boolish = serde_json::from_str(case).unwrap(); assert_eq!(*b, *output); } } }