Compare commits
6 commits
Author | SHA1 | Date | |
---|---|---|---|
Aode (Lion) | 03ee0f606d | ||
Aode (Lion) | 5ff9ab7f46 | ||
Aode (Lion) | c40608eceb | ||
Aode (Lion) | 3d6f491dd8 | ||
Aode (Lion) | 6fd331b135 | ||
Aode (Lion) | ac7fa058b6 |
11
Cargo.toml
11
Cargo.toml
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "actix-ws"
|
||||
description = "Websockets for the Actix runtime, without Actors"
|
||||
version = "0.2.0"
|
||||
version = "0.2.5"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
readme = "README.md"
|
||||
repository = "https://git.asonix.dog/asonix/actix-actorless-websockets"
|
||||
|
@ -13,11 +13,8 @@ edition = "2018"
|
|||
members = ["examples/chat"]
|
||||
|
||||
[dependencies]
|
||||
actix-codec = "0.5.0"
|
||||
actix-http = { version = "3.0", default-features = false, features = ["ws"] }
|
||||
actix-web = { version = "4.0", default-features = false }
|
||||
bytes = "1.0"
|
||||
futures-util = "0.3"
|
||||
pin-project = "1"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", features = ["sync", "io-util"] }
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
futures-core = "0.3"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
|
|
|
@ -5,8 +5,8 @@ _websockets for the Actix Runtime without actors_
|
|||
```toml
|
||||
# Cargo.toml
|
||||
anyhow = "1.0"
|
||||
actix-web = "3.0.2"
|
||||
actix-ws = "0.1.0"
|
||||
actix-web = "4.0.1"
|
||||
actix-ws = "0.2.0"
|
||||
```
|
||||
|
||||
```rust
|
||||
|
|
62
src/fut.rs
62
src/fut.rs
|
@ -1,23 +1,24 @@
|
|||
use actix_codec::{Decoder, Encoder};
|
||||
use actix_http::{
|
||||
ws::{Codec, Frame, Message, ProtocolError},
|
||||
Payload,
|
||||
};
|
||||
use actix_web::Error;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_util::stream::{Stream, StreamExt};
|
||||
use actix_web::{
|
||||
web::{Bytes, BytesMut},
|
||||
Error,
|
||||
};
|
||||
use futures_core::stream::Stream;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
future::Future,
|
||||
io,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
/// A response body for Websocket HTTP Requests
|
||||
#[pin_project::pin_project]
|
||||
pub struct StreamingBody {
|
||||
#[pin]
|
||||
session_rx: Receiver<Message>,
|
||||
|
||||
messages: VecDeque<Message>,
|
||||
|
@ -29,9 +30,7 @@ pub struct StreamingBody {
|
|||
/// A stream of Messages from a websocket client
|
||||
///
|
||||
/// Messages can be accessed via the stream's `.next()` method
|
||||
#[pin_project::pin_project]
|
||||
pub struct MessageStream {
|
||||
#[pin]
|
||||
payload: Payload,
|
||||
|
||||
messages: VecDeque<Message>,
|
||||
|
@ -66,13 +65,34 @@ impl MessageStream {
|
|||
/// Wait for the next item from the message stream
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// while let Some(Ok(msg)) = stream.next().await {
|
||||
/// while let Some(Ok(msg)) = stream.recv().await {
|
||||
/// // handle message
|
||||
/// }
|
||||
/// ```
|
||||
#[allow(clippy::should_implement_trait)]
|
||||
pub async fn next(&'_ mut self) -> Option<Result<Message, ProtocolError>> {
|
||||
StreamExt::next(self).await
|
||||
pub async fn recv(&mut self) -> Option<Result<Message, ProtocolError>> {
|
||||
poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
|
||||
}
|
||||
}
|
||||
|
||||
struct PollFn<F>(F);
|
||||
|
||||
impl<F> Unpin for PollFn<F> {}
|
||||
|
||||
fn poll_fn<F, T>(f: F) -> PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
PollFn(f)
|
||||
}
|
||||
|
||||
impl<F, T> Future for PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
(&mut self.0)(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,9 +100,9 @@ impl Stream for StreamingBody {
|
|||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
let mut this = self.get_mut();
|
||||
|
||||
if *this.closing {
|
||||
if this.closing {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
|
@ -92,7 +112,7 @@ impl Stream for StreamingBody {
|
|||
this.messages.push_back(msg);
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
*this.closing = true;
|
||||
this.closing = true;
|
||||
break;
|
||||
}
|
||||
Poll::Pending => break,
|
||||
|
@ -100,7 +120,7 @@ impl Stream for StreamingBody {
|
|||
}
|
||||
|
||||
while let Some(msg) = this.messages.pop_front() {
|
||||
if let Err(e) = this.codec.encode(msg, this.buf) {
|
||||
if let Err(e) = this.codec.encode(msg, &mut this.buf) {
|
||||
return Poll::Ready(Some(Err(e.into())));
|
||||
}
|
||||
}
|
||||
|
@ -117,7 +137,7 @@ impl Stream for MessageStream {
|
|||
type Item = Result<Message, ProtocolError>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
let mut this = self.get_mut();
|
||||
|
||||
// Return the first message in the queue if one exists
|
||||
//
|
||||
|
@ -126,7 +146,7 @@ impl Stream for MessageStream {
|
|||
return Poll::Ready(Some(Ok(msg)));
|
||||
}
|
||||
|
||||
if !*this.closing {
|
||||
if !this.closing {
|
||||
// Read in bytes until there's nothing left to read
|
||||
loop {
|
||||
match Pin::new(&mut this.payload).poll_next(cx) {
|
||||
|
@ -140,7 +160,7 @@ impl Stream for MessageStream {
|
|||
)))));
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
*this.closing = true;
|
||||
this.closing = true;
|
||||
break;
|
||||
}
|
||||
Poll::Pending => break,
|
||||
|
@ -149,7 +169,7 @@ impl Stream for MessageStream {
|
|||
}
|
||||
|
||||
// Create messages until there's no more bytes left
|
||||
while let Some(frame) = this.codec.decode(this.buf)? {
|
||||
while let Some(frame) = this.codec.decode(&mut this.buf)? {
|
||||
let message = match frame {
|
||||
Frame::Text(bytes) => {
|
||||
let s = std::str::from_utf8(&bytes)
|
||||
|
@ -175,7 +195,7 @@ impl Stream for MessageStream {
|
|||
}
|
||||
|
||||
// If we've exhausted our message queue and we're closing, close the stream
|
||||
if *this.closing {
|
||||
if this.closing {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use actix_http::ws::{CloseReason, Message};
|
||||
use bytes::Bytes;
|
||||
use actix_web::web::Bytes;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
|
@ -16,10 +16,17 @@ pub struct Session {
|
|||
}
|
||||
|
||||
/// The error representing a closed websocket session
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("Session is closed")]
|
||||
#[derive(Debug)]
|
||||
pub struct Closed;
|
||||
|
||||
impl std::fmt::Display for Closed {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Session is closed")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Closed {}
|
||||
|
||||
impl Session {
|
||||
pub(super) fn new(inner: Sender<Message>) -> Self {
|
||||
Session {
|
||||
|
|
Loading…
Reference in a new issue