2019-09-22 17:41:13 +00:00
|
|
|
#![deny(missing_docs)]
|
|
|
|
|
|
|
|
//! # Background Jobs Sled Storage
|
|
|
|
//! _An implementation of the Background Jobs Storage trait based on the Sled embedded database_
|
|
|
|
//!
|
|
|
|
//! ### Usage
|
|
|
|
//! ```rust
|
|
|
|
//! use background_jobs::{ServerConfig, sled_storage::Storage};
|
|
|
|
//! use sled_extensions::{ConfigBuilder, Db};
|
|
|
|
//!
|
|
|
|
//! let db = Db::start(ConfigBuilder::default().temporary(true).build())?;
|
|
|
|
//! let storage = Storage::new(db)?;
|
|
|
|
//! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
|
|
|
|
//! ```
|
|
|
|
|
2019-05-27 17:29:11 +00:00
|
|
|
use background_jobs_core::{JobInfo, Stats, Storage};
|
2019-05-25 23:09:10 +00:00
|
|
|
use chrono::offset::Utc;
|
2019-09-17 01:31:11 +00:00
|
|
|
use sled_extensions::{bincode::Tree, cbor, Db, DbExt};
|
2019-05-25 20:32:14 +00:00
|
|
|
|
2019-09-17 01:31:11 +00:00
|
|
|
pub use sled_extensions::{Error, Result};
|
2019-05-25 20:22:26 +00:00
|
|
|
|
|
|
|
#[derive(Clone)]
|
2019-09-22 17:41:13 +00:00
|
|
|
/// The Sled-backed storage implementation
|
2019-05-25 20:22:26 +00:00
|
|
|
pub struct SledStorage {
|
2019-09-15 20:51:33 +00:00
|
|
|
jobinfo: cbor::Tree<JobInfo>,
|
|
|
|
running: Tree<u64>,
|
|
|
|
running_inverse: Tree<u64>,
|
|
|
|
queue: Tree<String>,
|
|
|
|
stats: Tree<Stats>,
|
|
|
|
lock: Tree<u64>,
|
2019-09-08 23:59:21 +00:00
|
|
|
db: Db,
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Storage for SledStorage {
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
fn generate_id(&mut self) -> Result<u64> {
|
2019-09-15 20:51:33 +00:00
|
|
|
Ok(self.db.generate_id()?)
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn save_job(&mut self, job: JobInfo) -> Result<()> {
|
2019-09-08 23:59:21 +00:00
|
|
|
self.jobinfo
|
|
|
|
.insert(job_key(job.id()).as_bytes(), job)
|
|
|
|
.map(|_| ())
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn fetch_job(&mut self, id: u64) -> Result<Option<JobInfo>> {
|
2019-09-08 23:59:21 +00:00
|
|
|
self.jobinfo.get(job_key(id))
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn fetch_job_from_queue(&mut self, queue: &str) -> Result<Option<JobInfo>> {
|
2019-05-25 21:39:16 +00:00
|
|
|
let queue_tree = self.queue.clone();
|
|
|
|
let job_tree = self.jobinfo.clone();
|
|
|
|
|
|
|
|
self.lock_queue(queue, move || {
|
2019-05-25 23:09:10 +00:00
|
|
|
let now = Utc::now();
|
|
|
|
|
2019-05-25 21:39:16 +00:00
|
|
|
let job = queue_tree
|
|
|
|
.iter()
|
|
|
|
.filter_map(|res| res.ok())
|
|
|
|
.filter_map(|(id, in_queue)| if queue == in_queue { Some(id) } else { None })
|
|
|
|
.filter_map(|id| job_tree.get(id).ok())
|
|
|
|
.filter_map(|opt| opt)
|
2019-05-25 23:09:10 +00:00
|
|
|
.filter(|job| job.is_ready(now))
|
2019-05-25 21:39:16 +00:00
|
|
|
.next();
|
|
|
|
|
|
|
|
if let Some(ref job) = job {
|
2019-08-31 16:09:26 +00:00
|
|
|
queue_tree.remove(&job_key(job.id()))?;
|
2019-05-25 21:39:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(job)
|
|
|
|
})
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn queue_job(&mut self, queue: &str, id: u64) -> Result<()> {
|
2019-08-31 16:09:26 +00:00
|
|
|
if let Some(runner_id) = self.running_inverse.remove(&job_key(id))? {
|
|
|
|
self.running.remove(&runner_key(runner_id))?;
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
2019-08-31 16:09:26 +00:00
|
|
|
self.queue
|
2019-09-08 23:59:21 +00:00
|
|
|
.insert(job_key(id).as_bytes(), queue.to_owned())
|
2019-08-31 16:09:26 +00:00
|
|
|
.map(|_| ())
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn run_job(&mut self, id: u64, runner_id: u64) -> Result<()> {
|
2019-09-08 23:59:21 +00:00
|
|
|
self.queue.remove(job_key(id))?;
|
|
|
|
self.running.insert(runner_key(runner_id).as_bytes(), id)?;
|
|
|
|
self.running_inverse
|
|
|
|
.insert(job_key(id).as_bytes(), runner_id)?;
|
2019-05-25 20:22:26 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn delete_job(&mut self, id: u64) -> Result<()> {
|
2019-08-31 16:09:26 +00:00
|
|
|
self.jobinfo.remove(&job_key(id))?;
|
|
|
|
self.queue.remove(&job_key(id))?;
|
2019-05-25 20:22:26 +00:00
|
|
|
|
2019-08-31 16:09:26 +00:00
|
|
|
if let Some(runner_id) = self.running_inverse.remove(&job_key(id))? {
|
|
|
|
self.running.remove(&runner_key(runner_id))?;
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_stats(&self) -> Result<Stats> {
|
|
|
|
Ok(self.stats.get("stats")?.unwrap_or(Stats::default()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn update_stats<F>(&mut self, f: F) -> Result<()>
|
|
|
|
where
|
|
|
|
F: Fn(Stats) -> Stats,
|
|
|
|
{
|
|
|
|
self.stats.fetch_and_update("stats", |opt| {
|
|
|
|
let stats = match opt {
|
|
|
|
Some(stats) => stats,
|
|
|
|
None => Stats::default(),
|
|
|
|
};
|
|
|
|
|
|
|
|
Some((f)(stats))
|
|
|
|
})?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl SledStorage {
|
2019-09-22 17:41:13 +00:00
|
|
|
/// Create a new Storage struct
|
2019-09-08 23:59:21 +00:00
|
|
|
pub fn new(db: Db) -> Result<Self> {
|
2019-05-25 20:22:26 +00:00
|
|
|
Ok(SledStorage {
|
2019-09-08 23:59:21 +00:00
|
|
|
jobinfo: db.open_cbor_tree("background-jobs-jobinfo")?,
|
|
|
|
running: db.open_bincode_tree("background-jobs-running")?,
|
|
|
|
running_inverse: db.open_bincode_tree("background-jobs-running-inverse")?,
|
|
|
|
queue: db.open_bincode_tree("background-jobs-queue")?,
|
|
|
|
stats: db.open_bincode_tree("background-jobs-stats")?,
|
|
|
|
lock: db.open_bincode_tree("background-jobs-lock")?,
|
2019-05-25 20:22:26 +00:00
|
|
|
db,
|
|
|
|
})
|
|
|
|
}
|
2019-05-25 21:39:16 +00:00
|
|
|
|
|
|
|
fn lock_queue<T, F>(&self, queue: &str, f: F) -> Result<T>
|
|
|
|
where
|
|
|
|
F: Fn() -> Result<T>,
|
|
|
|
{
|
|
|
|
let id = self.db.generate_id()?;
|
|
|
|
|
2019-05-27 17:29:11 +00:00
|
|
|
let mut prev;
|
2019-05-25 21:39:16 +00:00
|
|
|
while {
|
|
|
|
prev = self.lock.fetch_and_update(queue, move |opt| match opt {
|
|
|
|
Some(_) => opt,
|
|
|
|
None => Some(id),
|
|
|
|
})?;
|
|
|
|
|
|
|
|
prev.is_some()
|
|
|
|
} {}
|
|
|
|
|
|
|
|
let res = (f)();
|
|
|
|
|
|
|
|
self.lock.fetch_and_update(queue, |_| None)?;
|
|
|
|
|
|
|
|
res
|
|
|
|
}
|
2019-05-25 20:22:26 +00:00
|
|
|
}
|
|
|
|
|
2019-05-25 20:32:14 +00:00
|
|
|
/// Build the tree key for a job: the literal prefix `job-` followed by the
/// decimal job id.
fn job_key(id: u64) -> String {
    let mut key = String::from("job-");
    key.push_str(&id.to_string());
    key
}
|
|
|
|
|
2019-05-25 20:32:14 +00:00
|
|
|
/// Build the tree key for a runner: the literal prefix `runner-` followed by
/// the decimal runner id.
fn runner_key(runner_id: u64) -> String {
    let mut key = String::from("runner-");
    key.push_str(&runner_id.to_string());
    key
}
|