relay/src/jobs/mod.rs

134 lines
3 KiB
Rust
Raw Normal View History

2020-03-30 17:10:04 +00:00
pub mod apub;
mod cache_media;
mod contact;
mod deliver;
mod deliver_many;
mod instance;
mod nodeinfo;
mod process_listeners;
2020-03-30 17:10:04 +00:00
2021-02-10 04:17:20 +00:00
pub(crate) use self::{
cache_media::CacheMedia, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany,
instance::QueryInstance, nodeinfo::QueryNodeinfo,
};
use crate::{
config::Config,
2021-02-10 04:05:06 +00:00
data::{ActorCache, MediaCache, NodeCache, State},
2020-03-22 21:18:36 +00:00
db::Db,
2021-09-18 17:55:39 +00:00
error::{Error, ErrorKind},
2020-07-11 01:00:31 +00:00
jobs::process_listeners::Listeners,
requests::Requests,
};
2020-07-11 01:00:31 +00:00
use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
use std::time::Duration;
2021-02-10 04:17:20 +00:00
pub(crate) fn create_server() -> JobServer {
2020-07-11 01:00:31 +00:00
let shared = background_jobs::create_server(Storage::new());
shared.every(Duration::from_secs(60 * 5), Listeners);
JobServer::new(shared)
}
2021-02-10 04:17:20 +00:00
pub(crate) fn create_workers(
2020-03-30 17:10:04 +00:00
db: Db,
state: State,
actors: ActorCache,
job_server: JobServer,
2021-02-10 04:05:06 +00:00
media: MediaCache,
config: Config,
) {
let remote_handle = job_server.remote.clone();
WorkerConfig::new(move || {
JobState::new(
2020-03-30 17:10:04 +00:00
db.clone(),
state.clone(),
actors.clone(),
job_server.clone(),
media.clone(),
config.clone(),
)
})
2020-04-21 00:56:50 +00:00
.register::<Deliver>()
2020-04-21 17:15:33 +00:00
.register::<DeliverMany>()
2020-04-21 00:56:50 +00:00
.register::<QueryNodeinfo>()
.register::<QueryInstance>()
.register::<Listeners>()
.register::<CacheMedia>()
.register::<QueryContact>()
2020-04-21 00:56:50 +00:00
.register::<apub::Announce>()
.register::<apub::Follow>()
.register::<apub::Forward>()
.register::<apub::Reject>()
.register::<apub::Undo>()
2020-07-25 15:41:39 +00:00
.set_worker_count("default", 16)
.start(remote_handle);
}
2021-09-18 17:55:39 +00:00
#[derive(Clone, Debug)]
2021-02-10 04:17:20 +00:00
pub(crate) struct JobState {
2020-03-30 17:10:04 +00:00
db: Db,
requests: Requests,
state: State,
2020-03-23 22:17:53 +00:00
actors: ActorCache,
config: Config,
2021-02-10 04:05:06 +00:00
media: MediaCache,
node_cache: NodeCache,
job_server: JobServer,
}
#[derive(Clone)]
2021-02-10 04:17:20 +00:00
pub(crate) struct JobServer {
remote: QueueHandle,
}
2021-09-18 17:55:39 +00:00
impl std::fmt::Debug for JobServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JobServer")
.field("queue_handle", &"QueueHandle")
.finish()
}
}
impl JobState {
fn new(
2020-03-30 17:10:04 +00:00
db: Db,
state: State,
actors: ActorCache,
job_server: JobServer,
2021-02-10 04:05:06 +00:00
media: MediaCache,
config: Config,
) -> Self {
JobState {
2021-09-21 18:26:31 +00:00
requests: state.requests(&config),
node_cache: state.node_cache(),
2020-03-30 17:10:04 +00:00
db,
2020-03-23 22:17:53 +00:00
actors,
config,
media,
state,
job_server,
}
}
}
impl JobServer {
fn new(remote_handle: QueueHandle) -> Self {
JobServer {
remote: remote_handle,
}
}
2021-09-18 17:55:39 +00:00
pub(crate) fn queue<J>(&self, job: J) -> Result<(), Error>
where
J: Job,
{
2021-09-18 17:55:39 +00:00
self.remote
.queue(job)
.map_err(ErrorKind::Queue)
.map_err(Into::into)
}
}