range header support

This commit is contained in:
John Doe 2021-01-13 20:08:04 -05:00
parent 6e9c7e3dbe
commit 61061f0451
2 changed files with 117 additions and 17 deletions

View file

@ -15,6 +15,9 @@ pub(crate) enum UploadError {
#[error("Error parsing string, {0}")]
ParseString(#[from] std::string::FromUtf8Error),
#[error("Error parsing request, {0}")]
ParseReq(String),
#[error("Error interacting with filesystem, {0}")]
Io(#[from] std::io::Error),
@ -112,7 +115,8 @@ impl ResponseError for UploadError {
UploadError::Gif(_)
| UploadError::DuplicateAlias
| UploadError::NoFiles
| UploadError::Upload(_) => StatusCode::BAD_REQUEST,
| UploadError::Upload(_)
| UploadError::ParseReq(_) => StatusCode::BAD_REQUEST,
UploadError::MissingAlias | UploadError::MissingFilename => StatusCode::NOT_FOUND,
UploadError::InvalidToken => StatusCode::FORBIDDEN,
_ => StatusCode::INTERNAL_SERVER_ERROR,

View file

@ -1,14 +1,12 @@
use actix_form_data::{Field, Form, Value};
use actix_web::{
client::Client,
guard,
http::header::{CacheControl, CacheDirective, LastModified},
middleware::{Compress, Logger},
web, App, HttpResponse, HttpServer,
};
use futures::stream::{Stream, TryStreamExt};
use actix_fs::file;
use actix_web::{App, HttpResponse, HttpServer, client::Client, guard, http::header::{ACCEPT_RANGES, CONTENT_LENGTH, CacheControl, CacheDirective, ContentRange, ContentRangeSpec, Header, LastModified}, middleware::{Compress, Logger}, web};
use bytes::Bytes;
use futures::{StreamExt, stream::{Stream, TryStreamExt}};
use once_cell::sync::Lazy;
use std::{collections::HashSet, path::PathBuf, sync::Once, time::SystemTime};
use std::{
collections::HashSet, convert::TryInto, io, path::PathBuf, sync::Once, time::SystemTime,
};
use structopt::StructOpt;
use tracing::{debug, error, info, instrument, Span};
use tracing_subscriber::EnvFilter;
@ -486,6 +484,7 @@ async fn details(
/// Serve files
#[instrument(skip(manager))]
async fn serve(
req: web::HttpRequest,
alias: web::Path<String>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, UploadError> {
@ -505,14 +504,81 @@ async fn serve(
details
};
let stream = actix_fs::read_to_stream(path).await?;
match req.headers().get("Range") {
//Range header exists - return as ranged
Some(range_head) => {
let range_hdr = range_head.to_str().map_err(|_| {
UploadError::ParseReq("Range header contains non-utf8 characters".to_string())
})?;
Ok(srv_response(
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
))
let range_dashed = range_hdr
.split('=')
.skip(1)
.next()
.ok_or(UploadError::ParseReq("Malformed Range header".to_string()))?;
let range: Vec<u64> = range_dashed
.split('-')
.map(|s| s.parse::<u64>())
.collect::<Result<Vec<u64>, _>>()
.map_err(|_| {
UploadError::ParseReq("Cannot parse byte locations in range header".to_string())
})?;
let (out_file, _) = file::seek(
file::open(path).await?,
io::SeekFrom::Current(range[0].try_into().map_err(|_| {
UploadError::ParseReq("Byte locations too high in range header".to_string())
})?),
)
.await?;
let (out_file, meta) = file::metadata(out_file)
.await
.map_err(|_| UploadError::Upload("Error reading metadata".to_string()))?;
let whole_to = ((range[1] - range[0]) as f64 / 65_356.0).floor() as usize;
let partial_len = ((range[1] - range[0]) % 65_536) as usize;
let stream = file::read_to_stream(out_file)
.await?
.take(whole_to + 1)
.enumerate()
.map(move |bytes_res| {
match bytes_res.1 {
Ok(mut bytes) => {
if bytes_res.0 == whole_to {
return Ok(bytes.split_to(partial_len));
}
return Ok(bytes);
}
Err(e) => Err(e),
}
});
return Ok(srv_ranged_response(
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
Some((range[0], range[1])),
Some(meta.len()),
));
}
//No Range header in the request - return the entire document
None => {
let stream = actix_fs::read_to_stream(path).await?;
return Ok(srv_response(
stream,
details.content_type(),
7 * DAYS,
details.system_time(),
));
}
};
}
// A helper method to produce responses with proper cache headers
@ -533,6 +599,36 @@ where
CacheDirective::MaxAge(expires),
CacheDirective::Extension("immutable".to_owned(), None),
]))
.set_header(ACCEPT_RANGES, "bytes")
.content_type(ext.to_string())
.streaming(stream.err_into())
}
fn srv_ranged_response<S, E>(
stream: S,
ext: mime::Mime,
expires: u32,
modified: SystemTime,
range: Option<(u64, u64)>,
instance_length: Option<u64>,
) -> HttpResponse
where
S: Stream<Item = Result<bytes::Bytes, E>> + Unpin + 'static,
E: Into<UploadError>,
{
HttpResponse::PartialContent()
.set(LastModified(modified.into()))
.set(CacheControl(vec![
CacheDirective::Public,
CacheDirective::MaxAge(expires),
CacheDirective::Extension("immutable".to_owned(), None),
]))
.set(ContentRange(ContentRangeSpec::Bytes {
range,
instance_length,
}))
.set_header(CONTENT_LENGTH, range.unwrap().1 - range.unwrap().0)
.set_header(ACCEPT_RANGES, "bytes")
.content_type(ext.to_string())
.streaming(stream.err_into())
}