relay/src/inbox.rs

341 lines
9.1 KiB
Rust
Raw Normal View History

2020-03-15 17:49:27 +00:00
use activitystreams::{
2020-03-15 22:37:53 +00:00
activity::apub::{Accept, Announce, Follow, Undo},
context,
2020-03-15 17:49:27 +00:00
primitives::XsdAnyUri,
};
2020-03-15 02:05:40 +00:00
use actix::Addr;
2020-03-16 03:36:46 +00:00
use actix_web::{client::Client, web, HttpResponse};
2020-03-15 17:49:27 +00:00
use futures::join;
2020-03-15 22:37:53 +00:00
use log::error;
2020-03-15 02:05:40 +00:00
use crate::{
apub::{AcceptedActors, AcceptedObjects, ValidTypes},
2020-03-15 17:49:27 +00:00
db_actor::{DbActor, DbQuery, Pool},
2020-03-16 03:36:46 +00:00
error::MyError,
2020-03-15 22:37:53 +00:00
state::{State, UrlKind},
2020-03-15 02:05:40 +00:00
};
pub async fn inbox(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
client: web::Data<Client>,
input: web::Json<AcceptedObjects>,
2020-03-15 22:37:53 +00:00
) -> Result<HttpResponse, MyError> {
2020-03-15 02:05:40 +00:00
let input = input.into_inner();
2020-03-15 22:37:53 +00:00
let actor = fetch_actor(state.clone(), &client, &input.actor).await?;
2020-03-15 02:05:40 +00:00
match input.kind {
2020-03-15 22:37:53 +00:00
ValidTypes::Announce | ValidTypes::Create => {
handle_relay(state, client, input, actor).await
}
ValidTypes::Follow => handle_follow(db_actor, state, client, input, actor).await,
ValidTypes::Delete | ValidTypes::Update => {
handle_forward(state, client, input, actor).await
}
ValidTypes::Undo => handle_undo(db_actor, state, client, input, actor).await,
2020-03-15 02:05:40 +00:00
}
2020-03-15 22:37:53 +00:00
}
2020-03-15 02:05:40 +00:00
2020-03-15 22:37:53 +00:00
pub fn response<T>(item: T) -> HttpResponse
where
T: serde::ser::Serialize,
{
HttpResponse::Accepted()
.content_type("application/activity+json")
.json(item)
}
async fn handle_undo(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
if !input.object.is_kind("Follow") {
2020-03-16 03:36:46 +00:00
return Err(MyError::Kind);
2020-03-15 22:37:53 +00:00
}
let inbox = actor.inbox().to_owned();
let state2 = state.clone().into_inner();
db_actor.do_send(DbQuery(move |pool: Pool| {
let inbox = inbox.clone();
async move {
let conn = pool.get().await?;
state2.remove_listener(&conn, &inbox).await.map_err(|e| {
error!("Error removing listener, {}", e);
e
})
}
}));
let mut undo = Undo::default();
let mut follow = Follow::default();
follow
.object_props
.set_id(state.generate_url(UrlKind::Activity))?;
follow
.follow_props
.set_actor_xsd_any_uri(actor.id.clone())?
.set_object_xsd_any_uri(actor.id.clone())?;
undo.object_props
.set_id(state.generate_url(UrlKind::Activity))?
.set_many_to_xsd_any_uris(vec![actor.id.clone()])?
.set_context_xsd_any_uri(context())?;
undo.undo_props
.set_object_object_box(follow)?
.set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
if input.object.child_object_is_actor() {
let undo2 = undo.clone();
let client = client.into_inner();
actix::Arbiter::spawn(async move {
2020-03-16 03:36:46 +00:00
let _ = deliver(&state.into_inner(), &client, actor.id, &undo2).await;
2020-03-15 22:37:53 +00:00
});
}
Ok(response(undo))
}
async fn handle_forward(
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
let inboxes = get_inboxes(&state, &actor, &object_id).await?;
2020-03-16 03:36:46 +00:00
deliver_many(state, client, inboxes, input.clone());
2020-03-15 22:37:53 +00:00
Ok(response(input))
2020-03-15 22:37:53 +00:00
}
async fn handle_relay(
state: web::Data<State>,
client: web::Data<Client>,
input: AcceptedObjects,
actor: AcceptedActors,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
if state.is_cached(object_id).await {
2020-03-16 03:36:46 +00:00
return Err(MyError::Duplicate);
2020-03-15 22:37:53 +00:00
}
let activity_id: XsdAnyUri = state.generate_url(UrlKind::Activity).parse()?;
let mut announce = Announce::default();
announce
.object_props
.set_context_xsd_any_uri(context())?
.set_many_to_xsd_any_uris(vec![state.generate_url(UrlKind::Followers)])?
.set_id(activity_id.clone())?;
announce
.announce_props
.set_object_xsd_any_uri(object_id.clone())?
.set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
let inboxes = get_inboxes(&state, &actor, &object_id).await?;
state.cache(object_id.to_owned(), activity_id).await;
2020-03-16 03:36:46 +00:00
deliver_many(state, client, inboxes, announce.clone());
Ok(response(announce))
2020-03-15 17:49:27 +00:00
}
async fn handle_follow(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
2020-03-15 22:37:53 +00:00
client: web::Data<Client>,
2020-03-15 17:49:27 +00:00
input: AcceptedObjects,
actor: AcceptedActors,
2020-03-15 22:37:53 +00:00
) -> Result<HttpResponse, MyError> {
2020-03-15 17:49:27 +00:00
let (is_listener, is_blocked, is_whitelisted) = join!(
state.is_listener(&actor.id),
state.is_blocked(&actor.id),
state.is_whitelisted(&actor.id)
);
if is_blocked {
error!("Follow from blocked listener, {}", actor.id);
2020-03-16 03:36:46 +00:00
return Err(MyError::Blocked);
2020-03-15 17:49:27 +00:00
}
if !is_whitelisted {
error!("Follow from non-whitelisted listener, {}", actor.id);
2020-03-16 03:36:46 +00:00
return Err(MyError::Whitelist);
2020-03-15 17:49:27 +00:00
}
if !is_listener {
2020-03-15 22:37:53 +00:00
let state = state.clone().into_inner();
2020-03-15 17:49:27 +00:00
2020-03-15 22:37:53 +00:00
let inbox = actor.inbox().to_owned();
2020-03-15 17:49:27 +00:00
db_actor.do_send(DbQuery(move |pool: Pool| {
2020-03-15 22:37:53 +00:00
let inbox = inbox.clone();
2020-03-15 17:49:27 +00:00
let state = state.clone();
async move {
let conn = pool.get().await?;
2020-03-15 22:37:53 +00:00
state.add_listener(&conn, inbox).await.map_err(|e| {
error!("Error adding listener, {}", e);
e
})
2020-03-15 17:49:27 +00:00
}
}));
}
2020-03-15 22:37:53 +00:00
let actor_inbox = actor.inbox().clone();
2020-03-15 17:49:27 +00:00
let mut accept = Accept::default();
let mut follow = Follow::default();
follow.object_props.set_id(input.id)?;
follow
.follow_props
2020-03-15 22:37:53 +00:00
.set_object_xsd_any_uri(state.generate_url(UrlKind::Actor))?
2020-03-15 17:49:27 +00:00
.set_actor_xsd_any_uri(actor.id.clone())?;
accept
.object_props
2020-03-15 22:37:53 +00:00
.set_id(state.generate_url(UrlKind::Activity))?
2020-03-15 17:49:27 +00:00
.set_many_to_xsd_any_uris(vec![actor.id])?;
accept
.accept_props
.set_object_object_box(follow)?
2020-03-15 22:37:53 +00:00
.set_actor_xsd_any_uri(state.generate_url(UrlKind::Actor))?;
2020-03-15 17:49:27 +00:00
2020-03-15 22:37:53 +00:00
let client = client.into_inner();
let accept2 = accept.clone();
actix::Arbiter::spawn(async move {
2020-03-16 03:36:46 +00:00
let _ = deliver(&state.into_inner(), &client, actor_inbox, &accept2).await;
2020-03-15 22:37:53 +00:00
});
Ok(response(accept))
2020-03-15 02:05:40 +00:00
}
2020-03-15 16:29:01 +00:00
async fn fetch_actor(
state: web::Data<State>,
2020-03-15 22:37:53 +00:00
client: &web::Data<Client>,
2020-03-15 16:29:01 +00:00
actor_id: &XsdAnyUri,
) -> Result<AcceptedActors, MyError> {
if let Some(actor) = state.get_actor(actor_id).await {
return Ok(actor);
}
let actor: AcceptedActors = client
2020-03-15 17:49:27 +00:00
.get(actor_id.as_str())
2020-03-15 02:05:40 +00:00
.header("Accept", "application/activity+json")
.send()
.await
2020-03-15 17:49:27 +00:00
.map_err(|e| {
2020-03-15 22:37:53 +00:00
error!("Couldn't send request to {} for actor, {}", actor_id, e);
2020-03-16 03:36:46 +00:00
MyError::SendRequest
2020-03-15 17:49:27 +00:00
})?
2020-03-15 02:05:40 +00:00
.json()
.await
2020-03-15 17:49:27 +00:00
.map_err(|e| {
2020-03-15 22:37:53 +00:00
error!("Coudn't fetch actor from {}, {}", actor_id, e);
2020-03-16 03:36:46 +00:00
MyError::ReceiveResponse
2020-03-15 17:49:27 +00:00
})?;
2020-03-15 16:29:01 +00:00
state.cache_actor(actor_id.to_owned(), actor.clone()).await;
Ok(actor)
2020-03-15 02:05:40 +00:00
}
2020-03-16 03:36:46 +00:00
fn deliver_many<T>(
state: web::Data<State>,
client: web::Data<Client>,
inboxes: Vec<XsdAnyUri>,
item: T,
) where
2020-03-15 22:37:53 +00:00
T: serde::ser::Serialize + 'static,
{
let client = client.into_inner();
2020-03-16 03:36:46 +00:00
let state = state.into_inner();
2020-03-15 22:37:53 +00:00
actix::Arbiter::spawn(async move {
use futures::stream::StreamExt;
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
2020-03-16 03:36:46 +00:00
unordered.push(deliver(&state, &client, inbox, &item));
2020-03-15 22:37:53 +00:00
}
while let Some(_) = unordered.next().await {}
});
}
async fn deliver<T>(
2020-03-16 03:36:46 +00:00
state: &std::sync::Arc<State>,
2020-03-15 22:37:53 +00:00
client: &std::sync::Arc<Client>,
inbox: XsdAnyUri,
item: &T,
) -> Result<(), MyError>
where
T: serde::ser::Serialize,
{
2020-03-16 03:36:46 +00:00
use http_signature_normalization_actix::prelude::*;
use sha2::{Digest, Sha256};
let config = Config::default();
let mut digest = Sha256::new();
let key_id = state.generate_url(UrlKind::Actor);
let item_string = serde_json::to_string(item)?;
2020-03-15 22:37:53 +00:00
let res = client
.post(inbox.as_str())
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
2020-03-16 03:36:46 +00:00
.header("User-Agent", "Aode Relay v0.1.0")
.signature_with_digest(
&config,
&key_id,
&mut digest,
item_string,
|signing_string| state.sign(signing_string.as_bytes()),
)?
.send()
2020-03-15 22:37:53 +00:00
.await
.map_err(|e| {
error!("Couldn't send deliver request to {}, {}", inbox, e);
2020-03-16 03:36:46 +00:00
MyError::SendRequest
2020-03-15 22:37:53 +00:00
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
2020-03-16 03:36:46 +00:00
return Err(MyError::Status);
2020-03-15 22:37:53 +00:00
}
Ok(())
}
async fn get_inboxes(
state: &web::Data<State>,
actor: &AcceptedActors,
object_id: &XsdAnyUri,
) -> Result<Vec<XsdAnyUri>, MyError> {
2020-03-16 03:36:46 +00:00
let domain = object_id
.as_url()
.host()
.ok_or(MyError::Domain)?
.to_string();
2020-03-15 22:37:53 +00:00
let inbox = actor.inbox();
Ok(state.listeners_without(&inbox, &domain).await)
}