diff --git a/docs/postgres-planning.md b/docs/postgres-planning.md index 49b4d6f..36801d0 100644 --- a/docs/postgres-planning.md +++ b/docs/postgres-planning.md @@ -165,8 +165,8 @@ CREATE TABLE queue ( ); -CREATE INDEX queue_status_index ON queue INCLUDE status; -CREATE INDEX heartbeat_index ON queue +CREATE INDEX queue_status_index ON queue INCLUDE queue, status; +CREATE INDEX heartbeat_index ON queue INCLUDE heartbeat; ``` claiming a job can be diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 9ffd3f4..190ac2b 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -9,7 +9,7 @@ use diesel_async::{ deadpool::{BuildError, Pool, PoolError}, AsyncDieselConnectionManager, }, - AsyncPgConnection, RunQueryDsl, + AsyncConnection, AsyncPgConnection, RunQueryDsl, }; use url::Url; @@ -223,63 +223,205 @@ impl HashRepo for PostgresRepo { let timestamp = to_primitive(timestamp); - /* - insert_into(hashes).values(( - hash.eq(&input_hash), - identifier.eq(&input_identifier) - )) - */ + let res = diesel::insert_into(hashes) + .values(( + hash.eq(&input_hash), + identifier.eq(input_identifier.as_ref()), + created_at.eq(×tamp), + )) + .execute(&mut conn) + .await; - todo!() + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(HashAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } } - async fn update_identifier(&self, hash: Hash, identifier: &Arc) -> Result<(), RepoError> { - todo!() + async fn update_identifier( + &self, + input_hash: Hash, + input_identifier: &Arc, + ) -> Result<(), RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::update(hashes) + .filter(hash.eq(&input_hash)) + .set(identifier.eq(input_identifier.as_ref())) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) } - async fn identifier(&self, hash: Hash) -> Result>, RepoError> { - todo!() + async fn identifier(&self, input_hash: Hash) -> Result>, RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = hashes + .select(identifier) + .filter(hash.eq(&input_hash)) + .get_result::(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)?; + + Ok(opt.map(Arc::from)) } async fn relate_variant_identifier( &self, - hash: Hash, - variant: String, - identifier: &Arc, + input_hash: Hash, + input_variant: String, + input_identifier: &Arc, ) -> Result, RepoError> { - todo!() + use schema::variants::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let res = diesel::insert_into(variants) + .values(( + hash.eq(&input_hash), + variant.eq(&input_variant), + identifier.eq(input_identifier.as_ref()), + )) + .execute(&mut conn) + .await; + + match res { + Ok(_) => Ok(Ok(())), + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => Ok(Err(VariantAlreadyExists)), + Err(e) => Err(PostgresError::Diesel(e).into()), + } } async fn variant_identifier( &self, - hash: Hash, - variant: String, + input_hash: Hash, + input_variant: String, ) -> Result>, RepoError> { - todo!() + use schema::variants::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = variants + .select(identifier) + .filter(hash.eq(&input_hash)) + .filter(variant.eq(&input_variant)) + .get_result::(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)? + .map(Arc::from); + + Ok(opt) } - async fn variants(&self, hash: Hash) -> Result)>, RepoError> { - todo!() + async fn variants(&self, input_hash: Hash) -> Result)>, RepoError> { + use schema::variants::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let vec = variants + .select((variant, identifier)) + .filter(hash.eq(&input_hash)) + .get_results::<(String, String)>(&mut conn) + .await + .map_err(PostgresError::Diesel)? + .into_iter() + .map(|(s, i)| (s, Arc::from(i))) + .collect(); + + Ok(vec) } - async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> { - todo!() + async fn remove_variant( + &self, + input_hash: Hash, + input_variant: String, + ) -> Result<(), RepoError> { + use schema::variants::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::delete(variants) + .filter(hash.eq(&input_hash)) + .filter(variant.eq(&input_variant)) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) } async fn relate_motion_identifier( &self, - hash: Hash, - identifier: &Arc, + input_hash: Hash, + input_identifier: &Arc, ) -> Result<(), RepoError> { - todo!() + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + diesel::update(hashes) + .filter(hash.eq(&input_hash)) + .set(motion_identifier.eq(input_identifier.as_ref())) + .execute(&mut conn) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) } - async fn motion_identifier(&self, hash: Hash) -> Result>, RepoError> { - todo!() + async fn motion_identifier(&self, input_hash: Hash) -> Result>, RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + let opt = hashes + .select(motion_identifier) + .filter(hash.eq(&input_hash)) + .get_result::>(&mut conn) + .await + .optional() + .map_err(PostgresError::Diesel)? + .flatten() + .map(Arc::from); + + Ok(opt) } - async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> { - todo!() + async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> { + let mut conn = self.pool.get().await.map_err(PostgresError::Pool)?; + + conn.transaction(|conn| { + Box::pin(async move { + diesel::delete(schema::hashes::dsl::hashes) + .filter(schema::hashes::dsl::hash.eq(&input_hash)) + .execute(conn) + .await?; + + diesel::delete(schema::variants::dsl::variants) + .filter(schema::variants::dsl::hash.eq(&input_hash)) + .execute(conn) + .await + }) + }) + .await + .map_err(PostgresError::Diesel)?; + + Ok(()) } }