Add some basic docs
This commit is contained in:
parent
e899ca2d12
commit
e92d6969ee
17
src/fut.rs
17
src/fut.rs
|
@ -4,7 +4,7 @@ use actix_http::{
|
||||||
};
|
};
|
||||||
use actix_web::Error;
|
use actix_web::Error;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures::stream::Stream;
|
use futures::stream::{Stream, StreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
io,
|
io,
|
||||||
|
@ -14,6 +14,7 @@ use std::{
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio_util::codec::{Decoder, Encoder};
|
use tokio_util::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
|
/// A response body for Websocket HTTP Requests
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
pub struct StreamingBody {
|
pub struct StreamingBody {
|
||||||
#[pin]
|
#[pin]
|
||||||
|
@ -25,6 +26,9 @@ pub struct StreamingBody {
|
||||||
closing: bool,
|
closing: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A stream of Messages from a websocket client
|
||||||
|
///
|
||||||
|
/// Messages can be accessed via the stream's `.next()` method
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
pub struct MessageStream {
|
pub struct MessageStream {
|
||||||
#[pin]
|
#[pin]
|
||||||
|
@ -58,6 +62,17 @@ impl MessageStream {
|
||||||
closing: false,
|
closing: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait for the next item from the message stream
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// while let Some(Ok(msg)) = stream.next().await {
|
||||||
|
/// // handle message
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub async fn next(&'_ mut self) -> Option<Result<Message, ProtocolError>> {
|
||||||
|
StreamExt::next(self).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for StreamingBody {
|
impl Stream for StreamingBody {
|
||||||
|
|
50
src/lib.rs
50
src/lib.rs
|
@ -1,8 +1,14 @@
|
||||||
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
|
//! Websockets without Actors for actix runtimes
|
||||||
|
//!
|
||||||
|
//! See documentation for the [`handle`] method for usage
|
||||||
|
|
||||||
use actix_http::ws::handshake;
|
use actix_http::ws::handshake;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use tokio::sync::mpsc::channel;
|
use tokio::sync::mpsc::channel;
|
||||||
|
|
||||||
pub use actix_http::ws::Message;
|
pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError};
|
||||||
|
|
||||||
mod fut;
|
mod fut;
|
||||||
mod session;
|
mod session;
|
||||||
|
@ -12,6 +18,48 @@ pub use self::{
|
||||||
session::{Closed, Session},
|
session::{Closed, Session},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Begin handling websocket traffic
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
|
||||||
|
/// use actix_ws::Message;
|
||||||
|
///
|
||||||
|
/// async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
|
||||||
|
/// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
|
||||||
|
///
|
||||||
|
/// actix_rt::spawn(async move {
|
||||||
|
/// while let Some(Ok(msg)) = msg_stream.next() {
|
||||||
|
/// match msg {
|
||||||
|
/// Message::Ping(bytes) => {
|
||||||
|
/// if session.pong(&bytes).await.is_err() {
|
||||||
|
/// return;
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// Message::Text(s) => println!("Got text, {}", s),
|
||||||
|
/// _ => break,
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// let _ = session.close(None).await;
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// Ok(response)
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// #[actix_rt::main]
|
||||||
|
/// async fn main() -> Result<(), anyhow::Error> {
|
||||||
|
/// HttpServer::new(move || {
|
||||||
|
/// App::new()
|
||||||
|
/// .wrap(Logger::default())
|
||||||
|
/// .route("/ws", web::get().to(ws))
|
||||||
|
/// })
|
||||||
|
/// .bind("127.0.0.1:8080")?
|
||||||
|
/// .run()
|
||||||
|
/// .await?;
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub fn handle(
|
pub fn handle(
|
||||||
req: &HttpRequest,
|
req: &HttpRequest,
|
||||||
body: web::Payload,
|
body: web::Payload,
|
||||||
|
|
|
@ -6,12 +6,16 @@ use std::sync::{
|
||||||
};
|
};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
|
/// A handle into the websocket session.
|
||||||
|
///
|
||||||
|
/// This type can be used to send messages into the websocket.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Session {
|
pub struct Session {
|
||||||
inner: Option<Sender<Message>>,
|
inner: Option<Sender<Message>>,
|
||||||
closed: Arc<AtomicBool>,
|
closed: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The error representing a closed websocket session
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
#[error("Session is closed")]
|
#[error("Session is closed")]
|
||||||
pub struct Closed;
|
pub struct Closed;
|
||||||
|
@ -30,6 +34,13 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send text into the websocket
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// if session.text("Some text").await.is_err() {
|
||||||
|
/// // session closed
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub async fn text<T>(&mut self, msg: T) -> Result<(), Closed>
|
pub async fn text<T>(&mut self, msg: T) -> Result<(), Closed>
|
||||||
where
|
where
|
||||||
T: Into<String>,
|
T: Into<String>,
|
||||||
|
@ -45,6 +56,13 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send raw bytes into the websocket
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// if session.binary(b"some bytes").await.is_err() {
|
||||||
|
/// // session closed
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub async fn binary<T>(&mut self, msg: T) -> Result<(), Closed>
|
pub async fn binary<T>(&mut self, msg: T) -> Result<(), Closed>
|
||||||
where
|
where
|
||||||
T: Into<Bytes>,
|
T: Into<Bytes>,
|
||||||
|
@ -60,6 +78,16 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ping the client
|
||||||
|
///
|
||||||
|
/// For many applications, it will be important to send regular pings to keep track of if the
|
||||||
|
/// client has disconnected
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// if session.ping(b"").await.is_err() {
|
||||||
|
/// // session is closed
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
|
pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
|
||||||
self.pre_check();
|
self.pre_check();
|
||||||
if let Some(inner) = self.inner.as_mut() {
|
if let Some(inner) = self.inner.as_mut() {
|
||||||
|
@ -72,6 +100,15 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pong the client
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// match msg {
|
||||||
|
/// Message::Ping(bytes) => {
|
||||||
|
/// let _ = session.pong(&bytes).await;
|
||||||
|
/// }
|
||||||
|
/// _ => (),
|
||||||
|
/// }
|
||||||
pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
|
pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
|
||||||
self.pre_check();
|
self.pre_check();
|
||||||
if let Some(inner) = self.inner.as_mut() {
|
if let Some(inner) = self.inner.as_mut() {
|
||||||
|
@ -84,7 +121,14 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn close(&mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
|
/// Send a close message, and consume the session
|
||||||
|
///
|
||||||
|
/// All clones will return `Err(Closed)` if used after this call
|
||||||
|
///
|
||||||
|
/// ```rust,ignore
|
||||||
|
/// session.close(None).await
|
||||||
|
/// ```
|
||||||
|
pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
|
||||||
self.pre_check();
|
self.pre_check();
|
||||||
if let Some(mut inner) = self.inner.take() {
|
if let Some(mut inner) = self.inner.take() {
|
||||||
self.closed.store(true, Ordering::Relaxed);
|
self.closed.store(true, Ordering::Relaxed);
|
||||||
|
|
Loading…
Reference in a new issue