ranged support for process endpoint, bugfixes

This commit is contained in:
John Doe 2021-01-14 08:52:11 -05:00
parent 61061f0451
commit c663337bcd
2 changed files with 75 additions and 38 deletions

View file

@ -74,6 +74,9 @@ pub(crate) enum UploadError {
#[error("{0}")]
Json(#[from] serde_json::Error),
#[error("Range header not satisfiable")]
Range,
}
impl From<actix_web::client::SendRequestError> for UploadError {
@ -119,6 +122,7 @@ impl ResponseError for UploadError {
| UploadError::ParseReq(_) => StatusCode::BAD_REQUEST,
UploadError::MissingAlias | UploadError::MissingFilename => StatusCode::NOT_FOUND,
UploadError::InvalidToken => StatusCode::FORBIDDEN,
UploadError::Range => StatusCode::RANGE_NOT_SATISFIABLE,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}

View file

@ -1,6 +1,6 @@
use actix_form_data::{Field, Form, Value};
use actix_fs::file;
use actix_web::{App, HttpResponse, HttpServer, client::Client, guard, http::header::{ACCEPT_RANGES, CONTENT_LENGTH, CacheControl, CacheDirective, ContentRange, ContentRangeSpec, Header, LastModified}, middleware::{Compress, Logger}, web};
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, client::Client, guard, http::{HeaderValue, header::{ACCEPT_RANGES, CONTENT_LENGTH, CacheControl, CacheDirective, ContentRange, ContentRangeSpec, Header, LastModified}}, middleware::{Compress, Logger}, web};
use bytes::Bytes;
use futures::{StreamExt, stream::{Stream, TryStreamExt}};
use once_cell::sync::Lazy;
@ -333,6 +333,7 @@ async fn process_details(
/// Process files
#[instrument(skip(manager, whitelist))]
async fn process(
req: HttpRequest,
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
@ -427,14 +428,36 @@ async fn process(
drop(entered);
});
return Ok(srv_response(
Box::pin(futures::stream::once(async {
Ok(img_bytes) as Result<_, UploadError>
})),
details.content_type(),
7 * DAYS,
details.system_time(),
));
match req.headers().get("Range") {
Some(range_head) => {
let range = parse_range_header(range_head)?;
let resp_bytes = img_bytes.slice(range[0] as usize..range[1] as usize);
let stream = Box::pin(futures::stream::once(async move {
Ok(resp_bytes) as Result<_, UploadError>
}));
return Ok(srv_ranged_response(
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
Some((range[0], range[1])),
Some(img_bytes.len() as u64)));
}
None => {
return Ok(srv_response(
Box::pin(futures::stream::once(async {
Ok(img_bytes) as Result<_, UploadError>
})),
details.content_type(),
7 * DAYS,
details.system_time(),
));
}
};
}
let details = if let Some(details) = details {
@ -447,14 +470,7 @@ async fn process(
details
};
let stream = actix_fs::read_to_stream(thumbnail_path).await?;
Ok(srv_response(
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
))
ranged_file_resp(thumbnail_path, req, details).await
}
/// Fetch file details
@ -504,27 +520,40 @@ async fn serve(
details
};
ranged_file_resp(path, req, details).await
}
fn parse_range_header(range_head: &HeaderValue) -> Result<Vec<u64>, UploadError> {
let range_head_str = range_head.to_str().map_err(|_| {
UploadError::ParseReq("Range header contains non-utf8 characters".to_string())
})?;
let range_dashed = range_head_str
.split('=')
.skip(1)
.next()
.ok_or(UploadError::ParseReq("Malformed Range header".to_string()))?;
let range: Vec<u64> = range_dashed
.split('-')
.map(|s| s.parse::<u64>())
.collect::<Result<Vec<u64>, _>>()
.map_err(|_| {
UploadError::ParseReq("Cannot parse byte locations in range header".to_string())
})?;
if range[0] > range[1] {
return Err(UploadError::Range);
}
Ok(range)
}
async fn ranged_file_resp(path: PathBuf, req: HttpRequest, details: Details) -> Result<HttpResponse, UploadError> {
match req.headers().get("Range") {
//Range header exists - return as ranged
Some(range_head) => {
let range_hdr = range_head.to_str().map_err(|_| {
UploadError::ParseReq("Range header contains non-utf8 characters".to_string())
})?;
let range_dashed = range_hdr
.split('=')
.skip(1)
.next()
.ok_or(UploadError::ParseReq("Malformed Range header".to_string()))?;
let range: Vec<u64> = range_dashed
.split('-')
.map(|s| s.parse::<u64>())
.collect::<Result<Vec<u64>, _>>()
.map_err(|_| {
UploadError::ParseReq("Cannot parse byte locations in range header".to_string())
})?;
let range = parse_range_header(range_head)?;
let (out_file, _) = file::seek(
file::open(path).await?,
@ -538,10 +567,15 @@ async fn serve(
.await
.map_err(|_| UploadError::Upload("Error reading metadata".to_string()))?;
if meta.len() < range[0] {
return Err(UploadError::Range);
}
// file::read_to_stream() creates a stream in 65,356 byte chunks.
let whole_to = ((range[1] - range[0]) as f64 / 65_356.0).floor() as usize;
let partial_len = ((range[1] - range[0]) % 65_536) as usize;
let partial_len = ((range[1] - range[0]) % 65_356) as usize;
//debug!("Range of {}. Returning {} whole chunks, and {} bytes of the partial chunk", range[1]-range[0], whole_to, partial_len);
let stream = file::read_to_stream(out_file)
.await?
@ -550,7 +584,7 @@ async fn serve(
.map(move |bytes_res| {
match bytes_res.1 {
Ok(mut bytes) => {
if bytes_res.0 == whole_to {
if bytes_res.0 == whole_to && partial_len <= bytes.len() {
return Ok(bytes.split_to(partial_len));
}
return Ok(bytes);
@ -627,7 +661,6 @@ where
range,
instance_length,
}))
.set_header(CONTENT_LENGTH, range.unwrap().1 - range.unwrap().0)
.set_header(ACCEPT_RANGES, "bytes")
.content_type(ext.to_string())
.streaming(stream.err_into())