diff --git a/src/fut.rs b/src/fut.rs index c17f387..eb7da84 100644 --- a/src/fut.rs +++ b/src/fut.rs @@ -4,7 +4,7 @@ use actix_http::{ }; use actix_web::Error; use bytes::{Bytes, BytesMut}; -use futures::stream::Stream; +use futures::stream::{Stream, StreamExt}; use std::{ collections::VecDeque, io, @@ -14,6 +14,7 @@ use std::{ use tokio::sync::mpsc::Receiver; use tokio_util::codec::{Decoder, Encoder}; +/// A response body for Websocket HTTP Requests #[pin_project::pin_project] pub struct StreamingBody { #[pin] @@ -25,6 +26,9 @@ pub struct StreamingBody { closing: bool, } +/// A stream of Messages from a websocket client +/// +/// Messages can be accessed via the stream's `.next()` method #[pin_project::pin_project] pub struct MessageStream { #[pin] @@ -58,6 +62,17 @@ impl MessageStream { closing: false, } } + + /// Wait for the next item from the message stream + /// + /// ```rust,ignore + /// while let Some(Ok(msg)) = stream.next().await { + /// // handle message + /// } + /// ``` + pub async fn next(&'_ mut self) -> Option> { + StreamExt::next(self).await + } } impl Stream for StreamingBody { diff --git a/src/lib.rs b/src/lib.rs index 8ecbd6c..cbb0abf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,14 @@ +#![deny(missing_docs)] + +//! Websockets without Actors for actix runtimes +//! +//! See documentation for the [`handle`] method for usage + use actix_http::ws::handshake; use actix_web::{web, HttpRequest, HttpResponse}; use tokio::sync::mpsc::channel; -pub use actix_http::ws::Message; +pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError}; mod fut; mod session; @@ -12,6 +18,48 @@ pub use self::{ session::{Closed, Session}, }; +/// Begin handling websocket traffic +/// +/// ```rust,ignore +/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; +/// use actix_ws::Message; +/// +/// async fn ws(req: HttpRequest, body: web::Payload) -> Result { +/// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; +/// +/// actix_rt::spawn(async move { +/// while let Some(Ok(msg)) = msg_stream.next() { +/// match msg { +/// Message::Ping(bytes) => { +/// if session.pong(&bytes).await.is_err() { +/// return; +/// } +/// } +/// Message::Text(s) => println!("Got text, {}", s), +/// _ => break, +/// } +/// } +/// +/// let _ = session.close(None).await; +/// }); +/// +/// Ok(response) +/// } +/// +/// #[actix_rt::main] +/// async fn main() -> Result<(), anyhow::Error> { +/// HttpServer::new(move || { +/// App::new() +/// .wrap(Logger::default()) +/// .route("/ws", web::get().to(ws)) +/// }) +/// .bind("127.0.0.1:8080")? +/// .run() +/// .await?; +/// +/// Ok(()) +/// } +/// ``` pub fn handle( req: &HttpRequest, body: web::Payload, diff --git a/src/session.rs b/src/session.rs index 3edef80..03bffca 100644 --- a/src/session.rs +++ b/src/session.rs @@ -6,12 +6,16 @@ use std::sync::{ }; use tokio::sync::mpsc::Sender; +/// A handle into the websocket session. +/// +/// This type can be used to send messages into the websocket. #[derive(Clone)] pub struct Session { inner: Option>, closed: Arc, } +/// The error representing a closed websocket session #[derive(Debug, thiserror::Error)] #[error("Session is closed")] pub struct Closed; @@ -30,6 +34,13 @@ impl Session { } } + /// Send text into the websocket + /// + /// ```rust,ignore + /// if session.text("Some text").await.is_err() { + /// // session closed + /// } + /// ``` pub async fn text(&mut self, msg: T) -> Result<(), Closed> where T: Into, @@ -45,6 +56,13 @@ impl Session { } } + /// Send raw bytes into the websocket + /// + /// ```rust,ignore + /// if session.binary(b"some bytes").await.is_err() { + /// // session closed + /// } + /// ``` pub async fn binary(&mut self, msg: T) -> Result<(), Closed> where T: Into, @@ -60,6 +78,16 @@ impl Session { } } + /// Ping the client + /// + /// For many applications, it will be important to send regular pings to keep track of if the + /// client has disconnected + /// + /// ```rust,ignore + /// if session.ping(b"").await.is_err() { + /// // session is closed + /// } + /// ``` pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> { self.pre_check(); if let Some(inner) = self.inner.as_mut() { @@ -72,6 +100,15 @@ impl Session { } } + /// Pong the client + /// + /// ```rust,ignore + /// match msg { + /// Message::Ping(bytes) => { + /// let _ = session.pong(&bytes).await; + /// } + /// _ => (), + /// } pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> { self.pre_check(); if let Some(inner) = self.inner.as_mut() { @@ -84,7 +121,14 @@ impl Session { } } - pub async fn close(&mut self, reason: Option) -> Result<(), Closed> { + /// Send a close message, and consume the session + /// + /// All clones will return `Err(Closed)` if used after this call + /// + /// ```rust,ignore + /// session.close(None).await + /// ``` + pub async fn close(mut self, reason: Option) -> Result<(), Closed> { self.pre_check(); if let Some(mut inner) = self.inner.take() { self.closed.store(true, Ordering::Relaxed);