diff --git a/Cargo.lock b/Cargo.lock index ffc7886..c360ce0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -401,7 +401,6 @@ dependencies = [ "dashmap", "dotenv", "flume", - "futures-util", "http-signature-normalization-actix", "http-signature-normalization-reqwest", "lru", @@ -427,6 +426,7 @@ dependencies = [ "serde", "serde_json", "sled", + "streem", "teloxide", "thiserror", "time", @@ -3064,6 +3064,16 @@ dependencies = [ "der", ] +[[package]] +name = "streem" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56bcbf3c75c402c87b9a1947202c83bbc566ba9f4b788ee3d39db75f20c840a2" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "string_cache" version = "0.8.7" diff --git a/Cargo.toml b/Cargo.toml index 370982e..f3e58d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,6 @@ console-subscriber = { version = "0.1", optional = true } dashmap = "5.1.0" dotenv = "0.15.0" flume = "0.11.0" -futures-util = "0.3.17" lru = "0.11.0" metrics = "0.21.0" metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [ @@ -80,6 +79,7 @@ tracing-subscriber = { version = "0.3", features = [ ] } tokio = { version = "1", features = ["macros", "sync"] } uuid = { version = "1", features = ["v4", "serde"] } +streem = "0.1.0" [dependencies.background-jobs] version = "0.15.0" diff --git a/src/extractors.rs b/src/extractors.rs index 3226398..385ce9a 100644 --- a/src/extractors.rs +++ b/src/extractors.rs @@ -9,12 +9,11 @@ use actix_web::{ FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, }; use bcrypt::{BcryptError, DEFAULT_COST}; -use futures_util::future::LocalBoxFuture; use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn}; use std::{convert::Infallible, str::FromStr, time::Instant}; use tracing_error::SpanTrace; -use crate::{db::Db, spawner::Spawner}; +use crate::{db::Db, future::LocalBoxFuture, spawner::Spawner}; #[derive(Clone)] pub(crate) struct AdminConfig { diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..905a931 --- /dev/null +++ b/src/future.rs @@ -0,0 +1,3 @@ +use std::{future::Future, pin::Pin}; + +pub(crate) type LocalBoxFuture<'a, T> = Pin + 'a>>; diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index fc9107c..f932a12 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -1,10 +1,10 @@ use crate::{ error::Error, + future::LocalBoxFuture, jobs::{debug_object, Deliver, JobState}, }; use activitystreams::iri_string::types::IriString; use background_jobs::ActixJob; -use futures_util::future::LocalBoxFuture; #[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct DeliverMany { diff --git a/src/main.rs b/src/main.rs index eaa6d3d..0373afb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,7 @@ mod data; mod db; mod error; mod extractors; +mod future; mod jobs; mod middleware; mod requests; diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index bae7774..c23a271 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -4,14 +4,11 @@ use actix_web::{ web::BytesMut, HttpMessage, }; -use futures_util::{ - future::TryFutureExt, - stream::{once, TryStreamExt}, -}; use std::{ future::{ready, Ready}, task::{Context, Poll}, }; +use streem::IntoStreamer; #[derive(Clone, Debug)] pub(crate) struct DebugPayload(pub bool); @@ -53,19 +50,23 @@ where fn call(&self, mut req: ServiceRequest) -> Self::Future { if self.0 && req.method() == Method::POST { - let pl = req.take_payload(); + let mut pl = req.take_payload().into_streamer(); + req.set_payload(Payload::Stream { - payload: Box::pin(once( - pl.try_fold(BytesMut::new(), |mut acc, bytes| async { - acc.extend(bytes); - Ok(acc) - }) - .map_ok(|bytes| { - let bytes = bytes.freeze(); - tracing::info!("{}", String::from_utf8_lossy(&bytes)); - bytes - }), - )), + payload: Box::pin(streem::try_from_fn(|yielder| async move { + let mut buf = BytesMut::new(); + + while let Some(bytes) = pl.try_next().await? { + buf.extend(bytes); + } + + let bytes = buf.freeze(); + tracing::info!("{}", String::from_utf8_lossy(&bytes)); + + yielder.yield_ok(bytes).await; + + Ok(()) + })), }); self.1.call(req) diff --git a/src/middleware/webfinger.rs b/src/middleware/webfinger.rs index 9ccbe0e..0590408 100644 --- a/src/middleware/webfinger.rs +++ b/src/middleware/webfinger.rs @@ -1,10 +1,10 @@ use crate::{ config::{Config, UrlKind}, data::State, + future::LocalBoxFuture, }; use actix_web::web::Data; use actix_webfinger::{Resolver, Webfinger}; -use futures_util::future::LocalBoxFuture; use rsa_magic_public_key::AsMagicPublicKey; pub(crate) struct RelayResolver;