Fix restructure, upload

This commit is contained in:
Aode (Lion) 2021-10-18 23:37:11 -05:00
parent 195f614a4b
commit 34038a0f16
5 changed files with 307 additions and 231 deletions

View file

@ -300,8 +300,7 @@ async fn upload(
let delete_token = image.result.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let mut path = manager.image_dir();
path.push(name.clone());
let path = manager.path_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
@ -365,8 +364,7 @@ async fn download(
let delete_token = session.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let mut path = manager.image_dir();
path.push(name.clone());
let path = manager.path_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
@ -446,8 +444,7 @@ async fn prepare_process(
.parse::<Format>()
.map_err(|_| UploadError::UnsupportedFormat)?;
let processed_name = format!("{}.{}", name, ext);
let base = manager.image_dir();
let thumbnail_path = self::processor::build_path(base, &chain, processed_name);
let thumbnail_path = self::processor::build_path(&chain, processed_name);
let thumbnail_args = self::processor::build_args(&chain);
Ok((format, name, thumbnail_path, thumbnail_args))
@ -463,7 +460,12 @@ async fn process_details(
let (_, name, thumbnail_path, _) =
prepare_process(query, ext.as_str(), &manager, &whitelist).await?;
let details = manager.variant_details(thumbnail_path, name).await?;
let real_path = manager
.variant_path(&thumbnail_path, &name)
.await?
.ok_or(UploadError::MissingAlias)?;
let details = manager.variant_details(real_path, name).await?;
let details = details.ok_or(UploadError::NoFiles)?;
@ -482,162 +484,175 @@ async fn process(
let (format, name, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &manager, &whitelist).await?;
let real_path_opt = manager.variant_path(&thumbnail_path, &name).await?;
// If the thumbnail doesn't exist, we need to create it
let thumbnail_exists = if let Err(e) = tokio::fs::metadata(&thumbnail_path)
.instrument(tracing::info_span!("Get thumbnail metadata"))
.await
{
if e.kind() != std::io::ErrorKind::NotFound {
error!("Error looking up processed image, {}", e);
return Err(e.into());
let thumbnail_exists = if let Some(real_path) = &real_path_opt {
if let Err(e) = tokio::fs::metadata(real_path)
.instrument(tracing::info_span!("Get thumbnail metadata"))
.await
{
if e.kind() != std::io::ErrorKind::NotFound {
error!("Error looking up processed image, {}", e);
return Err(e.into());
}
false
} else {
true
}
false
} else {
true
false
};
let details = manager
.variant_details(thumbnail_path.clone(), name.clone())
.await?;
if thumbnail_exists {
if let Some(real_path) = real_path_opt {
let details_opt = manager
.variant_details(real_path.clone(), name.clone())
.await?;
if !thumbnail_exists || details.is_none() {
let mut original_path = manager.image_dir();
original_path.push(name.clone());
let thumbnail_path2 = thumbnail_path.clone();
let process_fut = async {
let thumbnail_path = thumbnail_path2;
// Create and save a JPG for motion images (gif, mp4)
if let Some((updated_path, exists)) =
self::processor::prepare_image(original_path.clone()).await?
{
original_path = updated_path.clone();
if exists.is_new() {
// Save the transcoded file in another task
debug!("Spawning storage task");
let manager2 = manager.clone();
let name = name.clone();
let span = tracing::info_span!(
parent: None,
"Storing variant info",
path = &tracing::field::debug(&updated_path),
name = &tracing::field::display(&name),
);
span.follows_from(Span::current());
actix_rt::spawn(
async move {
if let Err(e) = manager2.store_variant(updated_path, name).await {
error!("Error storing variant, {}", e);
return;
}
}
.instrument(span),
);
}
}
let permit = PROCESS_SEMAPHORE.acquire().await?;
let file = crate::file::File::open(original_path.clone()).await?;
let mut processed_reader =
crate::magick::process_image_file_read(file, thumbnail_args, format)?;
let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?;
let bytes = web::Bytes::from(vec);
drop(permit);
let details = if let Some(details) = details {
let details = if let Some(details) = details_opt {
details
} else {
Details::from_bytes(bytes.clone()).await?
let details = Details::from_path(real_path.clone()).await?;
manager
.store_variant_details(real_path.clone(), name, &details)
.await?;
details
};
let save_span = tracing::info_span!(
parent: None,
"Saving variant information",
path = tracing::field::debug(&thumbnail_path),
name = tracing::field::display(&name),
);
save_span.follows_from(Span::current());
let details2 = details.clone();
let bytes2 = bytes.clone();
actix_rt::spawn(
async move {
if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await {
tracing::warn!("Error saving thumbnail: {}", e);
return;
}
if let Err(e) = manager
.store_variant_details(thumbnail_path.clone(), name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await {
tracing::warn!("Error saving variant info: {}", e);
}
}
.instrument(save_span),
);
Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
};
let (details, bytes) =
CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?;
return match range {
Some(range_header) => {
if !range_header.is_bytes() {
return Err(UploadError::Range.into());
}
if range_header.is_empty() {
Err(UploadError::Range.into())
} else if range_header.len() == 1 {
let range = range_header.ranges().next().unwrap();
let content_range = range.to_content_range(bytes.len() as u64);
let stream = range.chop_bytes(bytes);
let mut builder = HttpResponse::PartialContent();
builder.insert_header(content_range);
Ok(srv_response(
builder,
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
))
} else {
Err(UploadError::Range.into())
}
}
None => Ok(srv_response(
HttpResponse::Ok(),
once(ready(Ok(bytes) as Result<_, Error>)),
details.content_type(),
7 * DAYS,
details.system_time(),
)),
};
return ranged_file_resp(real_path, range, details).await;
}
}
let details = if let Some(details) = details {
details
} else {
let details = Details::from_path(thumbnail_path.clone()).await?;
manager
.store_variant_details(thumbnail_path.clone(), name, &details)
.await?;
details
let mut original_path = manager.path_from_filename(name.clone()).await?;
let thumbnail_path2 = thumbnail_path.clone();
let process_fut = async {
let thumbnail_path = thumbnail_path2;
// Create and save a JPG for motion images (gif, mp4)
if let Some((updated_path, exists)) =
self::processor::prepare_image(original_path.clone()).await?
{
original_path = updated_path.clone();
if exists.is_new() {
// Save the transcoded file in another task
debug!("Spawning storage task");
let manager2 = manager.clone();
let name = name.clone();
let span = tracing::info_span!(
parent: None,
"Storing variant info",
path = &tracing::field::debug(&updated_path),
name = &tracing::field::display(&name),
);
span.follows_from(Span::current());
actix_rt::spawn(
async move {
if let Err(e) = manager2.store_variant(None, &updated_path, &name).await {
error!("Error storing variant, {}", e);
return;
}
}
.instrument(span),
);
}
}
let permit = PROCESS_SEMAPHORE.acquire().await?;
let file = crate::file::File::open(original_path.clone()).await?;
let mut processed_reader =
crate::magick::process_image_file_read(file, thumbnail_args, format)?;
let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?;
let bytes = web::Bytes::from(vec);
drop(permit);
let details = Details::from_bytes(bytes.clone()).await?;
let save_span = tracing::info_span!(
parent: None,
"Saving variant information",
path = tracing::field::debug(&thumbnail_path),
name = tracing::field::display(&name),
);
save_span.follows_from(Span::current());
let details2 = details.clone();
let bytes2 = bytes.clone();
actix_rt::spawn(
async move {
let real_path = match manager.next_directory() {
Ok(real_path) => real_path.join(&name),
Err(e) => {
tracing::warn!("Failed to generate directory path: {}", e);
return;
}
};
if let Err(e) = safe_save_file(real_path.clone(), bytes2).await {
tracing::warn!("Error saving thumbnail: {}", e);
return;
}
if let Err(e) = manager
.store_variant_details(real_path.clone(), name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager
.store_variant(Some(&thumbnail_path), &real_path, &name)
.await
{
tracing::warn!("Error saving variant info: {}", e);
}
}
.instrument(save_span),
);
Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
};
ranged_file_resp(thumbnail_path, range, details).await
let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?;
match range {
Some(range_header) => {
if !range_header.is_bytes() {
return Err(UploadError::Range.into());
}
if range_header.is_empty() {
Err(UploadError::Range.into())
} else if range_header.len() == 1 {
let range = range_header.ranges().next().unwrap();
let content_range = range.to_content_range(bytes.len() as u64);
let stream = range.chop_bytes(bytes);
let mut builder = HttpResponse::PartialContent();
builder.insert_header(content_range);
Ok(srv_response(
builder,
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
))
} else {
Err(UploadError::Range.into())
}
}
None => Ok(srv_response(
HttpResponse::Ok(),
once(ready(Ok(bytes) as Result<_, Error>)),
details.content_type(),
7 * DAYS,
details.system_time(),
)),
}
}
/// Fetch file details
@ -647,8 +662,7 @@ async fn details(
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
let name = manager.from_alias(alias.into_inner()).await?;
let mut path = manager.image_dir();
path.push(name.clone());
let path = manager.path_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
@ -673,8 +687,7 @@ async fn serve(
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
let name = manager.from_alias(alias.into_inner()).await?;
let mut path = manager.image_dir();
path.push(name.clone());
let path = manager.path_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;

View file

@ -266,11 +266,11 @@ pub(crate) fn build_chain(args: &[(String, String)]) -> ProcessChain {
ProcessChain { inner }
}
pub(crate) fn build_path(base: PathBuf, chain: &ProcessChain, filename: String) -> PathBuf {
pub(crate) fn build_path(chain: &ProcessChain, filename: String) -> PathBuf {
let mut path = chain
.inner
.iter()
.fold(base, |acc, processor| processor.path(acc));
.fold(PathBuf::default(), |acc, processor| processor.path(acc));
path.push(filename);
path

View file

@ -48,7 +48,7 @@ pub struct UploadManager {
struct UploadManagerInner {
format: Option<Format>,
hasher: sha2::Sha256,
image_dir: PathBuf,
root_dir: PathBuf,
alias_tree: sled::Tree,
filename_tree: sled::Tree,
main_tree: sled::Tree,
@ -77,19 +77,12 @@ struct FilenameIVec {
}
impl UploadManager {
/// Get the image directory
pub(crate) fn image_dir(&self) -> PathBuf {
self.inner.image_dir.clone()
}
/// Create a new UploadManager
pub(crate) async fn new(mut root_dir: PathBuf, format: Option<Format>) -> Result<Self, Error> {
pub(crate) async fn new(root_dir: PathBuf, format: Option<Format>) -> Result<Self, Error> {
let root_clone = root_dir.clone();
// sled automatically creates it's own directories
let db = web::block(move || LatestDb::exists(root_clone).migrate()).await??;
root_dir.push("files");
// Ensure file dir exists
tokio::fs::create_dir_all(&root_dir).await?;
@ -101,7 +94,7 @@ impl UploadManager {
inner: Arc::new(UploadManagerInner {
format,
hasher: sha2::Sha256::new(),
image_dir: root_dir,
root_dir,
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
details_tree: db.open_tree("details")?,
@ -118,17 +111,61 @@ impl UploadManager {
Ok(manager)
}
#[instrument(skip(self))]
pub(crate) async fn path_from_filename(&self, filename: String) -> Result<PathBuf, Error> {
let path_tree = self.inner.path_tree.clone();
let path_ivec = web::block(move || path_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingFile)?;
let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?);
Ok(self.inner.root_dir.join(relative))
}
#[instrument(skip(self))]
async fn store_path(&self, filename: String, path: &std::path::Path) -> Result<(), Error> {
let path_bytes = path.to_str().ok_or(UploadError::Path)?.as_bytes().to_vec();
let path_tree = self.inner.path_tree.clone();
web::block(move || path_tree.insert(filename.as_bytes(), path_bytes)).await??;
Ok(())
}
#[instrument(skip(self))]
pub(crate) async fn variant_path(
&self,
process_path: &std::path::Path,
filename: &str,
) -> Result<Option<PathBuf>, Error> {
let key = self.variant_key(process_path, filename)?;
let path_tree = self.inner.path_tree.clone();
let path_opt = web::block(move || path_tree.get(key)).await??;
if let Some(path_ivec) = path_opt {
let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?);
Ok(Some(self.inner.root_dir.join(relative)))
} else {
Ok(None)
}
}
/// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_variant(&self, path: PathBuf, filename: String) -> Result<(), Error> {
pub(crate) async fn store_variant(
&self,
variant_process_path: Option<&std::path::Path>,
real_path: &std::path::Path,
filename: &str,
) -> Result<(), Error> {
let path_bytes = self
.generalize_path(&path)?
.generalize_path(real_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let key = self.variant_key(&path, &filename)?;
let variant_path = variant_process_path.unwrap_or(real_path);
let key = self.variant_key(variant_path, filename)?;
let path_tree = self.inner.path_tree.clone();
debug!("Storing variant");
@ -199,14 +236,14 @@ impl UploadManager {
self.aliases_by_hash(&hash).await
}
fn next_directory(&self) -> Result<PathBuf, Error> {
pub(crate) fn next_directory(&self) -> Result<PathBuf, Error> {
let path = self.inner.path_gen.next();
self.inner
.settings_tree
.insert(GENERATOR_KEY, path.to_be_bytes())?;
let mut target_path = self.image_dir();
let mut target_path = self.inner.root_dir.join("files");
for dir in path.to_strings() {
target_path.push(dir)
}
@ -381,14 +418,18 @@ impl UploadManager {
#[instrument(skip(self))]
async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), Error> {
let filename = filename.inner;
let mut path = self.image_dir();
let fname = String::from_utf8(filename.to_vec())?;
path.push(fname);
let filename2 = filename.clone();
let path_tree = self.inner.path_tree.clone();
let path = web::block(move || path_tree.remove(filename2)).await??;
let mut errors = Vec::new();
debug!("Deleting {:?}", path);
if let Err(e) = tokio::fs::remove_file(path).await {
errors.push(e.into());
if let Some(path) = path {
let path = self.inner.root_dir.join(String::from_utf8(path.to_vec())?);
debug!("Deleting {:?}", path);
if let Err(e) = self.remove_path(&path).await {
errors.push(e.into());
}
}
let filename2 = filename.clone();
@ -410,9 +451,12 @@ impl UploadManager {
debug!("{} files prepared for deletion", paths.len());
for path in paths {
let s = String::from_utf8_lossy(&path);
debug!("Deleting {}", s);
if let Err(e) = remove_path(path).await {
let path = self
.inner
.root_dir
.join(String::from_utf8_lossy(&path).as_ref());
debug!("Deleting {:?}", path);
if let Err(e) = self.remove_path(&path).await {
errors.push(e);
}
}
@ -434,6 +478,55 @@ impl UploadManager {
}
Ok(())
}
async fn try_remove_parents(&self, mut path: &std::path::Path) -> Result<(), Error> {
let root = self.inner.root_dir.join("files");
while let Some(parent) = path.parent() {
if parent.ends_with(&root) {
break;
}
if tokio::fs::remove_dir(parent).await.is_err() {
break;
}
path = parent;
}
Ok(())
}
async fn remove_path(&self, path: &std::path::Path) -> Result<(), Error> {
tokio::fs::remove_file(path).await?;
self.try_remove_parents(path).await
}
fn variant_key(
&self,
variant_process_path: &std::path::Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path_string = variant_process_path
.to_str()
.ok_or(UploadError::Path)?
.to_string();
let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec();
Ok(vec)
}
fn details_key(
&self,
variant_path: &std::path::Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path = self.generalize_path(variant_path)?;
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec();
Ok(vec)
}
}
impl<T> Serde<T> {
@ -502,12 +595,6 @@ fn init_generator(settings: &sled::Tree) -> Result<Generator, Error> {
}
}
async fn remove_path(path: sled::IVec) -> Result<(), Error> {
let path_string = String::from_utf8(path.to_vec())?;
tokio::fs::remove_file(path_string).await?;
Ok(())
}
fn trans_err<E>(e: E) -> sled::transaction::ConflictableTransactionError<Error>
where
Error: From<E>,

View file

@ -20,7 +20,7 @@ impl UploadManager {
let filename = String::from_utf8(filename.to_vec())?;
tracing::info!("Migrating {}", filename);
let mut file_path = self.image_dir();
let mut file_path = self.inner.root_dir.join("files");
file_path.push(filename.clone());
if tokio::fs::metadata(&file_path).await.is_ok() {
@ -46,18 +46,7 @@ impl UploadManager {
for res in self.inner.main_tree.range(start..end) {
let (hash_variant_key, variant_path_or_details) = res?;
if hash_variant_key.ends_with(DETAILS) {
let details = variant_path_or_details;
let start_index = hash.len() + 1;
let end_index = hash_variant_key.len() - DETAILS.len();
let path_bytes = &hash_variant_key[start_index..end_index];
let variant_path = PathBuf::from(String::from_utf8(path_bytes.to_vec())?);
let key = self.details_key(&variant_path, &filename)?;
self.inner.details_tree.insert(key, details)?;
} else {
if !hash_variant_key.ends_with(DETAILS) {
let variant_path =
PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?);
if tokio::fs::metadata(&variant_path).await.is_ok() {
@ -71,13 +60,14 @@ impl UploadManager {
.as_bytes()
.to_vec();
let variant_key = self.variant_key(&target_path, &filename)?;
let variant_key = self.migrate_variant_key(&variant_path, &filename)?;
self.inner
.path_tree
.insert(variant_key, relative_target_path_bytes)?;
safe_move_file(variant_path, target_path).await?;
safe_move_file(variant_path.clone(), target_path).await?;
self.try_remove_parents(&variant_path).await?;
}
}
@ -90,7 +80,7 @@ impl UploadManager {
}
fn restructure_complete(&self) -> Result<bool, Error> {
Ok(!self
Ok(self
.inner
.settings_tree
.get(RESTRUCTURE_COMPLETE)?
@ -106,31 +96,19 @@ impl UploadManager {
}
pub(super) fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> {
Ok(path.strip_prefix(&self.inner.image_dir)?)
Ok(path.strip_prefix(&self.inner.root_dir)?)
}
pub(super) fn details_key(
fn migrate_variant_key(
&self,
variant_path: &Path,
variant_process_path: &Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path = self.generalize_path(variant_path)?;
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let path = self
.generalize_path(variant_process_path)?
.strip_prefix("files")?;
let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec();
Ok(vec)
}
pub(super) fn variant_key(
&self,
variant_path: &Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path = self.generalize_path(variant_path)?;
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec();
Ok(vec)
self.variant_key(path, filename)
}
}

View file

@ -229,9 +229,9 @@ impl UploadManagerSession {
}
// -- WRITE NEW FILE --
let mut real_path = self.manager.image_dir();
real_path.push(name);
let real_path = self.manager.next_directory()?.join(&name);
self.manager.store_path(name, &real_path).await?;
crate::safe_move_file(tmpfile, real_path).await?;
Ok(())
@ -280,22 +280,20 @@ impl UploadManagerSession {
// generate a short filename that isn't already in-use
#[instrument(skip(self, content_type))]
async fn next_file(&self, content_type: mime::Mime) -> Result<String, Error> {
let image_dir = self.manager.image_dir();
loop {
debug!("Filename generation loop");
let mut path = image_dir.clone();
let s: String = Uuid::new_v4().to_string();
let filename = file_name(s, content_type.clone())?;
path.push(filename.clone());
let path_tree = self.manager.inner.path_tree.clone();
let filename2 = filename.clone();
let filename_exists = web::block(move || path_tree.get(filename2.as_bytes()))
.await??
.is_some();
if let Err(e) = tokio::fs::metadata(path).await {
if e.kind() == std::io::ErrorKind::NotFound {
debug!("Generated unused filename {}", filename);
return Ok(filename);
}
return Err(e.into());
if !filename_exists {
return Ok(filename);
}
debug!("Filename exists, trying again");