Upgrade Concurrency: Update docs, configs, limit maxiumum details operations
Some checks reported errors
continuous-integration/drone/push Build encountered an error

This commit is contained in:
asonix 2023-10-03 17:04:40 -05:00
parent f223d477c7
commit d28c1ac628
10 changed files with 107 additions and 41 deletions

View file

@ -17,11 +17,12 @@ _a simple image hosting service_
1. [Backups](#backups)
2. [0.4 to 0.5 Migration Guide](#0-4-to-0-5-migration-guide)
1. [Overview](#overview)
2. [Configuration Updates](#configuration-updates)
2. [Upgrade Configuration](#upgrade-configuration)
3. [Configuration Updates](#configuration-updates)
1. [Image Changes](#image-changes)
2. [Animation Changes](#animation-changes)
3. [Video Changes](#video-changes)
3. [Upgrading Directly to Postgres](#upgrading-directly-to-postgres)
4. [Upgrading Directly to Postgres](#upgrading-directly-to-postgres)
3. [Filesystem to Object Storage Migration](#filesystem-to-object-storage-migration)
1. [Troubleshooting](#migration-troubleshooting)
4. [Sled to Postgres Migration](#sled-to-postgres-migration)
@ -645,6 +646,30 @@ uploaded before this was standard may not have ever had their details records ge
_This upgrade must be performed while pict-rs is offline._
#### Upgrade Configuration
Because upgrades may take so long, there is a new configuration option introduced to attempt to
improve its speed.
```toml
[upgrade]
concurrency = 32
```
or
```
PICTRS__UPGRADE__CONCURRENCY=32
```
or
```bash
$ pict-rs run --upgrade-concurrency 32
```
This value dictates how many hashes pict-rs will attempt to migrate at the same time. Since this
value will increase the number of concurrent connections to the Repo and the Store, as well as
potentially increase CPU and memory use, it should be considered carefully before increasing.
For large-scale deployments, it is likely this value should be bumped to 128, 256, or even 512. The
default value is 32.
#### Configuration Updates
Previously, pict-rs only had two categories for files: images and videos. pict-rs 0.5 adds a third
category: animation. With the new explicit support for animated file types, some configuration

View file

@ -6,6 +6,9 @@ max_file_count = 1
[client]
timeout = 30
[upgrade]
concurrency = 32
[tracing.logging]
format = "normal"
targets = "info"

View file

@ -27,6 +27,15 @@ max_file_count = 1
# distinct from the object storage client timeout, which can be configured separately
timeout = 30
[upgrade]
## Optional: How many hashes will be migrated from the previous version of pict-rs at the same time
# environment variable: PICTRS__UPGRADE__CONCURRENCY
# default: 32
#
# Increasing this value will increase the number of concurrent connections to the Repo, and to the
# Store, so make sure your deployment can handle more throughput before tweaking this.
concurrency = 32
## Logging configuration
[tracing.logging]

View file

@ -1,15 +1,10 @@
use std::sync::Arc;
use crate::{
bytes_stream::BytesStream,
discover::Discovery,
error::Error,
formats::{InternalFormat, InternalVideoFormat},
serde_str::Serde,
store::Store,
};
use actix_web::web;
use streem::IntoStreamer;
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -83,6 +78,7 @@ impl Details {
self.inner.created_at.timestamp
}
#[tracing::instrument(level = "DEBUG")]
pub(crate) async fn from_bytes(timeout: u64, input: web::Bytes) -> Result<Self, Error> {
let Discovery {
input,
@ -99,28 +95,6 @@ impl Details {
))
}
#[tracing::instrument(level = "DEBUG")]
pub(crate) async fn from_store<S: Store>(
store: &S,
identifier: &Arc<str>,
timeout: u64,
) -> Result<Self, Error> {
let mut buf = BytesStream::new();
let mut stream = store
.to_stream(identifier, None, None)
.await?
.into_streamer();
while let Some(res) = stream.next().await {
buf.add_bytes(res?);
}
let bytes = buf.into_bytes();
Self::from_bytes(timeout, bytes).await
}
pub(crate) fn internal_format(&self) -> InternalFormat {
self.inner.format
}

View file

@ -99,7 +99,9 @@ async fn process<S: Store + 'static>(
let input_details = if let Some(details) = repo.details(&identifier).await? {
details
} else {
let details = Details::from_store(store, &identifier, media.process_timeout).await?;
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?;
repo.relate_details(&identifier, &details).await?;

View file

@ -106,7 +106,8 @@ where
.save_async_read(hasher_reader, input_type.media_type())
.await?;
let details = Details::from_store(store, &identifier, media.process_timeout).await?;
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
let details = Details::from_bytes(media.process_timeout, bytes_stream.into_bytes()).await?;
drop(permit);

View file

@ -122,8 +122,9 @@ async fn ensure_details<S: Store + 'static>(
}
tracing::debug!("generating new details from {:?}", identifier);
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
let new_details =
Details::from_store(store, &identifier, config.media.process_timeout).await?;
Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes()).await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
@ -897,8 +898,10 @@ async fn process<S: Store + 'static>(
}
tracing::debug!("generating new details from {:?}", identifier);
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
let new_details =
Details::from_store(&store, &identifier, config.media.process_timeout).await?;
Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes())
.await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
@ -1015,8 +1018,10 @@ async fn process_head<S: Store + 'static>(
}
tracing::debug!("generating new details from {:?}", identifier);
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
let new_details =
Details::from_store(&store, &identifier, config.media.process_timeout).await?;
Details::from_bytes(config.media.process_timeout, bytes_stream.into_bytes())
.await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");

View file

@ -407,7 +407,12 @@ where
let details = if let Some(details) = details_opt {
details
} else {
let new_details = Details::from_store(from, identifier, timeout)
let bytes_stream = from
.to_bytes(&identifier, None, None)
.await
.map_err(From::from)
.map_err(MigrateError::Details)?;
let new_details = Details::from_bytes(timeout, bytes_stream.into_bytes())
.await
.map_err(MigrateError::Details)?;
repo.relate_details(identifier, &new_details)

View file

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use streem::IntoStreamer;
use tokio::task::JoinSet;
use tokio::{sync::Semaphore, task::JoinSet};
use crate::{
config::Configuration,
@ -341,6 +341,18 @@ async fn set_details<S: Store>(
}
}
static DETAILS_SEMAPHORE: OnceLock<Semaphore> = OnceLock::new();
fn details_semaphore() -> &'static Semaphore {
DETAILS_SEMAPHORE.get_or_init(|| {
let parallelism = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
crate::sync::bare_semaphore(parallelism * 2)
})
}
#[tracing::instrument(skip_all)]
async fn fetch_or_generate_details<S: Store>(
old_repo: &OldSledRepo,
@ -353,8 +365,13 @@ async fn fetch_or_generate_details<S: Store>(
if let Some(details) = details_opt {
Ok(details)
} else {
Details::from_store(store, identifier, config.media.process_timeout)
.await
.map_err(From::from)
let bytes_stream = store.to_bytes(identifier, None, None).await?;
let bytes = bytes_stream.into_bytes();
let guard = details_semaphore().acquire().await?;
let details = Details::from_bytes(config.media.process_timeout, bytes).await?;
drop(guard);
Ok(details)
}
}

View file

@ -1,9 +1,10 @@
use actix_web::web::Bytes;
use futures_core::Stream;
use std::{fmt::Debug, sync::Arc};
use streem::IntoStreamer;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{error_code::ErrorCode, stream::LocalBoxStream};
use crate::{bytes_stream::BytesStream, error_code::ErrorCode, stream::LocalBoxStream};
pub(crate) mod file_store;
pub(crate) mod object_store;
@ -22,6 +23,9 @@ pub(crate) enum StoreError {
#[error("Error in 0.4 DB")]
Repo04(#[from] crate::repo_04::RepoError),
#[error("Error reading bytes stream")]
ReadStream(#[source] std::io::Error),
#[error("Requested file is not found")]
FileNotFound(#[source] std::io::Error),
@ -35,6 +39,7 @@ impl StoreError {
Self::FileStore(e) => e.error_code(),
Self::ObjectStore(e) => e.error_code(),
Self::Repo(e) => e.error_code(),
Self::ReadStream(_) => ErrorCode::IO_ERROR,
Self::Repo04(_) => ErrorCode::OLD_REPO_ERROR,
Self::FileNotFound(_) | Self::ObjectNotFound(_) => ErrorCode::NOT_FOUND,
}
@ -112,6 +117,26 @@ pub(crate) trait Store: Clone + Debug {
len: Option<u64>,
) -> Result<LocalBoxStream<'static, std::io::Result<Bytes>>, StoreError>;
async fn to_bytes(
&self,
identifier: &Arc<str>,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<BytesStream, StoreError> {
let mut buf = BytesStream::new();
let mut streamer = self
.to_stream(identifier, from_start, len)
.await?
.into_streamer();
while let Some(bytes) = streamer.try_next().await.map_err(StoreError::ReadStream)? {
buf.add_bytes(bytes);
}
Ok(buf)
}
async fn read_into<Writer>(
&self,
identifier: &Arc<str>,