Multipart client implementation

This commit is contained in:
asonix 2023-07-15 20:29:02 -05:00
parent ec2116e40b
commit 38e72a8c5b
4 changed files with 519 additions and 8 deletions

View file

@ -1,8 +1,17 @@
[package]
name = "awc-multipart"
name = "multipart-client-stream"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "1"
futures-core = "0.3"
mime = "0.3"
rand = { version = "0.8", features = ["small_rng"] }
tokio = { version = "1", default-features = false, features = [ "io-util" ] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }
[dev-dependencies]
tokio = { version = "1", features = ["full"] }

View file

@ -1,5 +1,5 @@
{
description = "awc-multipart";
description = "multipart-client-stream";
inputs = {
nixpkgs.url = "nixpkgs/nixos-unstable";

239
src/internal.rs Normal file
View file

@ -0,0 +1,239 @@
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};
use bytes::{BufMut, BytesMut};
use tokio::io::{AsyncRead, ReadBuf};
const CONTENT_TYPE: &[u8] = b"content-type";
const CONTENT_DISPOSITION: &[u8] = b"content-disposition";
pub struct SendRead<'a>(pub(super) Pin<Box<dyn AsyncRead + Send + 'a>>);
pub struct UnsendRead<'a>(pub(super) Pin<Box<dyn AsyncRead + 'a>>);
impl<'a> From<SendRead<'a>> for UnsendRead<'a> {
fn from(value: SendRead<'a>) -> Self {
UnsendRead(value.0)
}
}
pub(super) struct Body<R> {
boundary: Vec<u8>,
pending: BytesMut,
current: Option<R>,
rest: VecDeque<Part<R>>,
closed: bool,
}
pub(super) struct Part<R> {
content_type: Vec<u8>,
content_disposition: Vec<u8>,
reader: R,
}
fn boundary_len(boundary: &[u8]) -> usize {
boundary.len() + 2
}
fn write_boundary<B: BufMut>(boundary: &[u8], buf: &mut B) {
buf.put_slice(b"--");
buf.put_slice(boundary);
}
fn final_boundary_len(boundary: &[u8]) -> usize {
boundary_len(boundary) + 2
}
fn write_final_boundary<B: BufMut>(boundary: &[u8], buf: &mut B) {
write_boundary(boundary, buf);
buf.put_slice(b"--");
}
fn crlf_len() -> usize {
2
}
fn write_crlf<B: BufMut>(buf: &mut B) {
buf.put_slice(b"\r\n");
}
fn headers_len<R>(part: &Part<R>) -> usize {
crlf_len()
+ CONTENT_TYPE.len()
+ 2
+ part.content_type.len()
+ crlf_len()
+ CONTENT_DISPOSITION.len()
+ 2
+ part.content_disposition.len()
+ crlf_len()
+ crlf_len()
}
fn write_headers<B: BufMut, R>(part: &Part<R>, buf: &mut B) {
write_crlf(buf);
buf.put_slice(CONTENT_TYPE);
buf.put_slice(b": ");
buf.put_slice(&part.content_type);
write_crlf(buf);
buf.put_slice(CONTENT_DISPOSITION);
buf.put_slice(b": ");
buf.put_slice(&part.content_disposition);
write_crlf(buf);
write_crlf(buf);
}
impl<R> Body<R> {
pub(super) fn new(boundary: String, parts: VecDeque<Part<R>>) -> Self {
Self {
boundary: Vec::from(boundary),
pending: BytesMut::new(),
current: None,
rest: parts,
closed: false,
}
}
fn write_boundary_to_pending(&mut self) {
write_boundary(&self.boundary, &mut self.pending);
}
fn write_headers_to_pending(&mut self, part: &Part<R>) {
write_headers(part, &mut self.pending);
}
fn write_final_boundary_to_pending(&mut self) {
write_final_boundary(&self.boundary, &mut self.pending);
}
fn write_clrf_to_pending(&mut self) {
write_crlf(&mut self.pending)
}
}
impl<'a> Part<SendRead<'a>> {
pub(super) fn new(
content_type: String,
content_disposition: String,
reader: SendRead<'a>,
) -> Self {
Part {
content_type: Vec::from(content_type),
content_disposition: Vec::from(content_disposition),
reader,
}
}
}
impl<'a> Part<UnsendRead<'a>> {
pub(super) fn new(
content_type: String,
content_disposition: String,
reader: UnsendRead<'a>,
) -> Self {
Part {
content_type: Vec::from(content_type),
content_disposition: Vec::from(content_disposition),
reader,
}
}
}
impl<'a> AsyncRead for SendRead<'a> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
impl<'a> AsyncRead for UnsendRead<'a> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
impl<R> AsyncRead for Body<R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}
let initial_len = buf.filled().len();
if !self.pending.is_empty() {
if self.pending.len() > buf.remaining() {
let bytes = self.pending.split_to(buf.remaining());
buf.put_slice(&bytes);
return Poll::Ready(Ok(()));
} else {
buf.put_slice(&self.pending);
self.pending.clear();
}
}
if self.closed {
return Poll::Ready(Ok(()));
}
let before_poll = buf.filled().len();
if let Some(ref mut reader) = self.current {
match Pin::new(reader).poll_read(cx, buf) {
Poll::Ready(Ok(())) if buf.filled().len() == before_poll => {
self.current.take();
if crlf_len() < buf.remaining() {
write_crlf(buf);
} else {
self.write_clrf_to_pending();
}
self.poll_read(cx, buf)
}
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(otherwise) => Poll::Ready(otherwise),
Poll::Pending if buf.filled().len() > initial_len => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
} else if let Some(part) = self.rest.pop_front() {
let fill_buf = if boundary_len(&self.boundary) < buf.remaining() {
write_boundary(&self.boundary, buf);
true
} else {
self.write_boundary_to_pending();
false
};
if fill_buf && headers_len(&part) < buf.remaining() {
write_headers(&part, buf);
} else {
self.write_headers_to_pending(&part);
};
self.current = Some(part.reader);
if buf.remaining() > 0 {
self.poll_read(cx, buf)
} else {
Poll::Ready(Ok(()))
}
} else if buf.remaining() > final_boundary_len(&self.boundary) {
write_final_boundary(&self.boundary, buf);
self.closed = true;
Poll::Ready(Ok(()))
} else {
self.write_final_boundary_to_pending();
self.closed = true;
self.poll_read(cx, buf)
}
}
}

View file

@ -1,14 +1,277 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
mod internal;
use std::{collections::VecDeque, io::Cursor, pin::Pin};
use bytes::Bytes;
use futures_core::Stream;
use internal::{SendRead, UnsendRead};
use mime::Mime;
use rand::{distributions::Alphanumeric, rngs::SmallRng, Rng, SeedableRng};
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
pub struct Body<R> {
stream: ReaderStream<internal::Body<R>>,
}
pub struct BodyBuilder<R> {
boundary: String,
parts: Vec<Part<R>>,
}
pub struct Part<R> {
name: String,
content_type: Option<Mime>,
filename: Option<String>,
reader: R,
}
#[derive(Debug)]
pub struct Empty;
impl<'a> Body<SendRead<'a>> {
pub fn builder() -> BodyBuilder<SendRead<'a>> {
let boundary = SmallRng::from_entropy()
.sample_iter(&Alphanumeric)
.map(char::from)
.take(6)
.collect::<String>();
BodyBuilder {
boundary,
parts: Vec::new(),
}
}
}
impl<'a> BodyBuilder<SendRead<'a>> {
pub fn boundary(mut self, boundary: String) -> Self {
self.boundary = boundary;
self
}
pub fn append(mut self, part: Part<SendRead<'a>>) -> Self {
self.parts.push(part);
self
}
pub fn append_unsend(self, part: Part<UnsendRead<'a>>) -> BodyBuilder<UnsendRead<'a>> {
let mut parts: Vec<_> = self
.parts
.into_iter()
.map(Part::<UnsendRead<'a>>::from)
.collect();
parts.push(part);
BodyBuilder {
boundary: self.boundary,
parts,
}
}
pub fn build(self) -> Body<SendRead<'a>> {
let parts = self
.parts
.into_iter()
.map(Part::<SendRead<'a>>::build)
.collect();
Body {
stream: ReaderStream::new(internal::Body::new(self.boundary, parts)),
}
}
}
impl<'a> BodyBuilder<UnsendRead<'a>> {
pub fn boundary(mut self, boundary: String) -> Self {
self.boundary = boundary;
self
}
pub fn append(mut self, part: Part<SendRead<'a>>) -> Self {
self.parts.push(From::from(part));
self
}
pub fn append_unsend(mut self, part: Part<UnsendRead<'a>>) -> BodyBuilder<UnsendRead<'a>> {
self.parts.push(part);
self
}
pub fn build(self) -> Body<UnsendRead<'a>> {
let parts: VecDeque<internal::Part<UnsendRead<'a>>> = self
.parts
.into_iter()
.map(Part::<UnsendRead<'a>>::build)
.collect();
Body {
stream: ReaderStream::new(internal::Body::new(self.boundary, parts)),
}
}
}
fn encode(value: String) -> String {
value.replace('"', "\\\"")
}
impl<'a> Part<SendRead<'a>> {
pub fn new<R: AsyncRead + Send + 'a>(name: String, reader: R) -> Self {
Part {
name,
content_type: None,
filename: None,
reader: SendRead(Box::pin(reader)),
}
}
pub fn new_unsend<R: AsyncRead + 'a>(name: String, reader: R) -> Part<UnsendRead<'a>> {
Part {
name,
content_type: None,
filename: None,
reader: UnsendRead(Box::pin(reader)),
}
}
pub fn new_str(name: String, text: &'a str) -> Self {
Self::new(name, text.as_bytes()).content_type(mime::TEXT_PLAIN)
}
pub fn new_string(name: String, text: String) -> Self {
Self::new(name, Cursor::new(text)).content_type(mime::TEXT_PLAIN)
}
pub fn content_type(mut self, content_type: mime::Mime) -> Self {
self.content_type = Some(content_type);
self
}
pub fn filename(mut self, filename: String) -> Self {
self.filename = Some(filename);
self
}
fn build(self) -> internal::Part<SendRead<'a>> {
let content_type = self.content_type.unwrap_or(mime::APPLICATION_OCTET_STREAM);
let name = encode(self.name);
let filename = self.filename.map(encode);
let content_disposition = if let Some(filename) = filename {
format!("form-data; name=\"{name}\"; filename=\"{filename}\"")
} else {
format!("form-data; name=\"{name}\"")
};
internal::Part::<SendRead<'a>>::new(
content_type.to_string(),
content_disposition,
self.reader,
)
}
}
impl<'a> Part<UnsendRead<'a>> {
pub fn content_type(mut self, content_type: mime::Mime) -> Self {
self.content_type = Some(content_type);
self
}
pub fn filename(mut self, filename: String) -> Self {
self.filename = Some(filename);
self
}
fn build(self) -> internal::Part<UnsendRead<'a>> {
let content_type = self.content_type.unwrap_or(mime::APPLICATION_OCTET_STREAM);
let name = encode(self.name);
let filename = self.filename.map(encode);
let content_disposition = if let Some(filename) = filename {
format!("form-data; name=\"{name}\"; filename=\"{filename}\"")
} else {
format!("form-data; name=\"{name}\"")
};
internal::Part::<UnsendRead<'a>>::new(
content_type.to_string(),
content_disposition,
self.reader,
)
}
}
impl<R> Stream for Body<R>
where
R: AsyncRead + Unpin,
{
type Item = std::io::Result<Bytes>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().stream).poll_next(cx)
}
}
impl<'a> From<Part<SendRead<'a>>> for Part<UnsendRead<'a>> {
fn from(value: Part<SendRead<'a>>) -> Self {
Self {
name: value.name,
content_type: value.content_type,
filename: value.filename,
reader: UnsendRead::from(value.reader),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{future::poll_fn, pin::Pin};
struct Streamer<S>(S);
impl<S> Streamer<S> {
async fn next(&mut self) -> Option<S::Item>
where
S: futures_core::Stream + Unpin,
{
poll_fn(|cx| Pin::new(&mut self.0).poll_next(cx)).await
}
}
#[tokio::test]
async fn build_text() {
let body = super::Body::builder()
.boundary(String::from("hello"))
.append(super::Part::new_str(String::from("first_name"), "John"))
.append(super::Part::new_str(String::from("last_name"), "Doe"))
.build();
let mut out = Vec::new();
let mut streamer = Streamer(body);
while let Some(res) = streamer.next().await {
out.extend(res.expect("read success"));
}
let out = String::from_utf8(out).expect("Valid string");
assert_eq!(out, "--hello\r\ncontent-type: text/plain\r\ncontent-disposition: form-data; name=\"first_name\"\r\n\r\nJohn\r\n--hello\r\ncontent-type: text/plain\r\ncontent-disposition: form-data; name=\"last_name\"\r\n\r\nDoe\r\n--hello--")
}
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
fn encode() {
let cases = [("hello", "hello"), ("Hello \"John\"", "Hello \\\"John\\\"")];
for (input, expected) in cases {
let output = super::encode(String::from(input));
assert_eq!(output, expected);
}
}
}