diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 1fee943..7b0d970 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -134,11 +134,7 @@ impl Storage { let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); if job.is_ready(now) { - self.run_job( - &buckets, - inner_txn, - std::str::from_utf8(key).unwrap().parse().unwrap(), - )?; + self.run_job(&buckets, inner_txn, key)?; jobs.push(job); } @@ -163,11 +159,11 @@ impl Storage { pub fn store_job(&self, mut job: JobInfo) -> Result<(), Error> { let job_id = match job.id() { - Some(id) => id, + Some(id) => id.to_string(), None => { let id = self.get_new_id()?; job.set_id(id); - id + id.to_string() } }; @@ -198,10 +194,10 @@ impl Storage { trace!("Set value"); match status { - JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id)?, - JobStatus::Running => self.run_job(&buckets, &mut txn, job_id)?, - JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id)?, - JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id)?, + JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref())?, + JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref())?, + JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref())?, + JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id.as_ref())?, } trace!("Committing"); @@ -216,7 +212,7 @@ impl Storage { &self, buckets: &'env Buckets<'env>, txn: &mut Txn<'env>, - id: usize, + id: &[u8], ) -> Result<(), Error> { self.add_job_to(&buckets.queued, txn, id)?; self.delete_job_from(&buckets.finished, txn, id)?; @@ -230,7 +226,7 @@ impl Storage { &self, buckets: &'env Buckets<'env>, txn: &mut Txn<'env>, - id: usize, + id: &[u8], ) -> Result<(), Error> { self.add_job_to(&buckets.failed, txn, id)?; self.delete_job_from(&buckets.finished, txn, id)?; @@ -244,7 +240,7 @@ impl Storage { &self, buckets: &'env Buckets<'env>, txn: &mut Txn<'env>, - id: usize, + id: &[u8], ) -> Result<(), Error> { self.add_job_to(&buckets.running, txn, id)?; self.delete_job_from(&buckets.finished, txn, id)?; @@ -258,7 +254,7 @@ impl Storage { &self, buckets: &'env Buckets<'env>, txn: &mut Txn<'env>, - id: usize, + id: &[u8], ) -> Result<(), Error> { self.add_job_to(&buckets.finished, txn, id)?; self.delete_job_from(&buckets.running, txn, id)?; @@ -272,13 +268,9 @@ impl Storage { &self, bucket: &'env Bucket<&[u8], ValueBuf>>, txn: &mut Txn<'env>, - id: usize, + id: &[u8], ) -> Result<(), Error> { - txn.set( - bucket, - id.to_string().as_ref(), - Json::to_value_buf(self.runner_id)?, - )?; + txn.set(bucket, id, Json::to_value_buf(self.runner_id)?)?; trace!("Set value"); Ok(()) @@ -288,9 +280,9 @@ impl Storage { &self, bucket: &'env Bucket<&[u8], ValueBuf>>, txn: &mut Txn<'env>, - id: usize, + id: &[u8], ) -> Result<(), Error> { - match txn.del(bucket, id.to_string().as_ref()) { + match txn.del(bucket, id) { Ok(_) => (), Err(e) => match e { Error::NotFound => (),