relay/src/jobs.rs

202 lines
4.7 KiB
Rust
Raw Normal View History

2020-03-30 17:10:04 +00:00
pub mod apub;
mod contact;
mod deliver;
mod deliver_many;
mod instance;
mod nodeinfo;
mod process_listeners;
mod record_last_online;
2020-03-30 17:10:04 +00:00
2021-02-10 04:17:20 +00:00
pub(crate) use self::{
2022-11-20 03:32:45 +00:00
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
nodeinfo::QueryNodeinfo,
};
use crate::{
config::Config,
data::{ActorCache, MediaCache, State},
2021-09-18 17:55:39 +00:00
error::{Error, ErrorKind},
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
};
2022-07-02 19:07:25 +00:00
use background_jobs::{
2024-01-14 20:56:07 +00:00
memory_storage::{Storage, TokioTimer},
metrics::MetricsStorage,
2024-01-14 20:56:07 +00:00
tokio::{QueueHandle, WorkerConfig},
Job,
2022-07-02 19:07:25 +00:00
};
2023-06-23 20:08:59 +00:00
use std::time::Duration;
/// Picks the most specific piece of an activity's `object` for debug output.
///
/// Preference order: `object.type`, then `object.id`, then the whole `object`
/// value. A candidate is skipped when it is JSON null (indexing a missing key
/// on a `serde_json::Value` also yields null).
fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
    let inner = &activity["object"];

    let kind = &inner["type"];
    if !kind.is_null() {
        return kind;
    }

    let id = &inner["id"];
    if !id.is_null() {
        return id;
    }

    // Neither field is present: fall back to the object itself (may be null).
    inner
}
/// Creates the job storage: a `Storage` driven by `TokioTimer`, wrapped in
/// `MetricsStorage`.
pub(crate) fn build_storage() -> MetricsStorage<Storage<TokioTimer>> {
    let in_memory = Storage::new(TokioTimer);
    MetricsStorage::wrap(in_memory)
}
2021-02-10 04:17:20 +00:00
pub(crate) fn create_workers(
storage: MetricsStorage<Storage<TokioTimer>>,
state: State,
actors: ActorCache,
2021-02-10 04:05:06 +00:00
media: MediaCache,
config: Config,
2024-01-14 20:56:07 +00:00
) -> std::io::Result<JobServer> {
2023-07-25 19:45:15 +00:00
let deliver_concurrency = config.deliver_concurrency();
let queue_handle = WorkerConfig::new(storage, move |queue_handle| {
JobState::new(
state.clone(),
actors.clone(),
JobServer::new(queue_handle),
media.clone(),
config.clone(),
)
})
2020-04-21 00:56:50 +00:00
.register::<Deliver>()
2020-04-21 17:15:33 +00:00
.register::<DeliverMany>()
2020-04-21 00:56:50 +00:00
.register::<QueryNodeinfo>()
.register::<QueryInstance>()
.register::<Listeners>()
.register::<QueryContact>()
.register::<RecordLastOnline>()
2020-04-21 00:56:50 +00:00
.register::<apub::Announce>()
.register::<apub::Follow>()
.register::<apub::Forward>()
.register::<apub::Reject>()
.register::<apub::Undo>()
2024-01-18 17:31:26 +00:00
.set_worker_count("maintenance", 2)
.set_worker_count("apub", 2)
.set_worker_count("deliver", deliver_concurrency)
2024-01-14 20:56:07 +00:00
.start()?;
2021-10-30 00:26:57 +00:00
2024-01-14 20:56:07 +00:00
queue_handle.every(Duration::from_secs(60 * 5), Listeners)?;
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?;
2021-11-23 18:43:52 +00:00
2024-01-14 20:56:07 +00:00
Ok(JobServer::new(queue_handle))
}
2021-09-18 17:55:39 +00:00
#[derive(Clone, Debug)]
2021-02-10 04:17:20 +00:00
pub(crate) struct JobState {
state: State,
2020-03-23 22:17:53 +00:00
actors: ActorCache,
config: Config,
2021-02-10 04:05:06 +00:00
media: MediaCache,
job_server: JobServer,
}
#[derive(Clone)]
2021-02-10 04:17:20 +00:00
pub(crate) struct JobServer {
remote: QueueHandle,
}
2021-09-18 17:55:39 +00:00
/// Manual `Debug` that prints a fixed placeholder instead of the wrapped
/// handle (presumably because `QueueHandle` has no useful `Debug` output —
/// TODO confirm).
impl std::fmt::Debug for JobServer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("JobServer");
        // The field is labelled "queue_handle" even though it is stored as `remote`.
        out.field("queue_handle", &"QueueHandle");
        out.finish()
    }
}
impl JobState {
fn new(
state: State,
actors: ActorCache,
job_server: JobServer,
2021-02-10 04:05:06 +00:00
media: MediaCache,
config: Config,
) -> Self {
JobState {
state,
2020-03-23 22:17:53 +00:00
actors,
config,
media,
job_server,
}
}
}
impl JobServer {
fn new(remote_handle: QueueHandle) -> Self {
JobServer {
remote: remote_handle,
}
}
2021-10-11 19:19:32 +00:00
pub(crate) async fn queue<J>(&self, job: J) -> Result<(), Error>
where
J: Job,
{
2021-09-18 17:55:39 +00:00
self.remote
.queue(job)
2021-10-11 19:19:32 +00:00
.await
2021-09-18 17:55:39 +00:00
.map_err(ErrorKind::Queue)
.map_err(Into::into)
}
}
2022-11-15 19:47:31 +00:00
/// A `bool` wrapper; dereferences to the wrapped value.
struct Boolish {
    /// The wrapped boolean.
    inner: bool,
}

impl std::ops::Deref for Boolish {
    type Target = bool;

    /// Borrows the wrapped boolean so `*boolish` reads it directly.
    fn deref(&self) -> &bool {
        let Boolish { inner } = self;
        inner
    }
}
/// Lenient deserialization: accepts either a real boolean or a string.
///
/// A string equal to `"false"` after lowercasing becomes `false`; every other
/// string becomes `true`. Booleans pass through unchanged.
impl<'de> serde::Deserialize<'de> for Boolish {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        /// The two input shapes accepted; tried in declaration order.
        #[derive(serde::Deserialize)]
        #[serde(untagged)]
        enum BoolThing {
            Bool(bool),
            String(String),
        }

        let parsed = <BoolThing as serde::Deserialize>::deserialize(deserializer)?;

        let inner = match parsed {
            BoolThing::Bool(flag) => flag,
            // Only the literal word "false" (any casing) is falsy.
            BoolThing::String(text) => text.to_lowercase() != "false",
        };

        Ok(Boolish { inner })
    }
}
#[cfg(test)]
mod tests {
    use super::Boolish;

    /// Table-driven check of `Boolish` deserialization from JSON text:
    /// booleans map to themselves, the string "false" (any casing) is false,
    /// and every other string is true.
    #[test]
    fn boolish_works() {
        const CASES: &[(&str, bool)] = &[
            ("false", false),
            ("\"false\"", false),
            ("\"FALSE\"", false),
            ("true", true),
            ("\"true\"", true),
            ("\"anything else\"", true),
        ];

        for &(json, expected) in CASES {
            let parsed: Boolish = serde_json::from_str(json).unwrap();
            assert_eq!(*parsed, expected);
        }
    }
}