From e4c95a816874bbe5c92d93cd2806c8a758feed99 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 23 Mar 2020 17:17:53 -0500 Subject: [PATCH] More shuffling, store actors in db --- .../2020-03-23-175108_create-actors/down.sql | 4 + .../2020-03-23-175108_create-actors/up.sql | 49 ++++ .../2020-03-23-175637_create-nodes/down.sql | 2 + .../2020-03-23-175637_create-nodes/up.sql | 12 + src/config.rs | 12 +- src/data/actor.rs | 262 ++++++++++++++++++ src/data/mod.rs | 9 + src/{ => data}/node.rs | 0 src/{ => data}/state.rs | 9 +- src/db.rs | 11 +- src/error.rs | 3 + src/jobs/mod.rs | 14 +- src/main.rs | 16 +- src/middleware/verifier.rs | 86 +++--- src/middleware/webfinger.rs | 2 +- src/notify.rs | 29 +- src/requests.rs | 37 +-- src/routes/actor.rs | 2 +- src/routes/inbox.rs | 100 ++++--- src/routes/index.rs | 2 +- src/routes/nodeinfo.rs | 2 +- src/schema.rs | 29 ++ templates/index.rs.html | 2 +- 23 files changed, 535 insertions(+), 159 deletions(-) create mode 100644 migrations/2020-03-23-175108_create-actors/down.sql create mode 100644 migrations/2020-03-23-175108_create-actors/up.sql create mode 100644 migrations/2020-03-23-175637_create-nodes/down.sql create mode 100644 migrations/2020-03-23-175637_create-nodes/up.sql create mode 100644 src/data/actor.rs create mode 100644 src/data/mod.rs rename src/{ => data}/node.rs (100%) rename src/{ => data}/state.rs (96%) diff --git a/migrations/2020-03-23-175108_create-actors/down.sql b/migrations/2020-03-23-175108_create-actors/down.sql new file mode 100644 index 0000000..5d31fbb --- /dev/null +++ b/migrations/2020-03-23-175108_create-actors/down.sql @@ -0,0 +1,4 @@ +-- This file should undo anything in `up.sql` +DROP TRIGGER IF EXISTS actors_notify ON actors; +DROP FUNCTION IF EXISTS invoke_actors_trigger(); +DROP TABLE actors; diff --git a/migrations/2020-03-23-175108_create-actors/up.sql b/migrations/2020-03-23-175108_create-actors/up.sql new file mode 100644 index 0000000..bbd921c --- /dev/null +++ b/migrations/2020-03-23-175108_create-actors/up.sql @@ -0,0 +1,49 @@ +-- Your SQL goes here +CREATE TABLE actors ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + actor_id TEXT UNIQUE NOT NULL, + public_key TEXT NOT NULL, + public_key_id TEXT UNIQUE NOT NULL, + listener_id UUID NOT NULL REFERENCES listeners(id) ON DELETE CASCADE, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +SELECT diesel_manage_updated_at('actors'); + +CREATE OR REPLACE FUNCTION invoke_actors_trigger () + RETURNS TRIGGER + LANGUAGE plpgsql +AS $$ +DECLARE + rec RECORD; + channel TEXT; + payload TEXT; +BEGIN + case TG_OP + WHEN 'INSERT' THEN + rec := NEW; + channel := 'new_actors'; + payload := NEW.actor_id; + WHEN 'UPDATE' THEN + rec := NEW; + channel := 'new_actors'; + payload := NEW.actor_id; + WHEN 'DELETE' THEN + rec := OLD; + channel := 'rm_actors'; + payload := OLD.actor_id; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + + PERFORM pg_notify(channel, payload::TEXT); + RETURN rec; +END; +$$; + +CREATE TRIGGER actors_notify + AFTER INSERT OR UPDATE OR DELETE + ON actors +FOR EACH ROW + EXECUTE PROCEDURE invoke_actors_trigger(); diff --git a/migrations/2020-03-23-175637_create-nodes/down.sql b/migrations/2020-03-23-175637_create-nodes/down.sql new file mode 100644 index 0000000..7d3304f --- /dev/null +++ b/migrations/2020-03-23-175637_create-nodes/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE nodes; diff --git a/migrations/2020-03-23-175637_create-nodes/up.sql b/migrations/2020-03-23-175637_create-nodes/up.sql new file mode 100644 index 0000000..a821864 --- /dev/null +++ b/migrations/2020-03-23-175637_create-nodes/up.sql @@ -0,0 +1,12 @@ +-- Your SQL goes here +CREATE TABLE nodes ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + listener_id UUID NOT NULL REFERENCES listeners(id) ON DELETE CASCADE, + nodeinfo JSONB, + instance JSONB, + contact JSONB, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +SELECT diesel_manage_updated_at('nodes'); diff --git a/src/config.rs b/src/config.rs index 5420e63..edeacb4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use crate::{error::MyError, middleware::MyVerify, requests::Requests}; +use crate::{data::ActorCache, error::MyError, middleware::MyVerify, requests::Requests}; use config::Environment; use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use sha2::{Digest, Sha256}; @@ -71,11 +71,15 @@ impl Config { } } - pub fn signature_middleware(&self, requests: Requests) -> VerifySignature { + pub fn signature_middleware( + &self, + requests: Requests, + actors: ActorCache, + ) -> VerifySignature { if self.validate_signatures { - VerifySignature::new(MyVerify(requests), Default::default()) + VerifySignature::new(MyVerify(requests, actors), Default::default()) } else { - VerifySignature::new(MyVerify(requests), Default::default()).optional() + VerifySignature::new(MyVerify(requests, actors), Default::default()).optional() } } diff --git a/src/data/actor.rs b/src/data/actor.rs new file mode 100644 index 0000000..d69f773 --- /dev/null +++ b/src/data/actor.rs @@ -0,0 +1,262 @@ +use crate::{apub::AcceptedActors, db::Db, error::MyError, requests::Requests}; +use activitystreams::primitives::XsdAnyUri; +use log::error; +use std::{collections::HashSet, sync::Arc, time::Duration}; +use tokio::sync::RwLock; +use ttl_cache::TtlCache; +use uuid::Uuid; + +const REFETCH_DURATION: u64 = 60 * 2; + +#[derive(Clone)] +pub struct ActorCache { + db: Db, + cache: Arc>>, + following: Arc>>, +} + +impl ActorCache { + pub fn new(db: Db) -> Self { + let cache = ActorCache { + db, + cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))), + following: Arc::new(RwLock::new(HashSet::new())), + }; + + cache.spawn_rehydrate(); + + cache + } + + pub async fn is_following(&self, id: &XsdAnyUri) -> bool { + self.following.read().await.contains(id) + } + + pub async fn get(&self, id: &XsdAnyUri, requests: &Requests) -> Result { + if let Some(actor) = self.cache.read().await.get(id) { + return Ok(actor.clone()); + } + + if let Some(actor) = self.lookup(id).await? { + self.cache.write().await.insert( + id.clone(), + actor.clone(), + Duration::from_secs(REFETCH_DURATION), + ); + return Ok(actor); + } + + let accepted_actor = requests.fetch::(id.as_str()).await?; + + let actor_host = accepted_actor.id.as_url().host(); + let inbox_host = accepted_actor.inbox().as_url().host(); + + if actor_host != inbox_host { + let actor_host = actor_host.map(|h| h.to_string()).unwrap_or(String::new()); + let inbox_host = inbox_host.map(|h| h.to_string()).unwrap_or(String::new()); + + return Err(MyError::HostMismatch(actor_host, inbox_host)); + } + + let inbox = accepted_actor.inbox().clone(); + + let actor = Actor { + id: accepted_actor.id, + public_key: accepted_actor.public_key.public_key_pem, + public_key_id: accepted_actor.public_key.id, + inbox, + }; + + self.cache.write().await.insert( + id.clone(), + actor.clone(), + Duration::from_secs(REFETCH_DURATION), + ); + + self.update(id, &actor.public_key, &actor.public_key_id) + .await?; + + Ok(actor) + } + + pub async fn follower(&self, actor: &Actor) -> Result<(), MyError> { + self.save(actor.clone()).await + } + + pub async fn cache_follower(&self, id: XsdAnyUri) { + self.following.write().await.insert(id); + } + + pub async fn bust_follower(&self, id: &XsdAnyUri) { + self.following.write().await.remove(id); + } + + pub async fn unfollower(&self, actor: &Actor) -> Result, MyError> { + let conn = self.db.pool().get().await?; + + let row_opt = conn + .query_opt( + "DELETE FROM actors WHERE actor_id = $1::TEXT RETURNING listener_id;", + &[&actor.id.as_str()], + ) + .await?; + + let row = if let Some(row) = row_opt { + row + } else { + return Ok(None); + }; + + let listener_id: Uuid = row.try_get(0)?; + + let row_opt = conn + .query_opt( + "SELECT FROM actors WHERE listener_id = $1::UUID;", + &[&listener_id], + ) + .await?; + + if row_opt.is_none() { + return Ok(Some(listener_id)); + } + + Ok(None) + } + + async fn lookup(&self, id: &XsdAnyUri) -> Result, MyError> { + let conn = self.db.pool().get().await?; + + let row_opt = conn + .query_opt( + "SELECT listeners.actor_id, actors.public_key, actors.public_key_id + FROM listeners + INNER JOIN actors ON actors.listener_id = listeners.id + WHERE + actors.actor_id = $1::TEXT + AND + actors.updated_at + INTERVAL '120 seconds' < NOW() + LIMIT 1;", + &[&id.as_str()], + ) + .await?; + + let row = if let Some(row) = row_opt { + row + } else { + return Ok(None); + }; + + let inbox: String = row.try_get(0)?; + let public_key_id: String = row.try_get(2)?; + + Ok(Some(Actor { + id: id.clone(), + inbox: inbox.parse()?, + public_key: row.try_get(1)?, + public_key_id: public_key_id.parse()?, + })) + } + + async fn save(&self, actor: Actor) -> Result<(), MyError> { + let conn = self.db.pool().get().await?; + + let row_opt = conn + .query_opt( + "SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;", + &[&actor.inbox.as_str()], + ) + .await?; + + let row = if let Some(row) = row_opt { + row + } else { + return Err(MyError::NotSubscribed(actor.id.as_str().to_owned())); + }; + + let listener_id: Uuid = row.try_get(0)?; + + conn.execute( + "INSERT INTO actors (actor_id, public_key, public_key_id, listener_id, created_at, updated_at) + VALUES ($1::TEXT, $2::TEXT, $3::TEXT, $4::UUID, 'now', 'now') + ON CONFLICT (actor_id) + DO UPDATE SET public_key = $2::TEXT;", + &[&actor.id.as_str(), &actor.public_key, &actor.public_key_id.as_str(), &listener_id], + ) + .await?; + Ok(()) + } + + async fn update( + &self, + id: &XsdAnyUri, + public_key: &str, + public_key_id: &XsdAnyUri, + ) -> Result<(), MyError> { + let conn = self.db.pool().get().await?; + + conn.execute( + "UPDATE actors + SET public_key = $2::TEXT, public_key_id = $3::TEXT + WHERE actor_id = $1::TEXT;", + &[&id.as_str(), &public_key, &public_key_id.as_str()], + ) + .await?; + + Ok(()) + } + + fn spawn_rehydrate(&self) { + use actix::clock::{interval_at, Instant}; + + let this = self.clone(); + actix::spawn(async move { + let mut interval = interval_at(Instant::now(), Duration::from_secs(60 * 10)); + + loop { + if let Err(e) = this.rehydrate().await { + error!("Error rehydrating follows, {}", e); + } + + interval.tick().await; + } + }); + } + + async fn rehydrate(&self) -> Result<(), MyError> { + let conn = self.db.pool().get().await?; + + let rows = conn.query("SELECT actor_id FROM actors;", &[]).await?; + + let actor_ids = rows + .into_iter() + .filter_map(|row| match row.try_get(0) { + Ok(s) => { + let s: String = s; + match s.parse() { + Ok(s) => Some(s), + Err(e) => { + error!("Error parsing actor id, {}", e); + None + } + } + } + Err(e) => { + error!("Error getting actor id from row, {}", e); + None + } + }) + .collect(); + + let mut write_guard = self.following.write().await; + *write_guard = actor_ids; + Ok(()) + } +} + +#[derive(Clone)] +pub struct Actor { + pub id: XsdAnyUri, + pub public_key: String, + pub public_key_id: XsdAnyUri, + pub inbox: XsdAnyUri, +} diff --git a/src/data/mod.rs b/src/data/mod.rs new file mode 100644 index 0000000..9af1f01 --- /dev/null +++ b/src/data/mod.rs @@ -0,0 +1,9 @@ +mod actor; +mod node; +mod state; + +pub use self::{ + actor::{Actor, ActorCache}, + node::{Node, NodeCache}, + state::State, +}; diff --git a/src/node.rs b/src/data/node.rs similarity index 100% rename from src/node.rs rename to src/data/node.rs diff --git a/src/state.rs b/src/data/state.rs similarity index 96% rename from src/state.rs rename to src/data/state.rs index 2d26478..7297fe4 100644 --- a/src/state.rs +++ b/src/data/state.rs @@ -1,9 +1,8 @@ use crate::{ - apub::AcceptedActors, config::{Config, UrlKind}, + data::NodeCache, db::Db, error::MyError, - node::NodeCache, requests::Requests, }; use activitystreams::primitives::XsdAnyUri; @@ -16,16 +15,12 @@ use rand::thread_rng; use rsa::{RSAPrivateKey, RSAPublicKey}; use std::{collections::HashSet, sync::Arc}; use tokio::sync::RwLock; -use ttl_cache::TtlCache; - -pub type ActorCache = Arc>>; #[derive(Clone)] pub struct State { pub public_key: RSAPublicKey, private_key: RSAPrivateKey, config: Config, - actor_cache: ActorCache, actor_id_cache: Arc>>, blocks: Arc>>, whitelists: Arc>>, @@ -42,7 +37,6 @@ impl State { Requests::new( self.config.generate_url(UrlKind::MainKey), self.private_key.clone(), - self.actor_cache.clone(), format!( "Actix Web 3.0.0-alpha.1 ({}/{}; +{})", self.config.software_name(), @@ -204,7 +198,6 @@ impl State { public_key, private_key, config, - actor_cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))), actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))), blocks: Arc::new(RwLock::new(blocks)), whitelists: Arc::new(RwLock::new(whitelists)), diff --git a/src/db.rs b/src/db.rs index c5e7248..5ce5590 100644 --- a/src/db.rs +++ b/src/db.rs @@ -129,20 +129,17 @@ impl Db { } pub async fn listen(client: &Client) -> Result<(), Error> { - info!("LISTEN new_blocks;"); - info!("LISTEN new_whitelists;"); - info!("LISTEN new_listeners;"); - info!("LISTEN rm_blocks;"); - info!("LISTEN rm_whitelists;"); - info!("LISTEN rm_listeners;"); + info!("LISTEN new_blocks, new_whitelists, new_listeners, new_actors, rm_blocks, rm_whitelists, rm_listeners, rm_actors"); client .batch_execute( "LISTEN new_blocks; LISTEN new_whitelists; LISTEN new_listeners; + LISTEN new_actors; LISTEN rm_blocks; LISTEN rm_whitelists; - LISTEN rm_listeners;", + LISTEN rm_listeners; + LISTEN rm_actors;", ) .await?; diff --git a/src/error.rs b/src/error.rs index bcbaae4..b9aff00 100644 --- a/src/error.rs +++ b/src/error.rs @@ -67,6 +67,9 @@ pub enum MyError { #[error("Too many CPUs, {0}")] CpuCount(#[from] std::num::TryFromIntError), + #[error("Hosts don't match, {0}, {1}")] + HostMismatch(String, String), + #[error("Couldn't flush buffer")] FlushBuffer, diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index ea91551..78f7104 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -9,6 +9,7 @@ pub use self::{ }; use crate::{ + data::{ActorCache, NodeCache, State}, db::Db, error::MyError, jobs::{ @@ -19,9 +20,7 @@ use crate::{ process_listeners::{Listeners, ListenersProcessor}, storage::Storage, }, - node::NodeCache, requests::Requests, - state::State, }; use background_jobs::{memory_storage::Storage as MemoryStorage, Job, QueueHandle, WorkerConfig}; use std::time::Duration; @@ -35,20 +34,21 @@ pub fn create_server(db: Db) -> JobServer { JobServer::new(shared, local) } -pub fn create_workers(state: State, job_server: JobServer) { +pub fn create_workers(state: State, actors: ActorCache, job_server: JobServer) { let state2 = state.clone(); + let actors2 = actors.clone(); let job_server2 = job_server.clone(); let remote_handle = job_server.remote.clone(); let local_handle = job_server.local.clone(); - WorkerConfig::new(move || JobState::new(state.clone(), job_server.clone())) + WorkerConfig::new(move || JobState::new(state.clone(), actors.clone(), job_server.clone())) .register(DeliverProcessor) .register(DeliverManyProcessor) .set_processor_count("default", 4) .start(remote_handle); - WorkerConfig::new(move || JobState::new(state2.clone(), job_server2.clone())) + WorkerConfig::new(move || JobState::new(state2.clone(), actors2.clone(), job_server2.clone())) .register(NodeinfoProcessor) .register(InstanceProcessor) .register(ListenersProcessor) @@ -60,6 +60,7 @@ pub fn create_workers(state: State, job_server: JobServer) { pub struct JobState { requests: Requests, state: State, + actors: ActorCache, node_cache: NodeCache, job_server: JobServer, } @@ -71,10 +72,11 @@ pub struct JobServer { } impl JobState { - fn new(state: State, job_server: JobServer) -> Self { + fn new(state: State, actors: ActorCache, job_server: JobServer) -> Self { JobState { requests: state.requests(), node_cache: state.node_cache(), + actors, state, job_server, } diff --git a/src/main.rs b/src/main.rs index 3b4892a..9883404 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,24 +4,23 @@ use actix_web::{middleware::Logger, web, App, HttpServer}; mod apub; mod args; mod config; +mod data; mod db; mod error; mod jobs; mod middleware; -mod node; mod notify; mod requests; mod routes; -mod state; use self::{ args::Args, config::Config, + data::{ActorCache, State}, db::Db, jobs::{create_server, create_workers}, middleware::RelayResolver, routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics}, - state::State, }; #[actix_rt::main] @@ -64,17 +63,19 @@ async fn main() -> Result<(), anyhow::Error> { } let state = State::hydrate(config.clone(), &db).await?; + let actors = ActorCache::new(db.clone()); let job_server = create_server(db.clone()); - notify::spawn(state.clone(), job_server.clone(), &config)?; + notify::spawn(state.clone(), actors.clone(), job_server.clone(), &config)?; if args.jobs_only() { for _ in 0..num_cpus::get() { let state = state.clone(); + let actors = actors.clone(); let job_server = job_server.clone(); Arbiter::new().exec_fn(move || { - create_workers(state, job_server); + create_workers(state, actors, job_server); }); } actix_rt::signal::ctrl_c().await?; @@ -86,7 +87,7 @@ async fn main() -> Result<(), anyhow::Error> { let bind_address = config.bind_address(); HttpServer::new(move || { if !no_jobs { - create_workers(state.clone(), job_server.clone()); + create_workers(state.clone(), actors.clone(), job_server.clone()); } App::new() @@ -94,13 +95,14 @@ async fn main() -> Result<(), anyhow::Error> { .data(db.clone()) .data(state.clone()) .data(state.requests()) + .data(actors.clone()) .data(config.clone()) .data(job_server.clone()) .service(web::resource("/").route(web::get().to(index))) .service( web::resource("/inbox") .wrap(config.digest_middleware()) - .wrap(config.signature_middleware(state.requests())) + .wrap(config.signature_middleware(state.requests(), actors.clone())) .route(web::post().to(inbox)), ) .service(web::resource("/actor").route(web::get().to(actor))) diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index dc31955..f0a23ad 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -1,13 +1,51 @@ -use crate::{error::MyError, requests::Requests}; +use crate::{data::ActorCache, error::MyError, requests::Requests}; +use activitystreams::primitives::XsdAnyUri; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; -use log::{debug, error, warn}; +use log::{error, warn}; use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey}; use rsa_pem::KeyExt; use sha2::{Digest, Sha256}; use std::{future::Future, pin::Pin}; #[derive(Clone)] -pub struct MyVerify(pub Requests); +pub struct MyVerify(pub Requests, pub ActorCache); + +impl MyVerify { + async fn verify( + &self, + algorithm: Option, + key_id: String, + signature: String, + signing_string: String, + ) -> Result { + let mut uri: XsdAnyUri = key_id.parse()?; + uri.as_url_mut().set_fragment(None); + let actor = self.1.get(&uri, &self.0).await?; + + let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key)?; + + match algorithm { + Some(Algorithm::Hs2019) => (), + Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (), + other => { + warn!("Invalid algorithm supplied for signature, {:?}", other); + return Err(MyError::Algorithm); + } + }; + + let decoded = base64::decode(signature)?; + let hashed = Sha256::digest(signing_string.as_bytes()); + + public_key.verify( + PaddingScheme::PKCS1v15, + Some(&Hashes::SHA2_256), + &hashed, + &decoded, + )?; + + Ok(true) + } +} impl SignatureVerify for MyVerify { type Error = MyError; @@ -24,10 +62,10 @@ impl SignatureVerify for MyVerify { let signature = signature.to_owned(); let signing_string = signing_string.to_owned(); - let client = self.0.clone(); + let this = self.clone(); Box::pin(async move { - verify(client, algorithm, key_id, signature, signing_string) + this.verify(algorithm, key_id, signature, signing_string) .await .map_err(|e| { error!("Failed to verify, {}", e); @@ -36,41 +74,3 @@ impl SignatureVerify for MyVerify { }) } } - -async fn verify( - client: Requests, - algorithm: Option, - key_id: String, - signature: String, - signing_string: String, -) -> Result { - debug!("Fetching actor"); - let actor = client.fetch_actor(&key_id.parse()?).await?; - - debug!("Parsing public key"); - let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key.public_key_pem)?; - - match algorithm { - Some(Algorithm::Hs2019) => (), - Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (), - other => { - warn!("Invalid algorithm supplied for signature, {:?}", other); - return Err(MyError::Algorithm); - } - }; - - debug!("Decoding base64"); - let decoded = base64::decode(signature)?; - debug!("hashing"); - let hashed = Sha256::digest(signing_string.as_bytes()); - - debug!("Verifying signature for {}", key_id); - public_key.verify( - PaddingScheme::PKCS1v15, - Some(&Hashes::SHA2_256), - &hashed, - &decoded, - )?; - - Ok(true) -} diff --git a/src/middleware/webfinger.rs b/src/middleware/webfinger.rs index 703cb71..2b3c117 100644 --- a/src/middleware/webfinger.rs +++ b/src/middleware/webfinger.rs @@ -1,6 +1,6 @@ use crate::{ config::{Config, UrlKind}, - state::State, + data::State, }; use activitystreams::context; use actix_web::web::Data; diff --git a/src/notify.rs b/src/notify.rs index 2e65fc1..95539eb 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -1,8 +1,8 @@ use crate::{ + data::{ActorCache, State}, db::listen, error::MyError, jobs::{JobServer, QueryInstance, QueryNodeinfo}, - state::State, }; use activitystreams::primitives::XsdAnyUri; use actix::clock::{delay_for, Duration}; @@ -14,7 +14,12 @@ use futures::{ use log::{debug, error, info, warn}; use std::sync::Arc; -async fn handle_notification(state: State, job_server: JobServer, notif: Notification) { +async fn handle_notification( + state: State, + actors: ActorCache, + job_server: JobServer, + notif: Notification, +) { match notif.channel() { "new_blocks" => { info!("Caching block of {}", notif.payload()); @@ -32,6 +37,12 @@ async fn handle_notification(state: State, job_server: JobServer, notif: Notific let _ = job_server.queue_local(QueryNodeinfo::new(uri)); } } + "new_actors" => { + if let Ok(uri) = notif.payload().parse::() { + info!("Caching follower {}", uri); + actors.cache_follower(uri).await; + } + } "rm_blocks" => { info!("Busting block cache for {}", notif.payload()); state.bust_block(notif.payload()).await; @@ -46,12 +57,19 @@ async fn handle_notification(state: State, job_server: JobServer, notif: Notific state.bust_listener(&uri).await; } } + "rm_actors" => { + if let Ok(uri) = notif.payload().parse::() { + info!("Busting follower cache for {}", uri); + actors.bust_follower(&uri).await; + } + } _ => (), }; } pub fn spawn( state: State, + actors: ActorCache, job_server: JobServer, config: &crate::config::Config, ) -> Result<(), MyError> { @@ -97,7 +115,12 @@ pub fn spawn( }); while let Some(n) = stream.next().await { - actix::spawn(handle_notification(state.clone(), job_server.clone(), n)); + actix::spawn(handle_notification( + state.clone(), + actors.clone(), + job_server.clone(), + n, + )); } drop(client); diff --git a/src/requests.rs b/src/requests.rs index 88ff91d..569c366 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,4 +1,4 @@ -use crate::{apub::AcceptedActors, error::MyError, state::ActorCache}; +use crate::error::MyError; use activitystreams::primitives::XsdAnyUri; use actix_web::client::Client; use http_signature_normalization_actix::prelude::*; @@ -11,40 +11,21 @@ pub struct Requests { client: Client, key_id: String, private_key: RSAPrivateKey, - actor_cache: ActorCache, config: Config, user_agent: String, } impl Requests { - pub fn new( - key_id: String, - private_key: RSAPrivateKey, - actor_cache: ActorCache, - user_agent: String, - ) -> Self { + pub fn new(key_id: String, private_key: RSAPrivateKey, user_agent: String) -> Self { Requests { client: Client::default(), key_id, private_key, - actor_cache, config: Config::default().dont_use_created_field(), user_agent, } } - pub async fn fetch_actor(&self, actor_id: &XsdAnyUri) -> Result { - if let Some(actor) = self.get_actor(actor_id).await { - return Ok(actor); - } - - let actor: AcceptedActors = self.fetch(actor_id.as_str()).await?; - - self.cache_actor(actor_id.to_owned(), actor.clone()).await; - - Ok(actor) - } - pub async fn fetch(&self, url: &str) -> Result where T: serde::de::DeserializeOwned, @@ -131,18 +112,4 @@ impl Requests { .sign(PaddingScheme::PKCS1v15, Some(&Hashes::SHA2_256), &hashed)?; Ok(base64::encode(bytes)) } - - async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option { - let cache = self.actor_cache.clone(); - - let read_guard = cache.read().await; - read_guard.get(actor_id).cloned() - } - - async fn cache_actor(&self, actor_id: XsdAnyUri, actor: AcceptedActors) { - let cache = self.actor_cache.clone(); - - let mut write_guard = cache.write().await; - write_guard.insert(actor_id, actor, std::time::Duration::from_secs(3600)); - } } diff --git a/src/routes/actor.rs b/src/routes/actor.rs index 67f9d80..dd92458 100644 --- a/src/routes/actor.rs +++ b/src/routes/actor.rs @@ -1,9 +1,9 @@ use crate::{ apub::PublicKey, config::{Config, UrlKind}, + data::State, error::MyError, routes::ok, - state::State, }; use activitystreams::{ actor::Application, context, endpoint::EndpointProperties, ext::Extensible, diff --git a/src/routes/inbox.rs b/src/routes/inbox.rs index 3dacc39..579d3b2 100644 --- a/src/routes/inbox.rs +++ b/src/routes/inbox.rs @@ -1,13 +1,13 @@ use crate::{ - apub::{AcceptedActors, AcceptedObjects, ValidTypes}, + apub::{AcceptedObjects, ValidTypes}, config::{Config, UrlKind}, + data::{Actor, ActorCache, State}, db::Db, error::MyError, jobs::JobServer, jobs::{Deliver, DeliverMany}, requests::Requests, routes::accepted, - state::State, }; use activitystreams::{ activity::{Accept, Announce, Follow, Undo}, @@ -25,6 +25,7 @@ use std::convert::TryInto; pub async fn route( db: web::Data, state: web::Data, + actors: web::Data, config: web::Data, client: web::Data, jobs: web::Data, @@ -34,12 +35,12 @@ pub async fn route( ) -> Result { let input = input.into_inner(); - let actor = client.fetch_actor(&input.actor).await?; + let actor = actors.get(&input.actor, &client).await?; let (is_blocked, is_whitelisted, is_listener) = join!( state.is_blocked(&actor.id), state.is_whitelisted(&actor.id), - state.is_listener(actor.inbox()) + state.is_listener(&actor.inbox) ); if is_blocked { @@ -51,17 +52,17 @@ pub async fn route( } if !is_listener && !valid_without_listener(&input) { - return Err(MyError::NotSubscribed(actor.inbox().to_string())); + return Err(MyError::NotSubscribed(actor.inbox.to_string())); } if config.validate_signatures() && (digest_verified.is_none() || verified.is_none()) { - return Err(MyError::NoSignature(actor.public_key.id.to_string())); + return Err(MyError::NoSignature(actor.public_key_id.to_string())); } else if config.validate_signatures() { if let Some(verified) = verified { - if actor.public_key.id.as_str() != verified.key_id() { + if actor.public_key_id.as_str() != verified.key_id() { error!("Bad actor, more info: {:?}", input); return Err(MyError::BadActor( - actor.public_key.id.to_string(), + actor.public_key_id.to_string(), verified.key_id().to_owned(), )); } @@ -70,16 +71,28 @@ pub async fn route( match input.kind { ValidTypes::Accept => handle_accept(&config, input).await, - ValidTypes::Reject => handle_reject(&db, &config, &jobs, input, actor).await, + ValidTypes::Reject => handle_reject(&db, &actors, &config, &jobs, input, actor).await, ValidTypes::Announce | ValidTypes::Create => { handle_announce(&state, &config, &jobs, input, actor).await } - ValidTypes::Follow => handle_follow(&db, &config, &jobs, input, actor, is_listener).await, + ValidTypes::Follow => { + handle_follow(&db, &actors, &config, &jobs, input, actor, is_listener).await + } ValidTypes::Delete | ValidTypes::Update => { handle_forward(&state, &jobs, input, actor).await } ValidTypes::Undo => { - handle_undo(&db, &state, &config, &jobs, input, actor, is_listener).await + handle_undo( + &db, + &state, + &actors, + &config, + &jobs, + input, + actor, + is_listener, + ) + .await } } } @@ -111,10 +124,11 @@ async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result Result { if !input.object.is_kind("Follow") { return Err(MyError::Kind( @@ -131,13 +145,13 @@ async fn handle_reject( let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; - let inbox = actor.inbox().to_owned(); - db.remove_listener(inbox).await?; + if let Some(_) = actors.unfollower(&actor).await? { + db.remove_listener(actor.inbox.clone()).await?; + } let undo = generate_undo_follow(config, &actor.id, &my_id)?; - let inbox = actor.inbox().to_owned(); - jobs.queue(Deliver::new(inbox, undo.clone())?)?; + jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?; Ok(accepted(undo)) } @@ -145,10 +159,11 @@ async fn handle_reject( async fn handle_undo( db: &Db, state: &State, + actors: &ActorCache, config: &Config, jobs: &JobServer, input: AcceptedObjects, - actor: AcceptedActors, + actor: Actor, is_listener: bool, ) -> Result { match input.object.kind() { @@ -180,22 +195,27 @@ async fn handle_undo( return Ok(accepted(serde_json::json!({}))); } - let inbox = actor.inbox().to_owned(); - db.remove_listener(inbox).await?; + let was_following = actors.is_following(&actor.id).await; - let undo = generate_undo_follow(config, &actor.id, &my_id)?; + if let Some(_) = actors.unfollower(&actor).await? { + db.remove_listener(actor.inbox.clone()).await?; + } - let inbox = actor.inbox().to_owned(); - jobs.queue(Deliver::new(inbox, undo.clone())?)?; + if was_following { + let undo = generate_undo_follow(config, &actor.id, &my_id)?; + jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?; - Ok(accepted(undo)) + return Ok(accepted(undo)); + } + + Ok(accepted(serde_json::json!({}))) } async fn handle_forward( state: &State, jobs: &JobServer, input: AcceptedObjects, - actor: AcceptedActors, + actor: Actor, ) -> Result { let object_id = input.object.id(); @@ -210,7 +230,7 @@ async fn handle_announce( config: &Config, jobs: &JobServer, input: AcceptedObjects, - actor: AcceptedActors, + actor: Actor, ) -> Result { let object_id = input.object.id(); @@ -231,10 +251,11 @@ async fn handle_announce( async fn handle_follow( db: &Db, + actors: &ActorCache, config: &Config, jobs: &JobServer, input: AcceptedObjects, - actor: AcceptedActors, + actor: Actor, is_listener: bool, ) -> Result { let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; @@ -244,21 +265,20 @@ async fn handle_follow( } if !is_listener { - let inbox = actor.inbox().to_owned(); - db.add_listener(inbox).await?; - - // if following relay directly, not just following 'public', followback - if input.object.is(&my_id) { - let follow = generate_follow(config, &actor.id, &my_id)?; - let inbox = actor.inbox().to_owned(); - jobs.queue(Deliver::new(inbox, follow)?)?; - } + db.add_listener(actor.inbox.clone()).await?; } + // if following relay directly, not just following 'public', followback + if input.object.is(&my_id) && !actors.is_following(&actor.id).await { + let follow = generate_follow(config, &actor.id, &my_id)?; + jobs.queue(Deliver::new(actor.inbox.clone(), follow)?)?; + } + + actors.follower(&actor).await?; + let accept = generate_accept_follow(config, &actor.id, &input.id, &my_id)?; - let inbox = actor.inbox().to_owned(); - jobs.queue(Deliver::new(inbox, accept.clone())?)?; + jobs.queue(Deliver::new(actor.inbox, accept.clone())?)?; Ok(accepted(accept)) } @@ -379,7 +399,7 @@ where async fn get_inboxes( state: &State, - actor: &AcceptedActors, + actor: &Actor, object_id: &XsdAnyUri, ) -> Result, MyError> { let domain = object_id @@ -388,7 +408,5 @@ async fn get_inboxes( .ok_or(MyError::Domain)? .to_string(); - let inbox = actor.inbox(); - - Ok(state.listeners_without(&inbox, &domain).await) + Ok(state.listeners_without(&actor.inbox, &domain).await) } diff --git a/src/routes/index.rs b/src/routes/index.rs index 8d6ad43..470d63b 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -1,4 +1,4 @@ -use crate::{config::Config, error::MyError, state::State}; +use crate::{config::Config, data::State, error::MyError}; use actix_web::{web, HttpResponse}; use log::error; use std::io::BufWriter; diff --git a/src/routes/nodeinfo.rs b/src/routes/nodeinfo.rs index c88b39b..d5bfec8 100644 --- a/src/routes/nodeinfo.rs +++ b/src/routes/nodeinfo.rs @@ -1,6 +1,6 @@ use crate::{ config::{Config, UrlKind}, - state::State, + data::State, }; use actix_web::{web, Responder}; use actix_webfinger::Link; diff --git a/src/schema.rs b/src/schema.rs index 39c67ae..c60ab10 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,3 +1,15 @@ +table! { + actors (id) { + id -> Uuid, + actor_id -> Text, + public_key -> Text, + public_key_id -> Text, + listener_id -> Uuid, + created_at -> Timestamp, + updated_at -> Timestamp, + } +} + table! { blocks (id) { id -> Uuid, @@ -31,6 +43,18 @@ table! { } } +table! { + nodes (id) { + id -> Uuid, + listener_id -> Uuid, + nodeinfo -> Nullable, + instance -> Nullable, + contact -> Nullable, + created_at -> Timestamp, + updated_at -> Timestamp, + } +} + table! { settings (id) { id -> Uuid, @@ -50,10 +74,15 @@ table! { } } +joinable!(actors -> listeners (listener_id)); +joinable!(nodes -> listeners (listener_id)); + allow_tables_to_appear_in_same_query!( + actors, blocks, jobs, listeners, + nodes, settings, whitelists, ); diff --git a/templates/index.rs.html b/templates/index.rs.html index fa67175..becfef1 100644 --- a/templates/index.rs.html +++ b/templates/index.rs.html @@ -1,4 +1,4 @@ -@use crate::{config::{Config, UrlKind}, templates::statics::index_css, node::Node}; +@use crate::{config::{Config, UrlKind}, templates::statics::index_css, data::Node}; @(nodes: &[Node], config: &Config)