From c64056720692b06f4e5843afa32ac4b517bda23a Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 8 Jan 2024 17:00:15 -0600 Subject: [PATCH] Update to newest background-jobs, implement Job rather than ActixJob --- Cargo.lock | 129 ++++++++++++++++++++------------- Cargo.toml | 6 +- src/future.rs | 1 + src/jobs.rs | 22 +++--- src/jobs/apub/announce.rs | 8 +- src/jobs/apub/follow.rs | 8 +- src/jobs/apub/forward.rs | 8 +- src/jobs/apub/reject.rs | 8 +- src/jobs/apub/undo.rs | 8 +- src/jobs/contact.rs | 8 +- src/jobs/deliver.rs | 8 +- src/jobs/deliver_many.rs | 8 +- src/jobs/instance.rs | 8 +- src/jobs/nodeinfo.rs | 10 +-- src/jobs/process_listeners.rs | 8 +- src/jobs/record_last_online.rs | 9 +-- 16 files changed, 144 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fdfa345..bd3900a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,7 +66,7 @@ dependencies = [ "actix-tls", "actix-utils", "ahash 0.8.7", - "base64 0.21.5", + "base64 0.21.6", "bitflags 2.4.1", "brotli", "bytes", @@ -395,7 +395,7 @@ dependencies = [ "anyhow", "async-cpupool", "background-jobs", - "base64 0.21.5", + "base64 0.21.6", "bcrypt", "clap", "config", @@ -438,7 +438,6 @@ dependencies = [ "tracing", "tracing-actix-web", "tracing-error", - "tracing-futures", "tracing-log", "tracing-opentelemetry", "tracing-subscriber", @@ -508,6 +507,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "autocfg" version = "1.1.0" @@ -561,19 +566,18 @@ dependencies = [ [[package]] name = "background-jobs" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "220c1b532c3b8532a43282f0871cf43d6238421f0e72084cb1f6ddb65fc0e8e6" +version = "0.17.0" +source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f" dependencies = [ "background-jobs-actix", "background-jobs-core", + "background-jobs-metrics", ] [[package]] name = "background-jobs-actix" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d084a3dec6f0bd656a7c388e255e988a340b397985bfe7bebdb0ebebb34b50b6" +version = "0.17.0" +source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f" dependencies = [ "actix-rt", "anyhow", @@ -585,17 +589,14 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-futures", "uuid", ] [[package]] name = "background-jobs-core" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c585c87a70e090f8f0b52cd25951ba156e7faca26464e11611fc6ab0d700815c" +version = "0.17.0" +source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f" dependencies = [ - "actix-rt", "anyhow", "async-trait", "event-listener", @@ -605,7 +606,19 @@ dependencies = [ "thiserror", "time", "tracing", - "tracing-futures", + "uuid", +] + +[[package]] +name = "background-jobs-metrics" +version = "0.17.0" +source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f" +dependencies = [ + "async-trait", + "background-jobs-core", + "metrics", + "metrics-util", + "tracing", "uuid", ] @@ -632,9 +645,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "c79fed4cdb43e993fcdadc7e58a09fd0e3e649c4436fa11da71c9f1f3ee7feb9" [[package]] name = "base64-simd" @@ -657,7 +670,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d1c9c15093eb224f0baa400f38fcd713fc1391a6f1c389d886beef146d60a3" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "blowfish", "getrandom", "subtle", @@ -819,9 +832,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.13" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" +checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2" dependencies = [ "clap_builder", "clap_derive", @@ -829,9 +842,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.12" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" +checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370" dependencies = [ "anstream", "anstyle", @@ -863,6 +876,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.13.4" @@ -987,44 +1009,37 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" +checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-deque" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] [[package]] name = "crossbeam-epoch" -version = "0.9.17" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e3681d554572a651dda4186cd47240627c3d0114d45a95f6ad27f2f22e7548d" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "autocfg", - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-utils" -version = "0.8.18" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crypto-common" @@ -1251,9 +1266,14 @@ dependencies = [ [[package]] name = "event-listener" -version = "2.5.3" +version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] [[package]] name = "fastrand" @@ -1511,7 +1531,7 @@ version = "7.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "byteorder", "flate2", "nom", @@ -1584,7 +1604,7 @@ dependencies = [ "actix-http", "actix-rt", "actix-web", - "base64 0.21.5", + "base64 0.21.6", "futures-core", "http-signature-normalization", "ring", @@ -1603,7 +1623,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86048ef6b1d59bcb2cdde0100bb16b1a29ce78ab6dd4a90706ba0791a2831b5a" dependencies = [ "async-trait", - "base64 0.21.5", + "base64 0.21.6", "http-signature-normalization", "httpdate", "reqwest", @@ -1992,7 +2012,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83a4c4718a371ddfb7806378f23617876eea8b82e5ff1324516bcd283249d9ea" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "hyper", "indexmap 1.9.3", "ipnet", @@ -2398,6 +2418,12 @@ dependencies = [ "vlq", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.11.2" @@ -2981,7 +3007,7 @@ version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "bytes", "encoding_rs", "futures-core", @@ -3129,7 +3155,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88ca6a6947c6fe6454c93c3bb65b92f9680e6f9e906e75e30631110f2227344c" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "num-bigint-dig", "rsa", "thiserror", @@ -3158,7 +3184,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b0a930679d54e46fa4e66be3d9a333026da04d2b659e42aab4dfd1586452815" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "bytecount", "itertools 0.11.0", "md5", @@ -3216,7 +3242,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", ] [[package]] @@ -3514,9 +3540,9 @@ dependencies = [ [[package]] name = "strsim" -version = "0.10.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccbca6f34534eb78dbee83f6b2c9442fea7113f43d9e80ea320f0972ae5dc08d" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "subtle" @@ -3880,7 +3906,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-trait", "axum", - "base64 0.21.5", + "base64 0.21.6", "bytes", "futures-core", "futures-util", @@ -3909,7 +3935,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.5", + "base64 0.21.6", "bytes", "h2", "http", @@ -4162,6 +4188,7 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ + "atomic", "getrandom", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index b04f937..ba1cdcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,6 @@ thiserror = "1.0" time = { version = "0.3.17", features = ["serde"] } tracing = "0.1" tracing-error = "0.2" -tracing-futures = "0.2" tracing-log = "0.2" tracing-opentelemetry = "0.22" tracing-subscriber = { version = "0.3", features = [ @@ -85,9 +84,10 @@ uuid = { version = "1", features = ["v4", "serde"] } streem = "0.2.0" [dependencies.background-jobs] -version = "0.16.0" +version = "0.17.0" +git = "https://git.asonix.dog/asonix/background-jobs" default-features = false -features = ["background-jobs-actix", "error-logging"] +features = ["background-jobs-actix", "background-jobs-metrics", "error-logging"] [dependencies.http-signature-normalization-actix] version = "0.11.0" diff --git a/src/future.rs b/src/future.rs index 905a931..4fb7763 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,3 +1,4 @@ use std::{future::Future, pin::Pin}; pub(crate) type LocalBoxFuture<'a, T> = Pin + 'a>>; +pub(crate) type BoxFuture<'a, T> = Pin + Send + 'a>>; diff --git a/src/jobs.rs b/src/jobs.rs index 6298938..4e3ed39 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -20,6 +20,7 @@ use crate::{ }; use background_jobs::{ memory_storage::{ActixTimer, Storage}, + metrics::MetricsStorage, Job, QueueHandle, WorkerConfig, }; use std::time::Duration; @@ -46,15 +47,18 @@ pub(crate) fn create_workers( ) -> JobServer { let deliver_concurrency = config.deliver_concurrency(); - let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| { - JobState::new( - state.clone(), - actors.clone(), - JobServer::new(queue_handle), - media.clone(), - config.clone(), - ) - }) + let queue_handle = WorkerConfig::new( + MetricsStorage::wrap(Storage::new(ActixTimer)), + move |queue_handle| { + JobState::new( + state.clone(), + actors.clone(), + JobServer::new(queue_handle), + media.clone(), + config.clone(), + ) + }, + ) .register::() .register::() .register::() diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs index 480f8db..833c483 100644 --- a/src/jobs/apub/announce.rs +++ b/src/jobs/apub/announce.rs @@ -2,14 +2,14 @@ use crate::{ config::{Config, UrlKind}, db::Actor, error::Error, + future::BoxFuture, jobs::{ apub::{get_inboxes, prepare_activity}, DeliverMany, JobState, }, }; use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString}; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Announce { @@ -62,9 +62,9 @@ fn generate_announce( ) } -impl ActixJob for Announce { +impl Job for Announce { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::apub::Announce"; const QUEUE: &'static str = "apub"; diff --git a/src/jobs/apub/follow.rs b/src/jobs/apub/follow.rs index 10cae22..d916ffc 100644 --- a/src/jobs/apub/follow.rs +++ b/src/jobs/apub/follow.rs @@ -3,6 +3,7 @@ use crate::{ config::{Config, UrlKind}, db::Actor, error::{Error, ErrorKind}, + future::BoxFuture, jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo}, }; use activitystreams::{ @@ -10,8 +11,7 @@ use activitystreams::{ iri_string::types::IriString, prelude::*, }; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Follow { @@ -111,9 +111,9 @@ fn generate_accept_follow( ) } -impl ActixJob for Follow { +impl Job for Follow { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::apub::Follow"; const QUEUE: &'static str = "apub"; diff --git a/src/jobs/apub/forward.rs b/src/jobs/apub/forward.rs index f5e191b..7336fe8 100644 --- a/src/jobs/apub/forward.rs +++ b/src/jobs/apub/forward.rs @@ -2,11 +2,11 @@ use crate::{ apub::AcceptedActivities, db::Actor, error::{Error, ErrorKind}, + future::BoxFuture, jobs::{apub::get_inboxes, DeliverMany, JobState}, }; use activitystreams::prelude::*; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Forward { @@ -47,9 +47,9 @@ impl Forward { } } -impl ActixJob for Forward { +impl Job for Forward { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::apub::Forward"; const QUEUE: &'static str = "apub"; diff --git a/src/jobs/apub/reject.rs b/src/jobs/apub/reject.rs index 2384426..7a1bf40 100644 --- a/src/jobs/apub/reject.rs +++ b/src/jobs/apub/reject.rs @@ -2,10 +2,10 @@ use crate::{ config::UrlKind, db::Actor, error::Error, + future::BoxFuture, jobs::{apub::generate_undo_follow, Deliver, JobState}, }; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Reject(pub(crate) Actor); @@ -33,9 +33,9 @@ impl Reject { } } -impl ActixJob for Reject { +impl Job for Reject { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::apub::Reject"; const QUEUE: &'static str = "apub"; diff --git a/src/jobs/apub/undo.rs b/src/jobs/apub/undo.rs index b55d4ae..e45f06c 100644 --- a/src/jobs/apub/undo.rs +++ b/src/jobs/apub/undo.rs @@ -3,11 +3,11 @@ use crate::{ config::UrlKind, db::Actor, error::Error, + future::BoxFuture, jobs::{apub::generate_undo_follow, Deliver, JobState}, }; use activitystreams::prelude::BaseExt; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Undo { @@ -48,9 +48,9 @@ impl Undo { } } -impl ActixJob for Undo { +impl Job for Undo { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::apub::Undo"; const QUEUE: &'static str = "apub"; diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index c3c3765..7e475f3 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -1,12 +1,12 @@ use crate::{ apub::AcceptedActors, error::{Error, ErrorKind}, + future::BoxFuture, jobs::JobState, requests::BreakerStrategy, }; use activitystreams::{iri_string::types::IriString, object::Image, prelude::*}; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct QueryContact { @@ -85,9 +85,9 @@ fn to_contact(contact: AcceptedActors) -> Option<(String, String, IriString, Iri Some((username, display_name, url, avatar)) } -impl ActixJob for QueryContact { +impl Job for QueryContact { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::QueryContact"; const QUEUE: &'static str = "maintenance"; diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 0f936e4..74af9f7 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -1,11 +1,11 @@ use crate::{ error::Error, + future::BoxFuture, jobs::{debug_object, JobState}, requests::BreakerStrategy, }; use activitystreams::iri_string::types::IriString; -use background_jobs::{ActixJob, Backoff}; -use std::{future::Future, pin::Pin}; +use background_jobs::{Backoff, Job}; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Deliver { @@ -56,9 +56,9 @@ impl Deliver { } } -impl ActixJob for Deliver { +impl Job for Deliver { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::Deliver"; const QUEUE: &'static str = "deliver"; diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index f932a12..8f7b413 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -1,10 +1,10 @@ use crate::{ error::Error, - future::LocalBoxFuture, + future::BoxFuture, jobs::{debug_object, Deliver, JobState}, }; use activitystreams::iri_string::types::IriString; -use background_jobs::ActixJob; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct DeliverMany { @@ -45,9 +45,9 @@ impl DeliverMany { } } -impl ActixJob for DeliverMany { +impl Job for DeliverMany { type State = JobState; - type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::DeliverMany"; const QUEUE: &'static str = "deliver"; diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index a0f3da1..c7bb12f 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -1,12 +1,12 @@ use crate::{ config::UrlKind, error::{Error, ErrorKind}, + future::BoxFuture, jobs::{Boolish, JobState}, requests::BreakerStrategy, }; use activitystreams::{iri, iri_string::types::IriString}; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct QueryInstance { @@ -165,9 +165,9 @@ impl QueryInstance { } } -impl ActixJob for QueryInstance { +impl Job for QueryInstance { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::QueryInstance"; const QUEUE: &'static str = "maintenance"; diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index f9d78a3..7094852 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -1,18 +1,18 @@ use crate::{ error::{Error, ErrorKind}, + future::BoxFuture, jobs::{Boolish, JobState, QueryContact}, requests::BreakerStrategy, }; use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany}; -use background_jobs::ActixJob; -use std::{fmt::Debug, future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct QueryNodeinfo { actor_id: IriString, } -impl Debug for QueryNodeinfo { +impl std::fmt::Debug for QueryNodeinfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("QueryNodeinfo") .field("actor_id", &self.actor_id.to_string()) @@ -104,9 +104,9 @@ impl QueryNodeinfo { } } -impl ActixJob for QueryNodeinfo { +impl Job for QueryNodeinfo { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::QueryNodeinfo"; const QUEUE: &'static str = "maintenance"; diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs index 1cad2e4..96435b3 100644 --- a/src/jobs/process_listeners.rs +++ b/src/jobs/process_listeners.rs @@ -1,9 +1,9 @@ use crate::{ error::Error, + future::BoxFuture, jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}, }; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; +use background_jobs::Job; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct Listeners; @@ -23,9 +23,9 @@ impl Listeners { } } -impl ActixJob for Listeners { +impl Job for Listeners { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::Listeners"; const QUEUE: &'static str = "maintenance"; diff --git a/src/jobs/record_last_online.rs b/src/jobs/record_last_online.rs index 3c81b31..59333f4 100644 --- a/src/jobs/record_last_online.rs +++ b/src/jobs/record_last_online.rs @@ -1,6 +1,5 @@ -use crate::{error::Error, jobs::JobState}; -use background_jobs::{ActixJob, Backoff}; -use std::{future::Future, pin::Pin}; +use crate::{error::Error, future::BoxFuture, jobs::JobState}; +use background_jobs::{Backoff, Job}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct RecordLastOnline; @@ -14,9 +13,9 @@ impl RecordLastOnline { } } -impl ActixJob for RecordLastOnline { +impl Job for RecordLastOnline { type State = JobState; - type Future = Pin>>>; + type Future = BoxFuture<'static, anyhow::Result<()>>; const NAME: &'static str = "relay::jobs::RecordLastOnline"; const QUEUE: &'static str = "maintenance";