Initial implementation

This commit is contained in:
asonix 2023-07-16 12:26:17 -05:00
parent 9171d226a2
commit 61ecb0914b
4 changed files with 2068 additions and 2 deletions

1556
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,3 +6,18 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-rt = "2.8.0"
awc = { version = "3.1.1", default-features = false, features = ["rustls"] }
clap = { version = "4.3.12", features = ["derive"] }
color-eyre = "0.6.2"
eyre = "0.6.8"
mime = "0.3.17"
multipart-client-stream = { git = "https://git.asonix.dog/asonix/multipart-client-stream", version = "0.1.0" }
serde = { version = "1.0.171", features = ["derive"] }
serde_json = "1.0.103"
time = { version = "0.3.23", features = ["serde", "parsing", "formatting"] }
tokio = { version = "1.29.1", default-features = false, features = ["fs"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
url = "2.4.0"
uuid = { version = "1.4.0", features = ["serde"] }

View file

@ -1,3 +1,304 @@
fn main() {
println!("Hello, world!");
mod pict_rs;
use std::{
fs::FileType,
path::{Path, PathBuf},
rc::Rc,
time::Duration,
};
use awc::{
http::{header::CONTENT_TYPE, StatusCode},
Client,
};
use clap::Parser;
use eyre::ErrReport;
use multipart_client_stream::{Body, Part};
use pict_rs::{Image, ImageResponse, Upload, UploadReponse};
use tokio::{
sync::{
mpsc::{Receiver, Sender},
Semaphore,
},
task::JoinSet,
};
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer, Registry};
use url::Url;
#[actix_rt::main]
async fn main() -> color_eyre::Result<()> {
let Args {
endpoint,
ingest,
out,
} = Args::parse();
init_tracing()?;
let (state, mut receiver) = State::new(endpoint);
let file_type = tokio::fs::metadata(&ingest).await?.file_type();
let handle = actix_rt::spawn(async move { state.visit_path(&ingest, file_type).await });
let mut images = Vec::new();
while let Some(image) = receiver.recv().await {
images.push(image);
}
let json = serde_json::to_vec(&images)?;
tokio::fs::write(out, json).await?;
handle.await??;
Ok(())
}
#[derive(Debug, clap::Parser)]
struct Args {
#[clap(short, long)]
endpoint: Url,
#[clap(short, long)]
ingest: PathBuf,
#[clap(short, long)]
out: PathBuf,
}
#[derive(Clone)]
struct State {
inner: Rc<StateInner>,
}
struct StateInner {
endpoint: Url,
client: Client,
semaphore: Semaphore,
sender: Sender<Image>,
}
#[derive(Debug, Default)]
struct MultiError {
errors: Vec<ErrReport>,
}
impl State {
fn new(endpoint: Url) -> (Self, Receiver<Image>) {
let (sender, receiver) = tokio::sync::mpsc::channel(8);
let this = Self {
inner: Rc::new(StateInner {
endpoint,
client: Client::builder()
.add_default_header(("User-Agent", "pict-rs-uploader v0.1.0"))
.finish(),
semaphore: Semaphore::new(8),
sender,
}),
};
(this, receiver)
}
async fn visit_path(&self, path: &Path, file_type: FileType) -> color_eyre::Result<()> {
if file_type.is_file() {
self.visit_file(path).await?;
} else if file_type.is_dir() {
self.visit_dir(path).await?;
}
Ok(())
}
async fn visit_file(&self, path: &Path) -> color_eyre::Result<()> {
let image_response = self.inner.upload(path).await?;
match image_response {
ImageResponse::Ok { files, .. } => {
for image in files {
self.inner
.sender
.send(image)
.await
.expect("receiver shouldn't be dead");
}
}
ImageResponse::Error { msg } => {
tracing::warn!("Upload for {path:?} failed with message\n{msg}");
}
}
Ok(())
}
#[tracing::instrument(skip(self))]
async fn visit_dir(&self, path: &Path) -> color_eyre::Result<()> {
let mut read_dir = tokio::fs::read_dir(path).await?;
let mut set = JoinSet::new();
let mut errors = MultiError::new();
while let Some(entry) = read_dir.next_entry().await? {
if set.len() > 4 {
if let Some(res) = set.join_next().await {
match res {
Ok(Err(e)) => {
errors.push(e);
break;
}
Err(e) => {
errors.push(e.into());
break;
}
_ => {}
}
}
}
let this = self.clone();
set.spawn_local(async move {
let file_type = entry.file_type().await?;
let path = entry.path();
this.visit_path(&path, file_type).await
});
}
while let Some(res) = set.join_next().await {
match res {
Ok(Err(e)) => errors.push(e),
Err(e) => errors.push(e.into()),
_ => {}
}
}
if !errors.is_empty() {
return Err(errors.into());
}
Ok(())
}
}
impl StateInner {
#[tracing::instrument(skip(self))]
async fn upload(&self, path: &Path) -> color_eyre::Result<ImageResponse> {
let guard = self.semaphore.acquire().await?;
let filename = path
.file_name()
.ok_or(eyre::eyre!("Path does not have filename"))?
.to_str()
.ok_or(eyre::eyre!("Filename is not valid utf8"))?;
let file = tokio::fs::File::open(path).await?;
let body = Body::builder()
.append(Part::new("images[]", file).filename(filename))
.build();
let mut response = self
.client
.post(self.upload_endpoint())
.insert_header((CONTENT_TYPE, body.content_type()))
.timeout(Duration::from_secs(60))
.send_stream(body)
.await
.map_err(|e| eyre::eyre!("Error sending request {e}"))?;
let response: UploadReponse = response.json().await?;
let mut uploads = match response {
UploadReponse::Error { msg } => {
return Err(eyre::eyre!("Error uploading image: {msg}"))
}
UploadReponse::Ok { uploads, .. } => uploads,
};
let upload = uploads
.pop()
.ok_or(eyre::eyre!("Expected one upload in response"))?;
let response = self.long_poll(&upload).await?;
drop(guard);
Ok(response)
}
async fn long_poll(&self, upload: &Upload) -> color_eyre::Result<ImageResponse> {
let claim_endpoint = self.claim_endpoint();
let mut response = loop {
let response = self
.client
.get(&claim_endpoint)
.query(upload)?
.timeout(Duration::from_secs(20))
.send()
.await
.map_err(|e| eyre::eyre!("Error sending request {e}"))?;
if response.status() != StatusCode::NO_CONTENT {
break response;
}
};
let response: ImageResponse = response.json().await?;
Ok(response)
}
fn upload_endpoint(&self) -> String {
let mut url: Url = self.endpoint.clone();
url.set_path("/image/backgrounded");
url.to_string()
}
fn claim_endpoint(&self) -> String {
let mut url: Url = self.endpoint.clone();
url.set_path("/image/backgrounded/claim");
url.to_string()
}
}
fn init_tracing() -> color_eyre::Result<()> {
color_eyre::install()?;
let targets: Targets = std::env::var("RUST_LOG")
.unwrap_or_else(|_| "info".into())
.parse()?;
let format_layer = tracing_subscriber::fmt::layer().with_filter(targets);
let subscriber = Registry::default().with(format_layer);
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}
impl MultiError {
fn new() -> Self {
Self::default()
}
fn push(&mut self, report: ErrReport) {
self.errors.push(report);
}
fn is_empty(&self) -> bool {
self.errors.is_empty()
}
}
impl std::fmt::Display for MultiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for e in &self.errors {
e.fmt(f)?;
}
Ok(())
}
}
impl std::error::Error for MultiError {}

194
src/pict_rs.rs Normal file
View file

@ -0,0 +1,194 @@
use serde::de::Error;
use time::OffsetDateTime;
use uuid::Uuid;
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
pub(crate) enum UploadReponse {
Ok {
#[allow(unused)]
msg: OkString,
uploads: Vec<Upload>,
},
Error {
msg: String,
},
}
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
pub(crate) enum ImageResponse {
Ok {
#[allow(unused)]
msg: OkString,
files: Vec<Image>,
},
Error {
msg: String,
},
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Upload {
pub(crate) upload_id: Uuid,
}
#[derive(Debug, serde::Deserialize)]
pub(crate) enum OkString {
#[serde(rename = "ok")]
Ok,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Image {
pub(crate) delete_token: DeleteToken,
pub(crate) file: Alias,
pub(crate) details: Details,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub(crate) enum DeleteToken {
Uuid(Uuid),
Old(String),
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub(crate) enum Alias {
Alias(AliasInner),
Old(String),
}
#[derive(Debug)]
pub(crate) struct AliasInner {
uuid: Uuid,
extension: String,
}
#[derive(Debug)]
pub(crate) struct AliasError;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Details {
pub(crate) width: u16,
pub(crate) height: u16,
pub(crate) frames: Option<u32>,
pub(crate) content_type: Mime,
#[serde(with = "time::serde::rfc3339")]
pub(crate) created_at: OffsetDateTime,
}
#[derive(Debug)]
pub(crate) struct Mime(mime::Mime);
impl std::ops::Deref for Mime {
type Target = mime::Mime;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<mime::Mime> for Mime {
fn as_ref(&self) -> &mime::Mime {
&self.0
}
}
impl std::str::FromStr for AliasInner {
type Err = AliasError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Some((left, right)) = s.split_once('.') {
let uuid = left.parse::<Uuid>().map_err(|_| AliasError)?;
Ok(AliasInner {
uuid,
extension: String::from(right),
})
} else {
Err(AliasError)
}
}
}
impl std::fmt::Display for AliasInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.uuid, self.extension)
}
}
impl std::fmt::Display for AliasError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Invalid alias format")
}
}
impl std::error::Error for AliasError {}
impl<'de> serde::Deserialize<'de> for AliasInner {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
s.parse::<AliasInner>().map_err(D::Error::custom)
}
}
impl serde::Serialize for AliasInner {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = self.to_string();
String::serialize(&s, serializer)
}
}
impl<'de> serde::Deserialize<'de> for Mime {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
s.parse::<mime::Mime>().map_err(D::Error::custom).map(Mime)
}
}
impl serde::Serialize for Mime {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = self.to_string();
String::serialize(&s, serializer)
}
}
#[cfg(test)]
mod tests {
#[test]
fn deserialize_ok() {
#[derive(serde::Deserialize)]
struct TestStruct {
#[serde(rename = "msg")]
_msg: super::OkString,
}
let _: TestStruct = serde_json::from_str(r#"{"msg":"ok"}"#).expect("Deserialized");
}
#[test]
fn deserialize_image_response() {
let response: super::ImageResponse =
serde_json::from_str(r#"{"msg": "ok", "files": []}"#).expect("Deserialized");
assert!(matches!(response, super::ImageResponse::Ok { .. }))
}
}