Initial postgres work

This commit is contained in:
asonix 2023-09-02 11:52:55 -05:00
parent c9ba4672a2
commit 8c532c97e6
13 changed files with 282 additions and 75 deletions

7
Cargo.lock generated
View file

@ -388,6 +388,12 @@ dependencies = [
"rustc-demangle", "rustc-demangle",
] ]
[[package]]
name = "barrel"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad9e605929a6964efbec5ac0884bd0fe93f12a3b1eb271f52c251316640c68d9"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.13.1" version = "0.13.1"
@ -1788,6 +1794,7 @@ dependencies = [
"actix-web", "actix-web",
"anyhow", "anyhow",
"async-trait", "async-trait",
"barrel",
"base64 0.21.3", "base64 0.21.3",
"clap", "clap",
"color-eyre", "color-eyre",

View file

@ -20,6 +20,7 @@ actix-server = "2.0.0"
actix-web = { version = "4.0.0", default-features = false } actix-web = { version = "4.0.0", default-features = false }
anyhow = "1.0" anyhow = "1.0"
async-trait = "0.1.51" async-trait = "0.1.51"
barrel = { version = "0.7.0", features = ["pg"] }
base64 = "0.21.0" base64 = "0.21.0"
clap = { version = "4.0.2", features = ["derive"] } clap = { version = "4.0.2", features = ["derive"] }
color-eyre = "0.6" color-eyre = "0.6"

View file

@ -32,6 +32,7 @@
cargo cargo
cargo-outdated cargo-outdated
clippy clippy
diesel-cli
exiftool exiftool
ffmpeg_6-full ffmpeg_6-full
garage garage

View file

@ -12,8 +12,8 @@ use defaults::Defaults;
pub(crate) use commandline::Operation; pub(crate) use commandline::Operation;
pub(crate) use file::{ pub(crate) use file::{
Animation, ConfigFile as Configuration, Image, Media, ObjectStorage, OpenTelemetry, Repo, Sled, Animation, ConfigFile as Configuration, Image, Media, ObjectStorage, OpenTelemetry, Postgres,
Store, Tracing, Video, Repo, Sled, Store, Tracing, Video,
}; };
pub(crate) use primitives::{Filesystem, LogFormat}; pub(crate) use primitives::{Filesystem, LogFormat};

View file

@ -369,8 +369,64 @@ impl Args {
from: from.into(), from: from.into(),
to: to.into(), to: to.into(),
}, },
config_file,
save_to, save_to,
config_file,
},
MigrateRepoTo::Postgres(MigratePostgresInner { to }) => Output {
config_format: ConfigFormat {
server,
client,
old_repo,
tracing,
metrics,
media,
repo: None,
store: None,
},
operation: Operation::MigrateRepo {
from: from.into(),
to: to.into(),
},
save_to,
config_file,
},
},
MigrateRepoFrom::Postgres(MigratePostgresRepo { from, to }) => match to {
MigrateRepoTo::Sled(MigrateSledInner { to }) => Output {
config_format: ConfigFormat {
server,
client,
old_repo,
tracing,
metrics,
media,
repo: None,
store: None,
},
operation: Operation::MigrateRepo {
from: from.into(),
to: to.into(),
},
save_to,
config_file,
},
MigrateRepoTo::Postgres(MigratePostgresInner { to }) => Output {
config_format: ConfigFormat {
server,
client,
old_repo,
tracing,
metrics,
media,
repo: None,
store: None,
},
operation: Operation::MigrateRepo {
from: from.into(),
to: to.into(),
},
save_to,
config_file,
}, },
}, },
} }
@ -1058,6 +1114,7 @@ enum MigrateStoreFrom {
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
enum MigrateRepoFrom { enum MigrateRepoFrom {
Sled(MigrateSledRepo), Sled(MigrateSledRepo),
Postgres(MigratePostgresRepo),
} }
/// Configure the destination storage for pict-rs storage migration /// Configure the destination storage for pict-rs storage migration
@ -1075,8 +1132,10 @@ enum MigrateStoreTo {
/// Configure the destination repo for pict-rs repo migration /// Configure the destination repo for pict-rs repo migration
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
enum MigrateRepoTo { enum MigrateRepoTo {
/// Migrate to the provided sled storage /// Migrate to the provided sled repo
Sled(MigrateSledInner), Sled(MigrateSledInner),
/// Migrate to the provided postgres repo
Postgres(MigratePostgresInner),
} }
/// Migrate pict-rs' storage from the provided filesystem storage /// Migrate pict-rs' storage from the provided filesystem storage
@ -1099,6 +1158,16 @@ struct MigrateSledRepo {
to: MigrateRepoTo, to: MigrateRepoTo,
} }
/// Migrate pict-rs' repo from the provided postgres repo
#[derive(Debug, Parser)]
struct MigratePostgresRepo {
#[command(flatten)]
from: Postgres,
#[command(subcommand)]
to: MigrateRepoTo,
}
/// Migrate pict-rs' storage to the provided filesystem storage /// Migrate pict-rs' storage to the provided filesystem storage
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
struct MigrateFilesystemInner { struct MigrateFilesystemInner {
@ -1116,6 +1185,13 @@ struct MigrateSledInner {
to: Sled, to: Sled,
} }
/// Migrate pict-rs' repo to the provided postgres repo
#[derive(Debug, Parser)]
struct MigratePostgresInner {
#[command(flatten)]
to: Postgres,
}
/// Migrate pict-rs' storage from the provided object storage /// Migrate pict-rs' storage from the provided object storage
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
struct MigrateObjectStorage { struct MigrateObjectStorage {
@ -1163,6 +1239,8 @@ struct RunObjectStorage {
enum Repo { enum Repo {
/// Run pict-rs with the provided sled-backed data repository /// Run pict-rs with the provided sled-backed data repository
Sled(Sled), Sled(Sled),
/// Run pict-rs with the provided postgres-backed data repository
Postgres(Postgres),
} }
/// Configuration for filesystem media storage /// Configuration for filesystem media storage
@ -1254,6 +1332,15 @@ pub(super) struct Sled {
pub(super) export_path: Option<PathBuf>, pub(super) export_path: Option<PathBuf>,
} }
/// Configuration for the postgres-backed data repository
#[derive(Debug, Parser, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(super) struct Postgres {
/// The URL of the postgres database
#[arg(short, long)]
pub(super) url: Url,
}
#[derive(Debug, Parser, serde::Serialize)] #[derive(Debug, Parser, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
struct OldSled { struct OldSled {

View file

@ -363,8 +363,20 @@ impl From<crate::config::commandline::Sled> for crate::config::file::Sled {
} }
} }
impl From<crate::config::commandline::Postgres> for crate::config::file::Postgres {
fn from(value: crate::config::commandline::Postgres) -> Self {
crate::config::file::Postgres { url: value.url }
}
}
impl From<crate::config::commandline::Sled> for crate::config::file::Repo { impl From<crate::config::commandline::Sled> for crate::config::file::Repo {
fn from(value: crate::config::commandline::Sled) -> Self { fn from(value: crate::config::commandline::Sled) -> Self {
crate::config::file::Repo::Sled(value.into()) crate::config::file::Repo::Sled(value.into())
} }
} }
impl From<crate::config::commandline::Postgres> for crate::config::file::Repo {
fn from(value: crate::config::commandline::Postgres) -> Self {
crate::config::file::Repo::Postgres(value.into())
}
}

View file

@ -88,6 +88,7 @@ pub(crate) struct ObjectStorage {
#[serde(tag = "type")] #[serde(tag = "type")]
pub(crate) enum Repo { pub(crate) enum Repo {
Sled(Sled), Sled(Sled),
Postgres(Postgres),
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -421,3 +422,9 @@ pub(crate) struct Sled {
pub(crate) export_path: PathBuf, pub(crate) export_path: PathBuf,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Postgres {
pub(crate) url: Url,
}

View file

@ -1810,7 +1810,7 @@ async fn launch_object_store<F: Fn(&mut web::ServiceConfig) + Send + Clone + 'st
} }
async fn migrate_inner<S1>( async fn migrate_inner<S1>(
repo: Repo, repo: ArcRepo,
client: ClientWithMiddleware, client: ClientWithMiddleware,
from: S1, from: S1,
to: config::primitives::Store, to: config::primitives::Store,
@ -1824,11 +1824,7 @@ where
config::primitives::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?; let to = FileStore::build(path.clone(), repo.clone()).await?;
match repo { migrate_store(repo, from, to, skip_missing_files, timeout).await?
Repo::Sled(repo) => {
migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await?
}
}
} }
config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
endpoint, endpoint,
@ -1862,11 +1858,7 @@ where
.await? .await?
.build(client); .build(client);
match repo { migrate_store(repo, from, to, skip_missing_files, timeout).await?
Repo::Sled(repo) => {
migrate_store(Arc::new(repo), from, to, skip_missing_files, timeout).await?
}
}
} }
} }
@ -1970,7 +1962,7 @@ impl PictRsConfiguration {
from, from,
to, to,
} => { } => {
let repo = Repo::open(config.repo.clone())?; let repo = Repo::open(config.repo.clone()).await?.to_arc();
match from { match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
@ -2034,15 +2026,15 @@ impl PictRsConfiguration {
return Ok(()); return Ok(());
} }
Operation::MigrateRepo { from, to } => { Operation::MigrateRepo { from, to } => {
let from = Repo::open(from)?.to_arc(); let from = Repo::open(from).await?.to_arc();
let to = Repo::open(to)?.to_arc(); let to = Repo::open(to).await?.to_arc();
repo::migrate_repo(from, to).await?; repo::migrate_repo(from, to).await?;
return Ok(()); return Ok(());
} }
} }
let repo = Repo::open(config.repo.clone())?; let repo = Repo::open(config.repo.clone()).await?;
if config.server.read_only { if config.server.read_only {
tracing::warn!("Launching in READ ONLY mode"); tracing::warn!("Launching in READ ONLY mode");
@ -2050,10 +2042,10 @@ impl PictRsConfiguration {
match config.store.clone() { match config.store.clone() {
config::Store::Filesystem(config::Filesystem { path }) => { config::Store::Filesystem(config::Filesystem { path }) => {
let store = FileStore::build(path, repo.clone()).await?;
let arc_repo = repo.to_arc(); let arc_repo = repo.to_arc();
let store = FileStore::build(path, arc_repo.clone()).await?;
if arc_repo.get("migrate-0.4").await?.is_none() { if arc_repo.get("migrate-0.4").await?.is_none() {
if let Some(old_repo) = repo_04::open(&config.old_repo)? { if let Some(old_repo) = repo_04::open(&config.old_repo)? {
repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone()) repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone())
@ -2075,6 +2067,7 @@ impl PictRsConfiguration {
) )
.await?; .await?;
} }
Repo::Postgres(_) => todo!(),
} }
} }
config::Store::ObjectStorage(config::ObjectStorage { config::Store::ObjectStorage(config::ObjectStorage {
@ -2089,6 +2082,8 @@ impl PictRsConfiguration {
client_timeout, client_timeout,
public_endpoint, public_endpoint,
}) => { }) => {
let arc_repo = repo.to_arc();
let store = ObjectStore::build( let store = ObjectStore::build(
endpoint, endpoint,
bucket_name, bucket_name,
@ -2104,13 +2099,11 @@ impl PictRsConfiguration {
signature_duration, signature_duration,
client_timeout, client_timeout,
public_endpoint, public_endpoint,
repo.clone(), arc_repo.clone(),
) )
.await? .await?
.build(client.clone()); .build(client.clone());
let arc_repo = repo.to_arc();
if arc_repo.get("migrate-0.4").await?.is_none() { if arc_repo.get("migrate-0.4").await?.is_none() {
if let Some(old_repo) = repo_04::open(&config.old_repo)? { if let Some(old_repo) = repo_04::open(&config.old_repo)? {
repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone()) repo::migrate_04(old_repo, arc_repo.clone(), store.clone(), config.clone())
@ -2128,6 +2121,7 @@ impl PictRsConfiguration {
}) })
.await?; .await?;
} }
Repo::Postgres(_) => todo!(),
} }
} }
} }

View file

@ -12,6 +12,7 @@ use uuid::Uuid;
mod hash; mod hash;
mod migrate; mod migrate;
pub(crate) mod postgres;
pub(crate) mod sled; pub(crate) mod sled;
pub(crate) use hash::Hash; pub(crate) use hash::Hash;
@ -22,6 +23,7 @@ pub(crate) type ArcRepo = Arc<dyn FullRepo>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) enum Repo { pub(crate) enum Repo {
Sled(self::sled::SledRepo), Sled(self::sled::SledRepo),
Postgres(self::postgres::PostgresRepo),
} }
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
@ -791,7 +793,7 @@ where
impl Repo { impl Repo {
#[tracing::instrument] #[tracing::instrument]
pub(crate) fn open(config: config::Repo) -> color_eyre::Result<Self> { pub(crate) async fn open(config: config::Repo) -> color_eyre::Result<Self> {
match config { match config {
config::Repo::Sled(config::Sled { config::Repo::Sled(config::Sled {
path, path,
@ -802,12 +804,18 @@ impl Repo {
Ok(Self::Sled(repo)) Ok(Self::Sled(repo))
} }
config::Repo::Postgres(config::Postgres { url }) => {
let repo = self::postgres::PostgresRepo::connect(url).await?;
Ok(Self::Postgres(repo))
}
} }
} }
pub(crate) fn to_arc(&self) -> ArcRepo { pub(crate) fn to_arc(&self) -> ArcRepo {
match self { match self {
Self::Sled(sled_repo) => Arc::new(sled_repo.clone()), Self::Sled(sled_repo) => Arc::new(sled_repo.clone()),
Self::Postgres(_) => todo!(),
} }
} }
} }

81
src/repo/postgres.rs Normal file
View file

@ -0,0 +1,81 @@
mod embedded {
use refinery::embed_migrations;
embed_migrations!("./src/repo/postgres/migrations");
}
use diesel_async::{
pooled_connection::{
deadpool::{BuildError, Pool},
AsyncDieselConnectionManager,
},
AsyncPgConnection,
};
use url::Url;
use super::{BaseRepo, HashRepo};
#[derive(Clone)]
pub(crate) struct PostgresRepo {
pool: Pool<AsyncPgConnection>,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ConnectPostgresError {
#[error("Failed to connect to postgres for migrations")]
ConnectForMigration(#[source] tokio_postgres::Error),
#[error("Failed to run migrations")]
Migration(#[source] refinery::Error),
#[error("Failed to build postgres connection pool")]
BuildPool(#[source] BuildError),
}
#[derive(Debug, thiserror::Error)]
enum PostgresError {}
impl PostgresRepo {
pub(crate) async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
let (mut client, conn) =
tokio_postgres::connect(postgres_url.as_str(), tokio_postgres::tls::NoTls)
.await
.map_err(ConnectPostgresError::ConnectForMigration)?;
let handle = actix_rt::spawn(conn);
embedded::migrations::runner()
.run_async(&mut client)
.await
.map_err(ConnectPostgresError::Migration)?;
handle.abort();
let _ = handle.await;
let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new(postgres_url);
let pool = Pool::builder(config)
.build()
.map_err(ConnectPostgresError::BuildPool)?;
Ok(PostgresRepo { pool })
}
}
impl BaseRepo for PostgresRepo {}
/*
#[async_trait::async_trait]
impl HashRepo for PostgresRepo {
async fn size(&self) -> Result<u64, RepoError> {
let conn = self.pool.get().await.map_err(PostgresError::from)?;
}
}
*/
impl std::fmt::Debug for PostgresRepo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PostgresRepo")
.field("pool", &"pool")
.finish()
}
}

View file

@ -0,0 +1,31 @@
use barrel::backend::Pg;
use barrel::functions::AutogenFunction;
use barrel::{types, Migration};
pub(crate) fn migration() -> String {
let mut m = Migration::new();
m.create_table("hashes", |t| {
t.add_column(
"hash",
types::binary()
.primary(true)
.unique(true)
.nullable(false)
.size(128),
);
t.add_column("identifier", types::text().unique(true).nullable(false));
t.add_column(
"motion_identifier",
types::text().unique(true).nullable(true),
);
t.add_column(
"created_at",
types::datetime()
.nullable(false)
.default(AutogenFunction::CurrentTimestamp),
);
});
m.make::<Pg>().to_string()
}

View file

@ -1,9 +1,4 @@
use crate::{ use crate::{error_code::ErrorCode, file::File, repo::ArcRepo, store::Store};
error_code::ErrorCode,
file::File,
repo::{Repo, SettingsRepo},
store::Store,
};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use std::{ use std::{
@ -58,7 +53,7 @@ impl FileError {
pub(crate) struct FileStore { pub(crate) struct FileStore {
path_gen: Generator, path_gen: Generator,
root_dir: PathBuf, root_dir: PathBuf,
repo: Repo, repo: ArcRepo,
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -189,7 +184,7 @@ impl Store for FileStore {
impl FileStore { impl FileStore {
#[tracing::instrument(skip(repo))] #[tracing::instrument(skip(repo))]
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> color_eyre::Result<Self> { pub(crate) async fn build(root_dir: PathBuf, repo: ArcRepo) -> color_eyre::Result<Self> {
let path_gen = init_generator(&repo).await?; let path_gen = init_generator(&repo).await?;
tokio::fs::create_dir_all(&root_dir).await?; tokio::fs::create_dir_all(&root_dir).await?;
@ -204,13 +199,9 @@ impl FileStore {
async fn next_directory(&self) -> Result<PathBuf, StoreError> { async fn next_directory(&self) -> Result<PathBuf, StoreError> {
let path = self.path_gen.next(); let path = self.path_gen.next();
match self.repo { self.repo
Repo::Sled(ref sled_repo) => { .set(GENERATOR_KEY, path.to_be_bytes().into())
sled_repo .await?;
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
}
}
let mut target_path = self.root_dir.clone(); let mut target_path = self.root_dir.clone();
for dir in path.to_strings() { for dir in path.to_strings() {
@ -308,18 +299,13 @@ pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), Fi
Ok(()) Ok(())
} }
async fn init_generator(repo: &Repo) -> Result<Generator, StoreError> { async fn init_generator(repo: &ArcRepo) -> Result<Generator, StoreError> {
match repo { if let Some(ivec) = repo.get(GENERATOR_KEY).await? {
Repo::Sled(sled_repo) => { Ok(Generator::from_existing(
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { storage_path_generator::Path::from_be_bytes(ivec.to_vec()).map_err(FileError::from)?,
Ok(Generator::from_existing( ))
storage_path_generator::Path::from_be_bytes(ivec.to_vec()) } else {
.map_err(FileError::from)?, Ok(Generator::new())
))
} else {
Ok(Generator::new())
}
}
} }
} }

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
bytes_stream::BytesStream, bytes_stream::BytesStream,
error_code::ErrorCode, error_code::ErrorCode,
repo::{Repo, SettingsRepo}, repo::ArcRepo,
store::Store, store::Store,
stream::{IntoStreamer, StreamMap}, stream::{IntoStreamer, StreamMap},
}; };
@ -107,7 +107,7 @@ impl From<BlockingError> for ObjectError {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ObjectStore { pub(crate) struct ObjectStore {
path_gen: Generator, path_gen: Generator,
repo: Repo, repo: ArcRepo,
bucket: Bucket, bucket: Bucket,
credentials: Credentials, credentials: Credentials,
client: ClientWithMiddleware, client: ClientWithMiddleware,
@ -119,7 +119,7 @@ pub(crate) struct ObjectStore {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ObjectStoreConfig { pub(crate) struct ObjectStoreConfig {
path_gen: Generator, path_gen: Generator,
repo: Repo, repo: ArcRepo,
bucket: Bucket, bucket: Bucket,
credentials: Credentials, credentials: Credentials,
signature_expiration: u64, signature_expiration: u64,
@ -493,7 +493,7 @@ impl ObjectStore {
signature_expiration: u64, signature_expiration: u64,
client_timeout: u64, client_timeout: u64,
public_endpoint: Option<Url>, public_endpoint: Option<Url>,
repo: Repo, repo: ArcRepo,
) -> Result<ObjectStoreConfig, StoreError> { ) -> Result<ObjectStoreConfig, StoreError> {
let path_gen = init_generator(&repo).await?; let path_gen = init_generator(&repo).await?;
@ -714,13 +714,9 @@ impl ObjectStore {
async fn next_directory(&self) -> Result<Path, StoreError> { async fn next_directory(&self) -> Result<Path, StoreError> {
let path = self.path_gen.next(); let path = self.path_gen.next();
match self.repo { self.repo
Repo::Sled(ref sled_repo) => { .set(GENERATOR_KEY, path.to_be_bytes().into())
sled_repo .await?;
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
}
}
Ok(path) Ok(path)
} }
@ -733,18 +729,14 @@ impl ObjectStore {
} }
} }
async fn init_generator(repo: &Repo) -> Result<Generator, StoreError> { async fn init_generator(repo: &ArcRepo) -> Result<Generator, StoreError> {
match repo { if let Some(ivec) = repo.get(GENERATOR_KEY).await? {
Repo::Sled(sled_repo) => { Ok(Generator::from_existing(
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? { storage_path_generator::Path::from_be_bytes(ivec.to_vec())
Ok(Generator::from_existing( .map_err(ObjectError::from)?,
storage_path_generator::Path::from_be_bytes(ivec.to_vec()) ))
.map_err(ObjectError::from)?, } else {
)) Ok(Generator::new())
} else {
Ok(Generator::new())
}
}
} }
} }