Compare commits

...

5 commits

10 changed files with 162 additions and 83 deletions

View file

@ -32,8 +32,11 @@ async fn main() -> anyhow::Result<()> {
.init();
// Set up our Storage
let storage =
Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;
let storage = Storage::connect(
"postgres://postgres:postgres@localhost:5432/db".parse()?,
None,
)
.await?;
let arbiter = Arbiter::new();

View file

@ -23,5 +23,6 @@ tokio = { version = "1", default-features = false, features = [
"macros",
"rt",
"sync",
"tracing",
] }
uuid = { version = "1", features = ["v7", "serde"] }

View file

@ -1,13 +1,14 @@
use std::future::Future;
use background_jobs_core::{JoinError, UnsendSpawner};
use tokio::task::JoinHandle;
/// Provide a spawner for actix-based systems for Unsend Jobs
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActixSpawner;
#[doc(hidden)]
pub struct ActixHandle<T>(actix_rt::task::JoinHandle<T>);
pub struct ActixHandle<T>(Option<JoinHandle<T>>);
impl UnsendSpawner for ActixSpawner {
type Handle<T> = ActixHandle<T> where T: Send;
@ -17,7 +18,7 @@ impl UnsendSpawner for ActixSpawner {
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
ActixHandle(actix_rt::spawn(future))
ActixHandle(crate::spawn::spawn("job-task", future).ok())
}
}
@ -30,14 +31,19 @@ impl<T> Future for ActixHandle<T> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let res = std::task::ready!(std::pin::Pin::new(&mut self.0).poll(cx));
std::task::Poll::Ready(res.map_err(|_| JoinError))
if let Some(mut handle) = self.0.as_mut() {
let res = std::task::ready!(std::pin::Pin::new(&mut handle).poll(cx));
std::task::Poll::Ready(res.map_err(|_| JoinError))
} else {
std::task::Poll::Ready(Err(JoinError))
}
}
}
impl<T> Drop for ActixHandle<T> {
fn drop(&mut self) {
self.0.abort();
if let Some(handle) = &self.0 {
handle.abort();
}
}
}

View file

@ -1,7 +1,7 @@
use crate::QueueHandle;
use actix_rt::time::{interval_at, Instant};
use background_jobs_core::Job;
use std::time::Duration;
use tokio::time::{interval_at, Instant};
/// A type used to schedule recurring jobs.
///

View file

@ -131,6 +131,7 @@ use tokio::sync::Notify;
mod actix_job;
mod every;
mod server;
mod spawn;
mod storage;
mod worker;
@ -148,9 +149,7 @@ impl Timer for ActixTimer {
where
F: std::future::Future + Send + Sync,
{
actix_rt::time::timeout(duration, future)
.await
.map_err(|_| ())
tokio::time::timeout(duration, future).await.map_err(|_| ())
}
}
@ -444,12 +443,12 @@ where
let extras_2 = extras.clone();
arbiter.spawn_fn(move || {
actix_rt::spawn(worker::local_worker(
queue,
processors.cached(),
server,
extras_2,
));
if let Err(e) = spawn::spawn(
"local-worker",
worker::local_worker(queue, processors.cached(), server, extras_2),
) {
tracing::error!("Failed to spawn worker {e}");
}
});
}
}
@ -496,10 +495,10 @@ impl QueueHandle {
///
/// This job will be added to it's queue on the server once every `Duration`. It will be
/// processed whenever workers are free to do so.
pub fn every<J>(&self, duration: Duration, job: J)
pub fn every<J>(&self, duration: Duration, job: J) -> std::io::Result<()>
where
J: Job + Clone + Send + 'static,
{
actix_rt::spawn(every(self.clone(), duration, job));
spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ())
}
}

20
jobs-actix/src/spawn.rs Normal file
View file

@ -0,0 +1,20 @@
use std::future::Future;
use tokio::task::JoinHandle;
#[cfg(tokio_unstable)]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
F: Future + 'static,
{
tokio::task::Builder::new().name(name).spawn_local(future)
}
#[cfg(not(tokio_unstable))]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
F: Future + 'static,
{
let _ = name;
Ok(tokio::task::spawn_local(future))
}

View file

@ -24,14 +24,21 @@ impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State,
let extras = self.extras.take().unwrap();
if let Ok(true) = res {
actix_rt::spawn(local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
extras,
));
if let Err(e) = crate::spawn::spawn(
"local-worker",
local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
extras,
),
) {
tracing::error!("Failed to re-spawn local worker: {e}");
} else {
metrics::counter!("background-jobs.actix.worker.restart").increment(1);
}
} else {
tracing::warn!("Not restarting worker, Arbiter is dead");
tracing::info!("Shutting down worker");
drop(extras);
}
}
@ -57,8 +64,7 @@ async fn heartbeat_job<F: Future>(
runner_id: Uuid,
heartbeat_interval: u64,
) -> F::Output {
let mut interval =
actix_rt::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut interval = tokio::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut future = std::pin::pin!(future);
@ -86,7 +92,7 @@ async fn heartbeat_job<F: Future>(
}
async fn time_job<F: Future>(future: F, job_id: Uuid) -> F::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
interval.tick().await;
let mut count = 0;
@ -147,7 +153,7 @@ pub(crate) async fn local_worker<State, Extras>(
let id = Uuid::now_v7();
let log_on_drop = RunOnDrop(|| {
make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing"));
make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing"));
});
loop {

View file

@ -56,16 +56,38 @@ struct DropHandle<T> {
handle: JoinHandle<T>,
}
fn spawn<F: Future + Send + 'static>(name: &str, future: F) -> DropHandle<F::Output>
fn spawn<F: Future + Send + 'static>(
name: &str,
future: F,
) -> std::io::Result<DropHandle<F::Output>>
where
F::Output: Send,
{
DropHandle {
handle: tokio::task::Builder::new()
.name(name)
.spawn(future)
.expect("Spawned task"),
}
Ok(DropHandle {
handle: spawn_detach(name, future)?,
})
}
#[cfg(tokio_unstable)]
fn spawn_detach<F: Future + Send + 'static>(
name: &str,
future: F,
) -> std::io::Result<JoinHandle<F::Output>>
where
F::Output: Send,
{
tokio::task::Builder::new().name(name).spawn(future)
}
#[cfg(not(tokio_unstable))]
fn spawn_detach<F: Future + Send + 'static>(
name: &str,
future: F,
) -> std::io::Result<JoinHandle<F::Output>>
where
F::Output: Send,
{
Ok(tokio::spawn(future))
}
#[derive(Debug)]
@ -78,6 +100,9 @@ pub enum ConnectPostgresError {
/// Error constructing the connection pool
BuildPool(BuildError),
/// Error spawning tokio task
SpawnTask(std::io::Error),
}
#[derive(Debug)]
@ -441,14 +466,23 @@ impl background_jobs_core::Storage for Storage {
}
impl Storage {
pub async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
pub async fn connect(
postgres_url: Url,
migration_table: Option<&str>,
) -> Result<Self, ConnectPostgresError> {
let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls)
.await
.map_err(ConnectPostgresError::ConnectForMigration)?;
let handle = spawn("postgres-migrations", conn);
let handle = spawn("postgres-migrations", conn)?;
embedded::migrations::runner()
let mut runner = embedded::migrations::runner();
if let Some(table_name) = migration_table {
runner.set_migration_table_name(table_name);
}
runner
.run_async(&mut client)
.await
.map_err(ConnectPostgresError::Migration)?;
@ -495,7 +529,7 @@ impl Storage {
let handle = spawn(
"postgres-delegate-notifications",
delegate_notifications(rx, inner.clone(), parallelism * 8),
);
)?;
let drop_handle = Arc::new(handle);
@ -556,7 +590,7 @@ impl<'a> JobNotifierState<'a> {
.or_insert_with(|| Arc::new(Notify::const_new()))
.notify_one();
metrics::counter!("pict-rs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1);
metrics::counter!("background-jobs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1);
}
}
@ -574,7 +608,7 @@ async fn delegate_notifications(
while let Ok(notification) = receiver.recv_async().await {
tracing::trace!("delegate_notifications: looping");
metrics::counter!("pict-rs.postgres.notification").increment(1);
metrics::counter!("background-jobs.postgres.notification").increment(1);
match notification.channel() {
"queue_status_channel" => {
@ -623,7 +657,7 @@ fn spawn_db_notification_task(
sender: flume::Sender<Notification>,
mut conn: Connection<Socket, NoTlsStream>,
) -> std::io::Result<()> {
tokio::task::Builder::new().name("postgres-notifications").spawn(async move {
spawn_detach("postgres-notifications", async move {
while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
tracing::trace!("db_notification_task: looping");
@ -699,6 +733,12 @@ impl From<BuildError> for ConnectPostgresError {
}
}
impl From<std::io::Error> for ConnectPostgresError {
fn from(value: std::io::Error) -> Self {
Self::SpawnTask(value)
}
}
impl std::fmt::Display for ConnectPostgresError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@ -707,6 +747,7 @@ impl std::fmt::Display for ConnectPostgresError {
write!(f, "Failed to connect to postgres for migrations")
}
Self::Migration(_) => write!(f, "Failed to run migrations"),
Self::SpawnTask(_) => write!(f, "Failed to spawn task"),
}
}
}
@ -717,6 +758,7 @@ impl std::error::Error for ConnectPostgresError {
Self::BuildPool(e) => Some(e),
Self::ConnectForMigration(e) => Some(e),
Self::Migration(e) => Some(e),
Self::SpawnTask(e) => Some(e),
}
}
}

View file

@ -11,7 +11,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-rt = "2.0.1"
async-trait = "0.1.24"
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
bincode = "1.2"
@ -20,6 +19,6 @@ serde = { version = "1", features = ["derive"] }
serde_cbor = "0.11"
time = { version = "0.3", features = ["serde-human-readable"] }
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["rt", "sync"] }
tokio = { version = "1", default-features = false, features = ["rt", "sync", "tracing"] }
tracing = "0.1"
uuid = { version = "1", features = ["v7", "serde"] }

View file

@ -13,7 +13,6 @@
//! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
//! ```
use actix_rt::task::JoinError;
use background_jobs_core::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
use sled::{Db, Tree};
use std::{
@ -22,7 +21,10 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::sync::Notify;
use tokio::{
sync::Notify,
task::{JoinError, JoinHandle},
};
use uuid::{NoContext, Timestamp, Uuid};
/// The error produced by sled storage calls
@ -95,6 +97,25 @@ struct Inner {
_db: Db,
}
#[cfg(tokio_unstable)]
fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
tokio::task::Builder::new().name(name).spawn_blocking(f)
}
#[cfg(not(tokio_unstable))]
fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let _ = name;
Ok(tokio::task::spawn_blocking(f))
}
#[async_trait::async_trait]
impl background_jobs_core::Storage for Storage {
type Error = Error;
@ -103,20 +124,14 @@ impl background_jobs_core::Storage for Storage {
async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-info")
.spawn_blocking(move || this.get(job_id))?
.await?
spawn_blocking("jobs-info", move || this.get(job_id))?.await?
}
#[tracing::instrument(skip_all)]
async fn push(&self, job: NewJobInfo) -> Result<Uuid> {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-push")
.spawn_blocking(move || this.insert(job.build()))?
.await?
spawn_blocking("jobs-push", move || this.insert(job.build()))?.await?
}
#[tracing::instrument(skip(self))]
@ -126,22 +141,18 @@ impl background_jobs_core::Storage for Storage {
let this = self.clone();
let queue2 = queue.to_string();
if let Some(job) = tokio::task::Builder::new()
.name("jobs-try-pop")
.spawn_blocking(move || this.try_pop(queue2, runner_id))?
.await??
if let Some(job) =
spawn_blocking("jobs-try-pop", move || this.try_pop(queue2, runner_id))?.await??
{
return Ok(job);
}
let this = self.clone();
let queue2 = queue.to_string();
let duration = tokio::task::Builder::new()
.name("jobs-next-duration")
.spawn_blocking(move || {
this.next_duration(queue2).unwrap_or(Duration::from_secs(5))
})?
.await?;
let duration = spawn_blocking("jobs-next-duration", move || {
this.next_duration(queue2).unwrap_or(Duration::from_secs(5))
})?
.await?;
match tokio::time::timeout(duration, notifier.notified()).await {
Ok(()) => {
@ -158,19 +169,17 @@ impl background_jobs_core::Storage for Storage {
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-heartbeat")
.spawn_blocking(move || this.set_heartbeat(job_id, runner_id))?
.await?
spawn_blocking("jobs-heartbeat", move || {
this.set_heartbeat(job_id, runner_id)
})?
.await?
}
#[tracing::instrument(skip(self))]
async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<bool> {
let this = self.clone();
let mut job = if let Some(job) = tokio::task::Builder::new()
.name("jobs-remove")
.spawn_blocking(move || this.remove_job(id))?
.await??
let mut job = if let Some(job) =
spawn_blocking("jobs-remove", move || this.remove_job(id))?.await??
{
job
} else {
@ -183,19 +192,13 @@ impl background_jobs_core::Storage for Storage {
// Unregistered or Unexecuted jobs are restored as-is
JobResult::Unexecuted | JobResult::Unregistered => {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-requeue")
.spawn_blocking(move || this.insert(job))?
.await??;
spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
Ok(false)
}
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-requeue")
.spawn_blocking(move || this.insert(job))?
.await??;
spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
Ok(false)
}
// dead jobs are removed