Remove pin-project-lite, it wasn't needed

This commit is contained in:
Aode (Lion) 2022-03-09 23:21:08 -06:00
parent 5ff9ab7f46
commit 03ee0f606d
2 changed files with 27 additions and 34 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "actix-ws" name = "actix-ws"
description = "Websockets for the Actix runtime, without Actors" description = "Websockets for the Actix runtime, without Actors"
version = "0.2.4" version = "0.2.5"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
readme = "README.md" readme = "README.md"
repository = "https://git.asonix.dog/asonix/actix-actorless-websockets" repository = "https://git.asonix.dog/asonix/actix-actorless-websockets"
@ -17,5 +17,4 @@ actix-codec = "0.5.0"
actix-http = { version = "3.0", default-features = false, features = ["ws"] } actix-http = { version = "3.0", default-features = false, features = ["ws"] }
actix-web = { version = "4.0", default-features = false } actix-web = { version = "4.0", default-features = false }
futures-core = "0.3" futures-core = "0.3"
pin-project-lite = "0.2"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }

View file

@ -17,32 +17,26 @@ use std::{
}; };
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
pin_project_lite::pin_project! { /// A response body for Websocket HTTP Requests
/// A response body for Websocket HTTP Requests pub struct StreamingBody {
pub struct StreamingBody { session_rx: Receiver<Message>,
#[pin]
session_rx: Receiver<Message>,
messages: VecDeque<Message>, messages: VecDeque<Message>,
buf: BytesMut, buf: BytesMut,
codec: Codec, codec: Codec,
closing: bool, closing: bool,
}
} }
pin_project_lite::pin_project! { /// A stream of Messages from a websocket client
/// A stream of Messages from a websocket client ///
/// /// Messages can be accessed via the stream's `.next()` method
/// Messages can be accessed via the stream's `.next()` method pub struct MessageStream {
pub struct MessageStream { payload: Payload,
#[pin]
payload: Payload,
messages: VecDeque<Message>, messages: VecDeque<Message>,
buf: BytesMut, buf: BytesMut,
codec: Codec, codec: Codec,
closing: bool, closing: bool,
}
} }
impl StreamingBody { impl StreamingBody {
@ -71,7 +65,7 @@ impl MessageStream {
/// Wait for the next item from the message stream /// Wait for the next item from the message stream
/// ///
/// ```rust,ignore /// ```rust,ignore
/// while let Some(Ok(msg)) = stream.next().await { /// while let Some(Ok(msg)) = stream.recv().await {
/// // handle message /// // handle message
/// } /// }
/// ``` /// ```
@ -106,9 +100,9 @@ impl Stream for StreamingBody {
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut this = self.get_mut();
if *this.closing { if this.closing {
return Poll::Ready(None); return Poll::Ready(None);
} }
@ -118,7 +112,7 @@ impl Stream for StreamingBody {
this.messages.push_back(msg); this.messages.push_back(msg);
} }
Poll::Ready(None) => { Poll::Ready(None) => {
*this.closing = true; this.closing = true;
break; break;
} }
Poll::Pending => break, Poll::Pending => break,
@ -126,7 +120,7 @@ impl Stream for StreamingBody {
} }
while let Some(msg) = this.messages.pop_front() { while let Some(msg) = this.messages.pop_front() {
if let Err(e) = this.codec.encode(msg, this.buf) { if let Err(e) = this.codec.encode(msg, &mut this.buf) {
return Poll::Ready(Some(Err(e.into()))); return Poll::Ready(Some(Err(e.into())));
} }
} }
@ -143,7 +137,7 @@ impl Stream for MessageStream {
type Item = Result<Message, ProtocolError>; type Item = Result<Message, ProtocolError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut this = self.get_mut();
// Return the first message in the queue if one exists // Return the first message in the queue if one exists
// //
@ -152,7 +146,7 @@ impl Stream for MessageStream {
return Poll::Ready(Some(Ok(msg))); return Poll::Ready(Some(Ok(msg)));
} }
if !*this.closing { if !this.closing {
// Read in bytes until there's nothing left to read // Read in bytes until there's nothing left to read
loop { loop {
match Pin::new(&mut this.payload).poll_next(cx) { match Pin::new(&mut this.payload).poll_next(cx) {
@ -166,7 +160,7 @@ impl Stream for MessageStream {
))))); )))));
} }
Poll::Ready(None) => { Poll::Ready(None) => {
*this.closing = true; this.closing = true;
break; break;
} }
Poll::Pending => break, Poll::Pending => break,
@ -175,7 +169,7 @@ impl Stream for MessageStream {
} }
// Create messages until there's no more bytes left // Create messages until there's no more bytes left
while let Some(frame) = this.codec.decode(this.buf)? { while let Some(frame) = this.codec.decode(&mut this.buf)? {
let message = match frame { let message = match frame {
Frame::Text(bytes) => { Frame::Text(bytes) => {
let s = std::str::from_utf8(&bytes) let s = std::str::from_utf8(&bytes)
@ -201,7 +195,7 @@ impl Stream for MessageStream {
} }
// If we've exhausted our message queue and we're closing, close the stream // If we've exhausted our message queue and we're closing, close the stream
if *this.closing { if this.closing {
return Poll::Ready(None); return Poll::Ready(None);
} }