diff --git a/README.md b/README.md index d016797..5b270e1 100644 --- a/README.md +++ b/README.md @@ -487,6 +487,25 @@ A secure API key can be generated by any password generator. "msg": "No hash associated with provided alias" } ``` +- `POST /internal/export` Export the current sled database to the configured `export_path`. This is + useful for taking backups of a running pict-rs server. On success, it will return + ```json + { + "msg": "ok" + } + ``` + + Restoring from an exported database is as simple as: + 1. Stopping pict-rs + 2. Moving your current `sled-repo` directory to a safe location (e.g. `sled-repo.bak`) + ```bash + $ mv sled-repo sled-repo.bak + ``` + 3. Copying an exported database to `sled-repo` + ```bash + $ cp -r exports/2023-07-08T22:26:21.194126713Z sled-repo + ``` + 4. Starting pict-rs Additionally, all endpoints support setting deadlines, after which the request will cease processing. To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an diff --git a/defaults.toml b/defaults.toml index fccd46f..a837c0d 100644 --- a/defaults.toml +++ b/defaults.toml @@ -45,6 +45,7 @@ max_frame_count = 100 type = "sled" path = "/mnt/sled-repo" cache_capacity = 67108864 +export_path = "/mnt/exports" [store] type = "filesystem" diff --git a/dev.toml b/dev.toml index 85af377..a041c12 100644 --- a/dev.toml +++ b/dev.toml @@ -31,6 +31,7 @@ skip_validate_imports = false type = 'sled' path = 'data/sled-repo-local' cache_capacity = 67108864 +export_path = "data/exports-local" [store] type = 'filesystem' diff --git a/pict-rs.toml b/pict-rs.toml index ce5e73a..25854a1 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -270,6 +270,14 @@ path = '/mnt/sled-repo' # default: 67,108,864 (1024 * 1024 * 64, or 64MB) cache_capacity = 67108864 +## Optional: path for storing database exports +# environment variable: PICTRS__REPO__EXPORT_PATH +# default: /mnt/exports +# +# Used in combination with the /internal/export endpoint to dump the current sled database into a +# new file. This can be helpful for backing up a running pict-rs server. +export_path = "/mnt/exports" + ## Media storage configuration [store] diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 4d880ec..8249ddf 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -701,4 +701,7 @@ struct Sled { #[arg(short, long)] #[serde(skip_serializing_if = "Option::is_none")] cache_capacity: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + export_path: Option, } diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 1b67355..46481f0 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -96,6 +96,7 @@ enum RepoDefaults { struct SledDefaults { path: PathBuf, cache_capacity: u64, + export_path: PathBuf, } #[derive(Clone, Debug, serde::Serialize)] @@ -203,6 +204,7 @@ impl Default for SledDefaults { SledDefaults { path: PathBuf::from(String::from("/mnt/sled-repo")), cache_capacity: 1024 * 1024 * 64, + export_path: PathBuf::from(String::from("/mnt/exports")), } } } diff --git a/src/config/file.rs b/src/config/file.rs index c2deb6f..91a3a6a 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -158,4 +158,6 @@ pub(crate) struct Sled { pub(crate) path: PathBuf, pub(crate) cache_capacity: u64, + + pub(crate) export_path: PathBuf, } diff --git a/src/details.rs b/src/details.rs index 5032aef..fea1251 100644 --- a/src/details.rs +++ b/src/details.rs @@ -6,10 +6,11 @@ use crate::{ store::Store, }; use actix_web::web; +use time::format_description::well_known::Rfc3339; #[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(untagged)] -enum MaybeHumanDate { +pub(crate) enum MaybeHumanDate { HumanDate(#[serde(with = "time::serde::rfc3339")] time::OffsetDateTime), OldDate(#[serde(serialize_with = "time::serde::rfc3339::serialize")] time::OffsetDateTime), } @@ -123,3 +124,15 @@ impl From for std::time::SystemTime { } } } + +impl std::fmt::Display for MaybeHumanDate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::OldDate(date) | Self::HumanDate(date) => { + let s = date.format(&Rfc3339).map_err(|_| std::fmt::Error)?; + + f.write_str(&s) + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 569bfc6..ee031b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,7 @@ use futures_util::{ Stream, StreamExt, TryStreamExt, }; use once_cell::sync::{Lazy, OnceCell}; +use repo::sled::SledRepo; use rusty_s3::UrlStyle; use std::{ future::ready, @@ -1113,11 +1114,16 @@ fn next_worker_id() -> String { format!("{}-{}", CONFIG.server.worker_id, next_id) } -fn configure_endpoints( +fn configure_endpoints< + R: FullRepo + 'static, + S: Store + 'static, + F: Fn(&mut web::ServiceConfig), +>( config: &mut web::ServiceConfig, repo: R, store: S, client: Client, + extra_config: F, ) { config .app_data(web::Data::new(repo)) @@ -1184,7 +1190,8 @@ fn configure_endpoints( .service(web::resource("/purge").route(web::post().to(purge::))) .service(web::resource("/aliases").route(web::get().to(aliases::))) .service(web::resource("/identifier").route(web::get().to(identifier::))) - .service(web::resource("/set_not_found").route(web::post().to(set_not_found::))), + .service(web::resource("/set_not_found").route(web::post().to(set_not_found::))) + .configure(extra_config), ); } @@ -1204,44 +1211,51 @@ where .in_scope(|| actix_rt::spawn(queue::process_images(repo, store, next_worker_id()))); } -async fn launch_file_store( +async fn launch_file_store( repo: R, store: FileStore, + extra_config: F, ) -> std::io::Result<()> { HttpServer::new(move || { let client = build_client(); let store = store.clone(); let repo = repo.clone(); + let extra_config = extra_config.clone(); spawn_workers(repo.clone(), store.clone()); App::new() .wrap(TracingLogger::default()) .wrap(Deadline) - .configure(move |sc| configure_endpoints(sc, repo, store, client)) + .configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config)) }) .bind(CONFIG.server.address)? .run() .await } -async fn launch_object_store( +async fn launch_object_store< + R: FullRepo + 'static, + F: Fn(&mut web::ServiceConfig) + Send + Clone, +>( repo: R, store_config: ObjectStoreConfig, + extra_config: F, ) -> std::io::Result<()> { HttpServer::new(move || { let client = build_client(); let store = store_config.clone().build(client.clone()); let repo = repo.clone(); + let extra_config = extra_config.clone(); spawn_workers(repo.clone(), store.clone()); App::new() .wrap(TracingLogger::default()) .wrap(Deadline) - .configure(move |sc| configure_endpoints(sc, repo, store, client)) + .configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config)) }) .bind(CONFIG.server.address)? .run() @@ -1355,6 +1369,18 @@ pub fn install_tracing() -> color_eyre::Result<()> { init_tracing(&CONFIG.tracing) } +async fn export_handler(repo: web::Data) -> Result { + repo.export().await?; + + Ok(HttpResponse::Created().json(&serde_json::json!({ + "msg": "ok" + }))) +} + +fn sled_extra_config(sc: &mut web::ServiceConfig) { + sc.service(web::resource("/export").route(web::post().to(export_handler))); +} + /// Run the pict-rs application /// /// This must be called after `init_config`, or else the default configuration builder will run and @@ -1422,7 +1448,7 @@ pub async fn run() -> color_eyre::Result<()> { .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; - launch_file_store(sled_repo, store).await?; + launch_file_store(sled_repo, store, sled_extra_config).await?; } } } @@ -1457,7 +1483,7 @@ pub async fn run() -> color_eyre::Result<()> { .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; - launch_object_store(sled_repo, store).await?; + launch_object_store(sled_repo, store, sled_extra_config).await?; } } } diff --git a/src/repo.rs b/src/repo.rs index ae9f2eb..4d45f5d 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -465,17 +465,13 @@ impl Repo { pub(crate) fn open(config: config::Repo) -> color_eyre::Result { match config { config::Repo::Sled(config::Sled { - mut path, + path, cache_capacity, + export_path, }) => { - path.push("v0.4.0-alpha.1"); + let repo = self::sled::SledRepo::build(path, cache_capacity, export_path)?; - let db = ::sled::Config::new() - .cache_capacity(cache_capacity) - .path(path) - .open()?; - - Ok(Self::Sled(self::sled::SledRepo::new(db)?)) + Ok(Self::Sled(repo)) } } } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index d9cf320..4e9f7a3 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,4 +1,5 @@ use crate::{ + details::MaybeHumanDate, repo::{ Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, @@ -12,6 +13,7 @@ use futures_util::Stream; use sled::{CompareAndSwapError, Db, IVec, Tree}; use std::{ collections::HashMap, + path::PathBuf, pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, @@ -67,12 +69,20 @@ pub(crate) struct SledRepo { in_progress_queue: Tree, queue_notifier: Arc>>>, uploads: Tree, + cache_capacity: u64, + export_path: PathBuf, db: Db, } impl SledRepo { - #[tracing::instrument(skip(db))] - pub(crate) fn new(db: Db) -> Result { + #[tracing::instrument] + pub(crate) fn build( + path: PathBuf, + cache_capacity: u64, + export_path: PathBuf, + ) -> color_eyre::Result { + let db = Self::open(path, cache_capacity)?; + Ok(SledRepo { healthz_count: Arc::new(AtomicU64::new(0)), healthz: db.open_tree("pict-rs-healthz-tree")?, @@ -90,9 +100,42 @@ impl SledRepo { in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, queue_notifier: Arc::new(RwLock::new(HashMap::new())), uploads: db.open_tree("pict-rs-uploads-tree")?, + cache_capacity, + export_path, db, }) } + + fn open(mut path: PathBuf, cache_capacity: u64) -> Result { + path.push("v0.4.0-alpha.1"); + + let db = ::sled::Config::new() + .cache_capacity(cache_capacity) + .path(path) + .open()?; + + Ok(db) + } + + #[tracing::instrument(level = "warn")] + pub(crate) async fn export(&self) -> Result<(), RepoError> { + let path = self + .export_path + .join(MaybeHumanDate::HumanDate(time::OffsetDateTime::now_utc()).to_string()); + + let export_db = Self::open(path, self.cache_capacity)?; + + let this = self.db.clone(); + + actix_rt::task::spawn_blocking(move || { + let export = this.export(); + export_db.import(export); + }) + .await + .map_err(SledError::from)?; + + Ok(()) + } } impl BaseRepo for SledRepo { @@ -289,6 +332,7 @@ impl QueueRepo for SledRepo { let worker_id = worker_id.clone(); let job = b!(self.queue, { in_progress_queue.remove(&worker_id)?; + in_progress_queue.flush()?; while let Some((key, job)) = queue .scan_prefix(queue_name.as_bytes())