diff --git a/src/error.rs b/src/error.rs index 3f94780..dc50d75 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,6 +15,9 @@ pub(crate) enum UploadError { #[error("Error parsing string, {0}")] ParseString(#[from] std::string::FromUtf8Error), + #[error("Error parsing request, {0}")] + ParseReq(String), + #[error("Error interacting with filesystem, {0}")] Io(#[from] std::io::Error), @@ -112,7 +115,8 @@ impl ResponseError for UploadError { UploadError::Gif(_) | UploadError::DuplicateAlias | UploadError::NoFiles - | UploadError::Upload(_) => StatusCode::BAD_REQUEST, + | UploadError::Upload(_) + | UploadError::ParseReq(_) => StatusCode::BAD_REQUEST, UploadError::MissingAlias | UploadError::MissingFilename => StatusCode::NOT_FOUND, UploadError::InvalidToken => StatusCode::FORBIDDEN, _ => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/src/main.rs b/src/main.rs index 97b72e1..e13e2cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,12 @@ use actix_form_data::{Field, Form, Value}; -use actix_web::{ - client::Client, - guard, - http::header::{CacheControl, CacheDirective, LastModified}, - middleware::{Compress, Logger}, - web, App, HttpResponse, HttpServer, -}; -use futures::stream::{Stream, TryStreamExt}; +use actix_fs::file; +use actix_web::{App, HttpResponse, HttpServer, client::Client, guard, http::header::{ACCEPT_RANGES, CONTENT_LENGTH, CacheControl, CacheDirective, ContentRange, ContentRangeSpec, Header, LastModified}, middleware::{Compress, Logger}, web}; +use bytes::Bytes; +use futures::{StreamExt, stream::{Stream, TryStreamExt}}; use once_cell::sync::Lazy; -use std::{collections::HashSet, path::PathBuf, sync::Once, time::SystemTime}; +use std::{ + collections::HashSet, convert::TryInto, io, path::PathBuf, sync::Once, time::SystemTime, +}; use structopt::StructOpt; use tracing::{debug, error, info, instrument, Span}; use tracing_subscriber::EnvFilter; @@ -486,6 +484,7 @@ async fn details( /// Serve files #[instrument(skip(manager))] async fn serve( + req: web::HttpRequest, alias: web::Path, manager: web::Data, ) -> Result { @@ -505,14 +504,81 @@ async fn serve( details }; - let stream = actix_fs::read_to_stream(path).await?; + match req.headers().get("Range") { + //Range header exists - return as ranged + Some(range_head) => { + let range_hdr = range_head.to_str().map_err(|_| { + UploadError::ParseReq("Range header contains non-utf8 characters".to_string()) + })?; - Ok(srv_response( - stream, - details.content_type(), - 7 * DAYS, - details.system_time(), - )) + let range_dashed = range_hdr + .split('=') + .skip(1) + .next() + .ok_or(UploadError::ParseReq("Malformed Range header".to_string()))?; + + let range: Vec = range_dashed + .split('-') + .map(|s| s.parse::()) + .collect::, _>>() + .map_err(|_| { + UploadError::ParseReq("Cannot parse byte locations in range header".to_string()) + })?; + + + let (out_file, _) = file::seek( + file::open(path).await?, + io::SeekFrom::Current(range[0].try_into().map_err(|_| { + UploadError::ParseReq("Byte locations too high in range header".to_string()) + })?), + ) + .await?; + + let (out_file, meta) = file::metadata(out_file) + .await + .map_err(|_| UploadError::Upload("Error reading metadata".to_string()))?; + + + let whole_to = ((range[1] - range[0]) as f64 / 65_356.0).floor() as usize; + let partial_len = ((range[1] - range[0]) % 65_536) as usize; + + + let stream = file::read_to_stream(out_file) + .await? + .take(whole_to + 1) + .enumerate() + .map(move |bytes_res| { + match bytes_res.1 { + Ok(mut bytes) => { + if bytes_res.0 == whole_to { + return Ok(bytes.split_to(partial_len)); + } + return Ok(bytes); + } + Err(e) => Err(e), + } + }); + + return Ok(srv_ranged_response( + stream, + details.content_type(), + 7 * DAYS, + details.system_time(), + Some((range[0], range[1])), + Some(meta.len()), + )); + } + //No Range header in the request - return the entire document + None => { + let stream = actix_fs::read_to_stream(path).await?; + return Ok(srv_response( + stream, + details.content_type(), + 7 * DAYS, + details.system_time(), + )); + } + }; } // A helper method to produce responses with proper cache headers @@ -533,6 +599,36 @@ where CacheDirective::MaxAge(expires), CacheDirective::Extension("immutable".to_owned(), None), ])) + .set_header(ACCEPT_RANGES, "bytes") + .content_type(ext.to_string()) + .streaming(stream.err_into()) +} + +fn srv_ranged_response( + stream: S, + ext: mime::Mime, + expires: u32, + modified: SystemTime, + range: Option<(u64, u64)>, + instance_length: Option, +) -> HttpResponse +where + S: Stream> + Unpin + 'static, + E: Into, +{ + HttpResponse::PartialContent() + .set(LastModified(modified.into())) + .set(CacheControl(vec![ + CacheDirective::Public, + CacheDirective::MaxAge(expires), + CacheDirective::Extension("immutable".to_owned(), None), + ])) + .set(ContentRange(ContentRangeSpec::Bytes { + range, + instance_length, + })) + .set_header(CONTENT_LENGTH, range.unwrap().1 - range.unwrap().0) + .set_header(ACCEPT_RANGES, "bytes") .content_type(ext.to_string()) .streaming(stream.err_into()) }