From 64c44e05035db3f3bf8e32ae3492f5064174a418 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Wed, 9 Mar 2022 12:28:11 -0600 Subject: [PATCH] Update to latest actix web --- Cargo.toml | 21 +++++++++------------ examples/chat/Cargo.toml | 8 ++++---- examples/chat/src/main.rs | 8 ++++---- src/fut.rs | 10 +++++----- src/lib.rs | 9 +++++++-- src/session.rs | 4 ++-- 6 files changed, 31 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3b4788f..442a9e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "actix-ws" description = "Websockets for the Actix runtime, without Actors" -version = "0.1.0" +version = "0.2.0" authors = ["asonix "] readme = "README.md" repository = "https://git.asonix.dog/asonix/actix-actorless-websockets" @@ -10,17 +10,14 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [workspace] -members = [ - "examples/chat" -] +members = ["examples/chat"] [dependencies] -actix-http = { version = "2.0.0", default-features = false } -actix-web = { version = "3.0.2", default-features = false } -anyhow = "1.0" -bytes = "0.5" -futures = "0.3" -pin-project = "0.4.9" +actix-http = { version = "3.0", default-features = false, features = ["ws"] } +actix-web = { version = "4.0", default-features = false } +bytes = "1.0" +futures-util = "0.3" +pin-project = "1" thiserror = "1.0" -tokio = { version = "0.2", features = ["sync", "stream", "io-util"] } -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "1", features = ["sync", "io-util"] } +tokio-util = { version = "0.7", features = ["codec"] } diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml index f89da13..c6ffa90 100644 --- a/examples/chat/Cargo.toml +++ b/examples/chat/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-rt = "1.1.1" -actix-web = "3.0.2" -actix-ws = { version = "0.1.0", path = "../.." } +actix-rt = "2.6" +actix-web = "4.0.1" +actix-ws = { version = "0.2.0", path = "../.." } anyhow = "1.0" futures = "0.3" log = "0.4" pretty_env_logger = "0.4" -tokio = { version = "0.2", features = ["sync"] } +tokio = { version = "1", features = ["sync"] } diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs index 26823d7..ac90a3f 100644 --- a/examples/chat/src/main.rs +++ b/examples/chat/src/main.rs @@ -72,8 +72,7 @@ async fn ws( break; } - if Instant::now().duration_since(alive2.lock().await.clone()) > Duration::from_secs(10) - { + if Instant::now().duration_since(*alive2.lock().await) > Duration::from_secs(10) { let _ = session2.close(None).await; break; } @@ -90,7 +89,8 @@ async fn ws( } Message::Text(s) => { info!("Relaying text, {}", s); - chat.send(s).await; + let s: &str = s.as_ref(); + chat.send(s.into()).await; } Message::Close(reason) => { let _ = session.close(reason).await; @@ -195,7 +195,7 @@ async fn main() -> Result<(), anyhow::Error> { HttpServer::new(move || { App::new() .wrap(Logger::default()) - .data(chat.clone()) + .app_data(web::Data::new(chat.clone())) .route("/", web::get().to(index)) .route("/ws", web::get().to(ws)) }) diff --git a/src/fut.rs b/src/fut.rs index 358f502..d9a7402 100644 --- a/src/fut.rs +++ b/src/fut.rs @@ -4,7 +4,7 @@ use actix_http::{ }; use actix_web::Error; use bytes::{Bytes, BytesMut}; -use futures::stream::{Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; use std::{ collections::VecDeque, io, @@ -87,7 +87,7 @@ impl Stream for StreamingBody { } loop { - match Pin::new(&mut this.session_rx).poll_next(cx) { + match Pin::new(&mut this.session_rx).poll_recv(cx) { Poll::Ready(Some(msg)) => { this.messages.push_back(msg); } @@ -100,7 +100,7 @@ impl Stream for StreamingBody { } while let Some(msg) = this.messages.pop_front() { - if let Err(e) = this.codec.encode(msg, &mut this.buf) { + if let Err(e) = this.codec.encode(msg, this.buf) { return Poll::Ready(Some(Err(e.into()))); } } @@ -149,7 +149,7 @@ impl Stream for MessageStream { } // Create messages until there's no more bytes left - while let Some(frame) = this.codec.decode(&mut this.buf)? { + while let Some(frame) = this.codec.decode(this.buf)? { let message = match frame { Frame::Text(bytes) => { let s = std::str::from_utf8(&bytes) @@ -157,7 +157,7 @@ impl Stream for MessageStream { ProtocolError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())) })? .to_string(); - Message::Text(s) + Message::Text(s.into()) } Frame::Binary(bytes) => Message::Binary(bytes), Frame::Ping(bytes) => Message::Ping(bytes), diff --git a/src/lib.rs b/src/lib.rs index 174f0f7..3106a75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,10 @@ //! //! See documentation for the [`handle`] method for usage -use actix_http::ws::handshake; +use actix_http::{ + body::{BodyStream, MessageBody}, + ws::handshake, +}; use actix_web::{web, HttpRequest, HttpResponse}; use tokio::sync::mpsc::channel; @@ -68,7 +71,9 @@ pub fn handle( let (tx, rx) = channel(32); Ok(( - response.streaming(StreamingBody::new(rx)), + response + .message_body(BodyStream::new(StreamingBody::new(rx)).boxed())? + .into(), Session::new(tx), MessageStream::new(body.into_inner()), )) diff --git a/src/session.rs b/src/session.rs index 03bffca..50dfea6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -48,7 +48,7 @@ impl Session { self.pre_check(); if let Some(inner) = self.inner.as_mut() { inner - .send(Message::Text(msg.into())) + .send(Message::Text(msg.into().into())) .await .map_err(|_| Closed) } else { @@ -130,7 +130,7 @@ impl Session { /// ``` pub async fn close(mut self, reason: Option) -> Result<(), Closed> { self.pre_check(); - if let Some(mut inner) = self.inner.take() { + if let Some(inner) = self.inner.take() { self.closed.store(true, Ordering::Relaxed); inner.send(Message::Close(reason)).await.map_err(|_| Closed) } else {