Add database exports

This should help admins running pict-rs on traditional filesystems to
keep valid backups of pict-rs' sled repo.
asonix 2023-07-08 17:35:57 -05:00
parent ee7bd192c4
commit 7768d4e58e
11 changed files with 134 additions and 19 deletions

@ -487,6 +487,25 @@ A secure API key can be generated by any password generator.
"msg": "No hash associated with provided alias"
}
```
- `POST /internal/export` Export the current sled database to the configured `export_path`. This is
useful for taking backups of a running pict-rs server. On success, it will return
```json
{
"msg": "ok"
}
```
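For illustration, an export can be triggered with a single request. The sketch below assumes pict-rs is listening on `localhost:8080` and that the configured API key is supplied via the `X-Api-Token` header used by the other internal endpoints:
```bash
# hypothetical invocation; adjust the host, port, and API key to your deployment
$ curl -X POST -H "X-Api-Token: $API_KEY" http://localhost:8080/internal/export
```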
Restoring from an exported database is as simple as:
1. Stopping pict-rs
2. Moving your current `sled-repo` directory to a safe location (e.g. `sled-repo.bak`)
```bash
$ mv sled-repo sled-repo.bak
```
3. Copying an exported database to `sled-repo`
```bash
$ cp -r exports/2023-07-08T22:26:21.194126713Z sled-repo
```
4. Starting pict-rs
Additionally, all endpoints support setting deadlines, after which the request will cease
processing. To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an

@ -45,6 +45,7 @@ max_frame_count = 100
type = "sled"
path = "/mnt/sled-repo"
cache_capacity = 67108864
export_path = "/mnt/exports"
[store]
type = "filesystem"

@ -31,6 +31,7 @@ skip_validate_imports = false
type = 'sled'
path = 'data/sled-repo-local'
cache_capacity = 67108864
export_path = "data/exports-local"
[store]
type = 'filesystem'

@ -270,6 +270,14 @@ path = '/mnt/sled-repo'
# default: 67,108,864 (1024 * 1024 * 64, or 64MB)
cache_capacity = 67108864
## Optional: path for storing database exports
# environment variable: PICTRS__REPO__EXPORT_PATH
# default: /mnt/exports
#
# Used in combination with the /internal/export endpoint to dump the current sled database into a
# new file. This can be helpful for backing up a running pict-rs server.
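# Each export is written to a new timestamped subdirectory of this path (e.g.
# /mnt/exports/2023-07-08T22:26:21.194126713Z) which can later be copied into place as the
# sled-repo directory to restore from a backup.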
export_path = "/mnt/exports"
## Media storage configuration
[store]

@ -701,4 +701,7 @@ struct Sled {
#[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
cache_capacity: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
export_path: Option<PathBuf>,
}

@ -96,6 +96,7 @@ enum RepoDefaults {
struct SledDefaults {
path: PathBuf,
cache_capacity: u64,
export_path: PathBuf,
}
#[derive(Clone, Debug, serde::Serialize)]
@ -203,6 +204,7 @@ impl Default for SledDefaults {
SledDefaults {
path: PathBuf::from(String::from("/mnt/sled-repo")),
cache_capacity: 1024 * 1024 * 64,
export_path: PathBuf::from(String::from("/mnt/exports")),
}
}
}

@ -158,4 +158,6 @@ pub(crate) struct Sled {
pub(crate) path: PathBuf,
pub(crate) cache_capacity: u64,
pub(crate) export_path: PathBuf,
}

@ -6,10 +6,11 @@ use crate::{
store::Store,
};
use actix_web::web;
use time::format_description::well_known::Rfc3339;
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
enum MaybeHumanDate {
pub(crate) enum MaybeHumanDate {
HumanDate(#[serde(with = "time::serde::rfc3339")] time::OffsetDateTime),
OldDate(#[serde(serialize_with = "time::serde::rfc3339::serialize")] time::OffsetDateTime),
}
@ -123,3 +124,15 @@ impl From<MaybeHumanDate> for std::time::SystemTime {
}
}
}
impl std::fmt::Display for MaybeHumanDate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::OldDate(date) | Self::HumanDate(date) => {
let s = date.format(&Rfc3339).map_err(|_| std::fmt::Error)?;
f.write_str(&s)
}
}
}
}

@ -36,6 +36,7 @@ use futures_util::{
Stream, StreamExt, TryStreamExt,
};
use once_cell::sync::{Lazy, OnceCell};
use repo::sled::SledRepo;
use rusty_s3::UrlStyle;
use std::{
future::ready,
@ -1113,11 +1114,16 @@ fn next_worker_id() -> String {
format!("{}-{}", CONFIG.server.worker_id, next_id)
}
fn configure_endpoints<R: FullRepo + 'static, S: Store + 'static>(
fn configure_endpoints<
R: FullRepo + 'static,
S: Store + 'static,
F: Fn(&mut web::ServiceConfig),
>(
config: &mut web::ServiceConfig,
repo: R,
store: S,
client: Client,
extra_config: F,
) {
config
.app_data(web::Data::new(repo))
@ -1184,7 +1190,8 @@ fn configure_endpoints<R: FullRepo + 'static, S: Store + 'static>(
.service(web::resource("/purge").route(web::post().to(purge::<R>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
.service(web::resource("/identifier").route(web::get().to(identifier::<R, S>)))
.service(web::resource("/set_not_found").route(web::post().to(set_not_found::<R>))),
.service(web::resource("/set_not_found").route(web::post().to(set_not_found::<R>)))
.configure(extra_config),
);
}
@ -1204,44 +1211,51 @@ where
.in_scope(|| actix_rt::spawn(queue::process_images(repo, store, next_worker_id())));
}
async fn launch_file_store<R: FullRepo + 'static>(
async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig) + Send + Clone>(
repo: R,
store: FileStore,
extra_config: F,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
let store = store.clone();
let repo = repo.clone();
let extra_config = extra_config.clone();
spawn_workers(repo.clone(), store.clone());
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.configure(move |sc| configure_endpoints(sc, repo, store, client))
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
})
.bind(CONFIG.server.address)?
.run()
.await
}
async fn launch_object_store<R: FullRepo + 'static>(
async fn launch_object_store<
R: FullRepo + 'static,
F: Fn(&mut web::ServiceConfig) + Send + Clone,
>(
repo: R,
store_config: ObjectStoreConfig,
extra_config: F,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
let store = store_config.clone().build(client.clone());
let repo = repo.clone();
let extra_config = extra_config.clone();
spawn_workers(repo.clone(), store.clone());
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.configure(move |sc| configure_endpoints(sc, repo, store, client))
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config))
})
.bind(CONFIG.server.address)?
.run()
@ -1355,6 +1369,18 @@ pub fn install_tracing() -> color_eyre::Result<()> {
init_tracing(&CONFIG.tracing)
}
async fn export_handler(repo: web::Data<SledRepo>) -> Result<HttpResponse, Error> {
repo.export().await?;
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok"
})))
}
fn sled_extra_config(sc: &mut web::ServiceConfig) {
sc.service(web::resource("/export").route(web::post().to(export_handler)));
}
/// Run the pict-rs application
///
/// This must be called after `init_config`, or else the default configuration builder will run and
@ -1422,7 +1448,7 @@ pub async fn run() -> color_eyre::Result<()> {
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_file_store(sled_repo, store).await?;
launch_file_store(sled_repo, store, sled_extra_config).await?;
}
}
}
@ -1457,7 +1483,7 @@ pub async fn run() -> color_eyre::Result<()> {
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_object_store(sled_repo, store).await?;
launch_object_store(sled_repo, store, sled_extra_config).await?;
}
}
}

@ -465,17 +465,13 @@ impl Repo {
pub(crate) fn open(config: config::Repo) -> color_eyre::Result<Self> {
match config {
config::Repo::Sled(config::Sled {
mut path,
path,
cache_capacity,
export_path,
}) => {
path.push("v0.4.0-alpha.1");
let repo = self::sled::SledRepo::build(path, cache_capacity, export_path)?;
let db = ::sled::Config::new()
.cache_capacity(cache_capacity)
.path(path)
.open()?;
Ok(Self::Sled(self::sled::SledRepo::new(db)?))
Ok(Self::Sled(repo))
}
}
}

@ -1,4 +1,5 @@
use crate::{
details::MaybeHumanDate,
repo::{
Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details,
FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo,
@ -12,6 +13,7 @@ use futures_util::Stream;
use sled::{CompareAndSwapError, Db, IVec, Tree};
use std::{
collections::HashMap,
path::PathBuf,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
@ -67,12 +69,20 @@ pub(crate) struct SledRepo {
in_progress_queue: Tree,
queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
uploads: Tree,
cache_capacity: u64,
export_path: PathBuf,
db: Db,
}
impl SledRepo {
#[tracing::instrument(skip(db))]
pub(crate) fn new(db: Db) -> Result<Self, SledError> {
#[tracing::instrument]
pub(crate) fn build(
path: PathBuf,
cache_capacity: u64,
export_path: PathBuf,
) -> color_eyre::Result<Self> {
let db = Self::open(path, cache_capacity)?;
Ok(SledRepo {
healthz_count: Arc::new(AtomicU64::new(0)),
healthz: db.open_tree("pict-rs-healthz-tree")?,
@ -90,9 +100,42 @@ impl SledRepo {
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
queue_notifier: Arc::new(RwLock::new(HashMap::new())),
uploads: db.open_tree("pict-rs-uploads-tree")?,
cache_capacity,
export_path,
db,
})
}
fn open(mut path: PathBuf, cache_capacity: u64) -> Result<Db, SledError> {
path.push("v0.4.0-alpha.1");
let db = ::sled::Config::new()
.cache_capacity(cache_capacity)
.path(path)
.open()?;
Ok(db)
}
#[tracing::instrument(level = "warn")]
pub(crate) async fn export(&self) -> Result<(), RepoError> {
let path = self
.export_path
.join(MaybeHumanDate::HumanDate(time::OffsetDateTime::now_utc()).to_string());
let export_db = Self::open(path, self.cache_capacity)?;
let this = self.db.clone();
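// sled's export/import copies every tree from the live database into the freshly opened
// export database; this is synchronous disk I/O, so it runs on a blocking thread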
actix_rt::task::spawn_blocking(move || {
let export = this.export();
export_db.import(export);
})
.await
.map_err(SledError::from)?;
Ok(())
}
}
impl BaseRepo for SledRepo {
@ -289,6 +332,7 @@ impl QueueRepo for SledRepo {
let worker_id = worker_id.clone();
let job = b!(self.queue, {
in_progress_queue.remove(&worker_id)?;
in_progress_queue.flush()?;
while let Some((key, job)) = queue
.scan_prefix(queue_name.as_bytes())