More shuffling, store actors in db

This commit is contained in:
asonix 2020-03-23 17:17:53 -05:00
parent 9925e41673
commit e4c95a8168
23 changed files with 535 additions and 159 deletions

View file

@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
DROP TRIGGER IF EXISTS actors_notify ON actors;
DROP FUNCTION IF EXISTS invoke_actors_trigger();
DROP TABLE actors;

View file

@ -0,0 +1,49 @@
-- Your SQL goes here
CREATE TABLE actors (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
public_key TEXT NOT NULL,
public_key_id TEXT UNIQUE NOT NULL,
listener_id UUID NOT NULL REFERENCES listeners(id) ON DELETE CASCADE,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
SELECT diesel_manage_updated_at('actors');
CREATE OR REPLACE FUNCTION invoke_actors_trigger ()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
rec RECORD;
channel TEXT;
payload TEXT;
BEGIN
case TG_OP
WHEN 'INSERT' THEN
rec := NEW;
channel := 'new_actors';
payload := NEW.actor_id;
WHEN 'UPDATE' THEN
rec := NEW;
channel := 'new_actors';
payload := NEW.actor_id;
WHEN 'DELETE' THEN
rec := OLD;
channel := 'rm_actors';
payload := OLD.actor_id;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
PERFORM pg_notify(channel, payload::TEXT);
RETURN rec;
END;
$$;
CREATE TRIGGER actors_notify
AFTER INSERT OR UPDATE OR DELETE
ON actors
FOR EACH ROW
EXECUTE PROCEDURE invoke_actors_trigger();

View file

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE nodes;

View file

@ -0,0 +1,12 @@
-- Your SQL goes here
CREATE TABLE nodes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
listener_id UUID NOT NULL REFERENCES listeners(id) ON DELETE CASCADE,
nodeinfo JSONB,
instance JSONB,
contact JSONB,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
SELECT diesel_manage_updated_at('nodes');

View file

@ -1,4 +1,4 @@
use crate::{error::MyError, middleware::MyVerify, requests::Requests};
use crate::{data::ActorCache, error::MyError, middleware::MyVerify, requests::Requests};
use config::Environment;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature};
use sha2::{Digest, Sha256};
@ -71,11 +71,15 @@ impl Config {
}
}
pub fn signature_middleware(&self, requests: Requests) -> VerifySignature<MyVerify> {
pub fn signature_middleware(
&self,
requests: Requests,
actors: ActorCache,
) -> VerifySignature<MyVerify> {
if self.validate_signatures {
VerifySignature::new(MyVerify(requests), Default::default())
VerifySignature::new(MyVerify(requests, actors), Default::default())
} else {
VerifySignature::new(MyVerify(requests), Default::default()).optional()
VerifySignature::new(MyVerify(requests, actors), Default::default()).optional()
}
}

262
src/data/actor.rs Normal file
View file

@ -0,0 +1,262 @@
use crate::{apub::AcceptedActors, db::Db, error::MyError, requests::Requests};
use activitystreams::primitives::XsdAnyUri;
use log::error;
use std::{collections::HashSet, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use ttl_cache::TtlCache;
use uuid::Uuid;
const REFETCH_DURATION: u64 = 60 * 2;
#[derive(Clone)]
pub struct ActorCache {
db: Db,
cache: Arc<RwLock<TtlCache<XsdAnyUri, Actor>>>,
following: Arc<RwLock<HashSet<XsdAnyUri>>>,
}
impl ActorCache {
pub fn new(db: Db) -> Self {
let cache = ActorCache {
db,
cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))),
following: Arc::new(RwLock::new(HashSet::new())),
};
cache.spawn_rehydrate();
cache
}
pub async fn is_following(&self, id: &XsdAnyUri) -> bool {
self.following.read().await.contains(id)
}
pub async fn get(&self, id: &XsdAnyUri, requests: &Requests) -> Result<Actor, MyError> {
if let Some(actor) = self.cache.read().await.get(id) {
return Ok(actor.clone());
}
if let Some(actor) = self.lookup(id).await? {
self.cache.write().await.insert(
id.clone(),
actor.clone(),
Duration::from_secs(REFETCH_DURATION),
);
return Ok(actor);
}
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?;
let actor_host = accepted_actor.id.as_url().host();
let inbox_host = accepted_actor.inbox().as_url().host();
if actor_host != inbox_host {
let actor_host = actor_host.map(|h| h.to_string()).unwrap_or(String::new());
let inbox_host = inbox_host.map(|h| h.to_string()).unwrap_or(String::new());
return Err(MyError::HostMismatch(actor_host, inbox_host));
}
let inbox = accepted_actor.inbox().clone();
let actor = Actor {
id: accepted_actor.id,
public_key: accepted_actor.public_key.public_key_pem,
public_key_id: accepted_actor.public_key.id,
inbox,
};
self.cache.write().await.insert(
id.clone(),
actor.clone(),
Duration::from_secs(REFETCH_DURATION),
);
self.update(id, &actor.public_key, &actor.public_key_id)
.await?;
Ok(actor)
}
pub async fn follower(&self, actor: &Actor) -> Result<(), MyError> {
self.save(actor.clone()).await
}
pub async fn cache_follower(&self, id: XsdAnyUri) {
self.following.write().await.insert(id);
}
pub async fn bust_follower(&self, id: &XsdAnyUri) {
self.following.write().await.remove(id);
}
pub async fn unfollower(&self, actor: &Actor) -> Result<Option<Uuid>, MyError> {
let conn = self.db.pool().get().await?;
let row_opt = conn
.query_opt(
"DELETE FROM actors WHERE actor_id = $1::TEXT RETURNING listener_id;",
&[&actor.id.as_str()],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(None);
};
let listener_id: Uuid = row.try_get(0)?;
let row_opt = conn
.query_opt(
"SELECT FROM actors WHERE listener_id = $1::UUID;",
&[&listener_id],
)
.await?;
if row_opt.is_none() {
return Ok(Some(listener_id));
}
Ok(None)
}
async fn lookup(&self, id: &XsdAnyUri) -> Result<Option<Actor>, MyError> {
let conn = self.db.pool().get().await?;
let row_opt = conn
.query_opt(
"SELECT listeners.actor_id, actors.public_key, actors.public_key_id
FROM listeners
INNER JOIN actors ON actors.listener_id = listeners.id
WHERE
actors.actor_id = $1::TEXT
AND
actors.updated_at + INTERVAL '120 seconds' < NOW()
LIMIT 1;",
&[&id.as_str()],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Ok(None);
};
let inbox: String = row.try_get(0)?;
let public_key_id: String = row.try_get(2)?;
Ok(Some(Actor {
id: id.clone(),
inbox: inbox.parse()?,
public_key: row.try_get(1)?,
public_key_id: public_key_id.parse()?,
}))
}
async fn save(&self, actor: Actor) -> Result<(), MyError> {
let conn = self.db.pool().get().await?;
let row_opt = conn
.query_opt(
"SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;",
&[&actor.inbox.as_str()],
)
.await?;
let row = if let Some(row) = row_opt {
row
} else {
return Err(MyError::NotSubscribed(actor.id.as_str().to_owned()));
};
let listener_id: Uuid = row.try_get(0)?;
conn.execute(
"INSERT INTO actors (actor_id, public_key, public_key_id, listener_id, created_at, updated_at)
VALUES ($1::TEXT, $2::TEXT, $3::TEXT, $4::UUID, 'now', 'now')
ON CONFLICT (actor_id)
DO UPDATE SET public_key = $2::TEXT;",
&[&actor.id.as_str(), &actor.public_key, &actor.public_key_id.as_str(), &listener_id],
)
.await?;
Ok(())
}
async fn update(
&self,
id: &XsdAnyUri,
public_key: &str,
public_key_id: &XsdAnyUri,
) -> Result<(), MyError> {
let conn = self.db.pool().get().await?;
conn.execute(
"UPDATE actors
SET public_key = $2::TEXT, public_key_id = $3::TEXT
WHERE actor_id = $1::TEXT;",
&[&id.as_str(), &public_key, &public_key_id.as_str()],
)
.await?;
Ok(())
}
fn spawn_rehydrate(&self) {
use actix::clock::{interval_at, Instant};
let this = self.clone();
actix::spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(60 * 10));
loop {
if let Err(e) = this.rehydrate().await {
error!("Error rehydrating follows, {}", e);
}
interval.tick().await;
}
});
}
async fn rehydrate(&self) -> Result<(), MyError> {
let conn = self.db.pool().get().await?;
let rows = conn.query("SELECT actor_id FROM actors;", &[]).await?;
let actor_ids = rows
.into_iter()
.filter_map(|row| match row.try_get(0) {
Ok(s) => {
let s: String = s;
match s.parse() {
Ok(s) => Some(s),
Err(e) => {
error!("Error parsing actor id, {}", e);
None
}
}
}
Err(e) => {
error!("Error getting actor id from row, {}", e);
None
}
})
.collect();
let mut write_guard = self.following.write().await;
*write_guard = actor_ids;
Ok(())
}
}
#[derive(Clone)]
pub struct Actor {
pub id: XsdAnyUri,
pub public_key: String,
pub public_key_id: XsdAnyUri,
pub inbox: XsdAnyUri,
}

9
src/data/mod.rs Normal file
View file

@ -0,0 +1,9 @@
mod actor;
mod node;
mod state;
pub use self::{
actor::{Actor, ActorCache},
node::{Node, NodeCache},
state::State,
};

View file

@ -1,9 +1,8 @@
use crate::{
apub::AcceptedActors,
config::{Config, UrlKind},
data::NodeCache,
db::Db,
error::MyError,
node::NodeCache,
requests::Requests,
};
use activitystreams::primitives::XsdAnyUri;
@ -16,16 +15,12 @@ use rand::thread_rng;
use rsa::{RSAPrivateKey, RSAPublicKey};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
use ttl_cache::TtlCache;
pub type ActorCache = Arc<RwLock<TtlCache<XsdAnyUri, AcceptedActors>>>;
#[derive(Clone)]
pub struct State {
pub public_key: RSAPublicKey,
private_key: RSAPrivateKey,
config: Config,
actor_cache: ActorCache,
actor_id_cache: Arc<RwLock<LruCache<XsdAnyUri, XsdAnyUri>>>,
blocks: Arc<RwLock<HashSet<String>>>,
whitelists: Arc<RwLock<HashSet<String>>>,
@ -42,7 +37,6 @@ impl State {
Requests::new(
self.config.generate_url(UrlKind::MainKey),
self.private_key.clone(),
self.actor_cache.clone(),
format!(
"Actix Web 3.0.0-alpha.1 ({}/{}; +{})",
self.config.software_name(),
@ -204,7 +198,6 @@ impl State {
public_key,
private_key,
config,
actor_cache: Arc::new(RwLock::new(TtlCache::new(1024 * 8))),
actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
blocks: Arc::new(RwLock::new(blocks)),
whitelists: Arc::new(RwLock::new(whitelists)),

View file

@ -129,20 +129,17 @@ impl Db {
}
pub async fn listen(client: &Client) -> Result<(), Error> {
info!("LISTEN new_blocks;");
info!("LISTEN new_whitelists;");
info!("LISTEN new_listeners;");
info!("LISTEN rm_blocks;");
info!("LISTEN rm_whitelists;");
info!("LISTEN rm_listeners;");
info!("LISTEN new_blocks, new_whitelists, new_listeners, new_actors, rm_blocks, rm_whitelists, rm_listeners, rm_actors");
client
.batch_execute(
"LISTEN new_blocks;
LISTEN new_whitelists;
LISTEN new_listeners;
LISTEN new_actors;
LISTEN rm_blocks;
LISTEN rm_whitelists;
LISTEN rm_listeners;",
LISTEN rm_listeners;
LISTEN rm_actors;",
)
.await?;

View file

@ -67,6 +67,9 @@ pub enum MyError {
#[error("Too many CPUs, {0}")]
CpuCount(#[from] std::num::TryFromIntError),
#[error("Hosts don't match, {0}, {1}")]
HostMismatch(String, String),
#[error("Couldn't flush buffer")]
FlushBuffer,

View file

@ -9,6 +9,7 @@ pub use self::{
};
use crate::{
data::{ActorCache, NodeCache, State},
db::Db,
error::MyError,
jobs::{
@ -19,9 +20,7 @@ use crate::{
process_listeners::{Listeners, ListenersProcessor},
storage::Storage,
},
node::NodeCache,
requests::Requests,
state::State,
};
use background_jobs::{memory_storage::Storage as MemoryStorage, Job, QueueHandle, WorkerConfig};
use std::time::Duration;
@ -35,20 +34,21 @@ pub fn create_server(db: Db) -> JobServer {
JobServer::new(shared, local)
}
pub fn create_workers(state: State, job_server: JobServer) {
pub fn create_workers(state: State, actors: ActorCache, job_server: JobServer) {
let state2 = state.clone();
let actors2 = actors.clone();
let job_server2 = job_server.clone();
let remote_handle = job_server.remote.clone();
let local_handle = job_server.local.clone();
WorkerConfig::new(move || JobState::new(state.clone(), job_server.clone()))
WorkerConfig::new(move || JobState::new(state.clone(), actors.clone(), job_server.clone()))
.register(DeliverProcessor)
.register(DeliverManyProcessor)
.set_processor_count("default", 4)
.start(remote_handle);
WorkerConfig::new(move || JobState::new(state2.clone(), job_server2.clone()))
WorkerConfig::new(move || JobState::new(state2.clone(), actors2.clone(), job_server2.clone()))
.register(NodeinfoProcessor)
.register(InstanceProcessor)
.register(ListenersProcessor)
@ -60,6 +60,7 @@ pub fn create_workers(state: State, job_server: JobServer) {
pub struct JobState {
requests: Requests,
state: State,
actors: ActorCache,
node_cache: NodeCache,
job_server: JobServer,
}
@ -71,10 +72,11 @@ pub struct JobServer {
}
impl JobState {
fn new(state: State, job_server: JobServer) -> Self {
fn new(state: State, actors: ActorCache, job_server: JobServer) -> Self {
JobState {
requests: state.requests(),
node_cache: state.node_cache(),
actors,
state,
job_server,
}

View file

@ -4,24 +4,23 @@ use actix_web::{middleware::Logger, web, App, HttpServer};
mod apub;
mod args;
mod config;
mod data;
mod db;
mod error;
mod jobs;
mod middleware;
mod node;
mod notify;
mod requests;
mod routes;
mod state;
use self::{
args::Args,
config::Config,
data::{ActorCache, State},
db::Db,
jobs::{create_server, create_workers},
middleware::RelayResolver,
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics},
state::State,
};
#[actix_rt::main]
@ -64,17 +63,19 @@ async fn main() -> Result<(), anyhow::Error> {
}
let state = State::hydrate(config.clone(), &db).await?;
let actors = ActorCache::new(db.clone());
let job_server = create_server(db.clone());
notify::spawn(state.clone(), job_server.clone(), &config)?;
notify::spawn(state.clone(), actors.clone(), job_server.clone(), &config)?;
if args.jobs_only() {
for _ in 0..num_cpus::get() {
let state = state.clone();
let actors = actors.clone();
let job_server = job_server.clone();
Arbiter::new().exec_fn(move || {
create_workers(state, job_server);
create_workers(state, actors, job_server);
});
}
actix_rt::signal::ctrl_c().await?;
@ -86,7 +87,7 @@ async fn main() -> Result<(), anyhow::Error> {
let bind_address = config.bind_address();
HttpServer::new(move || {
if !no_jobs {
create_workers(state.clone(), job_server.clone());
create_workers(state.clone(), actors.clone(), job_server.clone());
}
App::new()
@ -94,13 +95,14 @@ async fn main() -> Result<(), anyhow::Error> {
.data(db.clone())
.data(state.clone())
.data(state.requests())
.data(actors.clone())
.data(config.clone())
.data(job_server.clone())
.service(web::resource("/").route(web::get().to(index)))
.service(
web::resource("/inbox")
.wrap(config.digest_middleware())
.wrap(config.signature_middleware(state.requests()))
.wrap(config.signature_middleware(state.requests(), actors.clone()))
.route(web::post().to(inbox)),
)
.service(web::resource("/actor").route(web::get().to(actor)))

View file

@ -1,13 +1,51 @@
use crate::{error::MyError, requests::Requests};
use crate::{data::ActorCache, error::MyError, requests::Requests};
use activitystreams::primitives::XsdAnyUri;
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use log::{debug, error, warn};
use log::{error, warn};
use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey};
use rsa_pem::KeyExt;
use sha2::{Digest, Sha256};
use std::{future::Future, pin::Pin};
#[derive(Clone)]
pub struct MyVerify(pub Requests);
pub struct MyVerify(pub Requests, pub ActorCache);
impl MyVerify {
async fn verify(
&self,
algorithm: Option<Algorithm>,
key_id: String,
signature: String,
signing_string: String,
) -> Result<bool, MyError> {
let mut uri: XsdAnyUri = key_id.parse()?;
uri.as_url_mut().set_fragment(None);
let actor = self.1.get(&uri, &self.0).await?;
let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key)?;
match algorithm {
Some(Algorithm::Hs2019) => (),
Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (),
other => {
warn!("Invalid algorithm supplied for signature, {:?}", other);
return Err(MyError::Algorithm);
}
};
let decoded = base64::decode(signature)?;
let hashed = Sha256::digest(signing_string.as_bytes());
public_key.verify(
PaddingScheme::PKCS1v15,
Some(&Hashes::SHA2_256),
&hashed,
&decoded,
)?;
Ok(true)
}
}
impl SignatureVerify for MyVerify {
type Error = MyError;
@ -24,10 +62,10 @@ impl SignatureVerify for MyVerify {
let signature = signature.to_owned();
let signing_string = signing_string.to_owned();
let client = self.0.clone();
let this = self.clone();
Box::pin(async move {
verify(client, algorithm, key_id, signature, signing_string)
this.verify(algorithm, key_id, signature, signing_string)
.await
.map_err(|e| {
error!("Failed to verify, {}", e);
@ -36,41 +74,3 @@ impl SignatureVerify for MyVerify {
})
}
}
async fn verify(
client: Requests,
algorithm: Option<Algorithm>,
key_id: String,
signature: String,
signing_string: String,
) -> Result<bool, MyError> {
debug!("Fetching actor");
let actor = client.fetch_actor(&key_id.parse()?).await?;
debug!("Parsing public key");
let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key.public_key_pem)?;
match algorithm {
Some(Algorithm::Hs2019) => (),
Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (),
other => {
warn!("Invalid algorithm supplied for signature, {:?}", other);
return Err(MyError::Algorithm);
}
};
debug!("Decoding base64");
let decoded = base64::decode(signature)?;
debug!("hashing");
let hashed = Sha256::digest(signing_string.as_bytes());
debug!("Verifying signature for {}", key_id);
public_key.verify(
PaddingScheme::PKCS1v15,
Some(&Hashes::SHA2_256),
&hashed,
&decoded,
)?;
Ok(true)
}

View file

@ -1,6 +1,6 @@
use crate::{
config::{Config, UrlKind},
state::State,
data::State,
};
use activitystreams::context;
use actix_web::web::Data;

View file

@ -1,8 +1,8 @@
use crate::{
data::{ActorCache, State},
db::listen,
error::MyError,
jobs::{JobServer, QueryInstance, QueryNodeinfo},
state::State,
};
use activitystreams::primitives::XsdAnyUri;
use actix::clock::{delay_for, Duration};
@ -14,7 +14,12 @@ use futures::{
use log::{debug, error, info, warn};
use std::sync::Arc;
async fn handle_notification(state: State, job_server: JobServer, notif: Notification) {
async fn handle_notification(
state: State,
actors: ActorCache,
job_server: JobServer,
notif: Notification,
) {
match notif.channel() {
"new_blocks" => {
info!("Caching block of {}", notif.payload());
@ -32,6 +37,12 @@ async fn handle_notification(state: State, job_server: JobServer, notif: Notific
let _ = job_server.queue_local(QueryNodeinfo::new(uri));
}
}
"new_actors" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Caching follower {}", uri);
actors.cache_follower(uri).await;
}
}
"rm_blocks" => {
info!("Busting block cache for {}", notif.payload());
state.bust_block(notif.payload()).await;
@ -46,12 +57,19 @@ async fn handle_notification(state: State, job_server: JobServer, notif: Notific
state.bust_listener(&uri).await;
}
}
"rm_actors" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Busting follower cache for {}", uri);
actors.bust_follower(&uri).await;
}
}
_ => (),
};
}
pub fn spawn(
state: State,
actors: ActorCache,
job_server: JobServer,
config: &crate::config::Config,
) -> Result<(), MyError> {
@ -97,7 +115,12 @@ pub fn spawn(
});
while let Some(n) = stream.next().await {
actix::spawn(handle_notification(state.clone(), job_server.clone(), n));
actix::spawn(handle_notification(
state.clone(),
actors.clone(),
job_server.clone(),
n,
));
}
drop(client);

View file

@ -1,4 +1,4 @@
use crate::{apub::AcceptedActors, error::MyError, state::ActorCache};
use crate::error::MyError;
use activitystreams::primitives::XsdAnyUri;
use actix_web::client::Client;
use http_signature_normalization_actix::prelude::*;
@ -11,40 +11,21 @@ pub struct Requests {
client: Client,
key_id: String,
private_key: RSAPrivateKey,
actor_cache: ActorCache,
config: Config,
user_agent: String,
}
impl Requests {
pub fn new(
key_id: String,
private_key: RSAPrivateKey,
actor_cache: ActorCache,
user_agent: String,
) -> Self {
pub fn new(key_id: String, private_key: RSAPrivateKey, user_agent: String) -> Self {
Requests {
client: Client::default(),
key_id,
private_key,
actor_cache,
config: Config::default().dont_use_created_field(),
user_agent,
}
}
pub async fn fetch_actor(&self, actor_id: &XsdAnyUri) -> Result<AcceptedActors, MyError> {
if let Some(actor) = self.get_actor(actor_id).await {
return Ok(actor);
}
let actor: AcceptedActors = self.fetch(actor_id.as_str()).await?;
self.cache_actor(actor_id.to_owned(), actor.clone()).await;
Ok(actor)
}
pub async fn fetch<T>(&self, url: &str) -> Result<T, MyError>
where
T: serde::de::DeserializeOwned,
@ -131,18 +112,4 @@ impl Requests {
.sign(PaddingScheme::PKCS1v15, Some(&Hashes::SHA2_256), &hashed)?;
Ok(base64::encode(bytes))
}
async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option<AcceptedActors> {
let cache = self.actor_cache.clone();
let read_guard = cache.read().await;
read_guard.get(actor_id).cloned()
}
async fn cache_actor(&self, actor_id: XsdAnyUri, actor: AcceptedActors) {
let cache = self.actor_cache.clone();
let mut write_guard = cache.write().await;
write_guard.insert(actor_id, actor, std::time::Duration::from_secs(3600));
}
}

View file

@ -1,9 +1,9 @@
use crate::{
apub::PublicKey,
config::{Config, UrlKind},
data::State,
error::MyError,
routes::ok,
state::State,
};
use activitystreams::{
actor::Application, context, endpoint::EndpointProperties, ext::Extensible,

View file

@ -1,13 +1,13 @@
use crate::{
apub::{AcceptedActors, AcceptedObjects, ValidTypes},
apub::{AcceptedObjects, ValidTypes},
config::{Config, UrlKind},
data::{Actor, ActorCache, State},
db::Db,
error::MyError,
jobs::JobServer,
jobs::{Deliver, DeliverMany},
requests::Requests,
routes::accepted,
state::State,
};
use activitystreams::{
activity::{Accept, Announce, Follow, Undo},
@ -25,6 +25,7 @@ use std::convert::TryInto;
pub async fn route(
db: web::Data<Db>,
state: web::Data<State>,
actors: web::Data<ActorCache>,
config: web::Data<Config>,
client: web::Data<Requests>,
jobs: web::Data<JobServer>,
@ -34,12 +35,12 @@ pub async fn route(
) -> Result<HttpResponse, MyError> {
let input = input.into_inner();
let actor = client.fetch_actor(&input.actor).await?;
let actor = actors.get(&input.actor, &client).await?;
let (is_blocked, is_whitelisted, is_listener) = join!(
state.is_blocked(&actor.id),
state.is_whitelisted(&actor.id),
state.is_listener(actor.inbox())
state.is_listener(&actor.inbox)
);
if is_blocked {
@ -51,17 +52,17 @@ pub async fn route(
}
if !is_listener && !valid_without_listener(&input) {
return Err(MyError::NotSubscribed(actor.inbox().to_string()));
return Err(MyError::NotSubscribed(actor.inbox.to_string()));
}
if config.validate_signatures() && (digest_verified.is_none() || verified.is_none()) {
return Err(MyError::NoSignature(actor.public_key.id.to_string()));
return Err(MyError::NoSignature(actor.public_key_id.to_string()));
} else if config.validate_signatures() {
if let Some(verified) = verified {
if actor.public_key.id.as_str() != verified.key_id() {
if actor.public_key_id.as_str() != verified.key_id() {
error!("Bad actor, more info: {:?}", input);
return Err(MyError::BadActor(
actor.public_key.id.to_string(),
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
));
}
@ -70,16 +71,28 @@ pub async fn route(
match input.kind {
ValidTypes::Accept => handle_accept(&config, input).await,
ValidTypes::Reject => handle_reject(&db, &config, &jobs, input, actor).await,
ValidTypes::Reject => handle_reject(&db, &actors, &config, &jobs, input, actor).await,
ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &config, &jobs, input, actor).await
}
ValidTypes::Follow => handle_follow(&db, &config, &jobs, input, actor, is_listener).await,
ValidTypes::Follow => {
handle_follow(&db, &actors, &config, &jobs, input, actor, is_listener).await
}
ValidTypes::Delete | ValidTypes::Update => {
handle_forward(&state, &jobs, input, actor).await
}
ValidTypes::Undo => {
handle_undo(&db, &state, &config, &jobs, input, actor, is_listener).await
handle_undo(
&db,
&state,
&actors,
&config,
&jobs,
input,
actor,
is_listener,
)
.await
}
}
}
@ -111,10 +124,11 @@ async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result<HttpRe
async fn handle_reject(
db: &Db,
actors: &ActorCache,
config: &Config,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
actor: Actor,
) -> Result<HttpResponse, MyError> {
if !input.object.is_kind("Follow") {
return Err(MyError::Kind(
@ -131,13 +145,13 @@ async fn handle_reject(
let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?;
let inbox = actor.inbox().to_owned();
db.remove_listener(inbox).await?;
if let Some(_) = actors.unfollower(&actor).await? {
db.remove_listener(actor.inbox.clone()).await?;
}
let undo = generate_undo_follow(config, &actor.id, &my_id)?;
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, undo.clone())?)?;
jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?;
Ok(accepted(undo))
}
@ -145,10 +159,11 @@ async fn handle_reject(
async fn handle_undo(
db: &Db,
state: &State,
actors: &ActorCache,
config: &Config,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
actor: Actor,
is_listener: bool,
) -> Result<HttpResponse, MyError> {
match input.object.kind() {
@ -180,22 +195,27 @@ async fn handle_undo(
return Ok(accepted(serde_json::json!({})));
}
let inbox = actor.inbox().to_owned();
db.remove_listener(inbox).await?;
let was_following = actors.is_following(&actor.id).await;
if let Some(_) = actors.unfollower(&actor).await? {
db.remove_listener(actor.inbox.clone()).await?;
}
if was_following {
let undo = generate_undo_follow(config, &actor.id, &my_id)?;
jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?;
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, undo.clone())?)?;
return Ok(accepted(undo));
}
Ok(accepted(undo))
Ok(accepted(serde_json::json!({})))
}
async fn handle_forward(
state: &State,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
actor: Actor,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
@ -210,7 +230,7 @@ async fn handle_announce(
config: &Config,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
actor: Actor,
) -> Result<HttpResponse, MyError> {
let object_id = input.object.id();
@ -231,10 +251,11 @@ async fn handle_announce(
async fn handle_follow(
db: &Db,
actors: &ActorCache,
config: &Config,
jobs: &JobServer,
input: AcceptedObjects,
actor: AcceptedActors,
actor: Actor,
is_listener: bool,
) -> Result<HttpResponse, MyError> {
let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?;
@ -244,21 +265,20 @@ async fn handle_follow(
}
if !is_listener {
let inbox = actor.inbox().to_owned();
db.add_listener(inbox).await?;
db.add_listener(actor.inbox.clone()).await?;
}
// if following relay directly, not just following 'public', followback
if input.object.is(&my_id) {
if input.object.is(&my_id) && !actors.is_following(&actor.id).await {
let follow = generate_follow(config, &actor.id, &my_id)?;
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, follow)?)?;
}
jobs.queue(Deliver::new(actor.inbox.clone(), follow)?)?;
}
actors.follower(&actor).await?;
let accept = generate_accept_follow(config, &actor.id, &input.id, &my_id)?;
let inbox = actor.inbox().to_owned();
jobs.queue(Deliver::new(inbox, accept.clone())?)?;
jobs.queue(Deliver::new(actor.inbox, accept.clone())?)?;
Ok(accepted(accept))
}
@ -379,7 +399,7 @@ where
async fn get_inboxes(
state: &State,
actor: &AcceptedActors,
actor: &Actor,
object_id: &XsdAnyUri,
) -> Result<Vec<XsdAnyUri>, MyError> {
let domain = object_id
@ -388,7 +408,5 @@ async fn get_inboxes(
.ok_or(MyError::Domain)?
.to_string();
let inbox = actor.inbox();
Ok(state.listeners_without(&inbox, &domain).await)
Ok(state.listeners_without(&actor.inbox, &domain).await)
}

View file

@ -1,4 +1,4 @@
use crate::{config::Config, error::MyError, state::State};
use crate::{config::Config, data::State, error::MyError};
use actix_web::{web, HttpResponse};
use log::error;
use std::io::BufWriter;

View file

@ -1,6 +1,6 @@
use crate::{
config::{Config, UrlKind},
state::State,
data::State,
};
use actix_web::{web, Responder};
use actix_webfinger::Link;

View file

@ -1,3 +1,15 @@
table! {
actors (id) {
id -> Uuid,
actor_id -> Text,
public_key -> Text,
public_key_id -> Text,
listener_id -> Uuid,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
blocks (id) {
id -> Uuid,
@ -31,6 +43,18 @@ table! {
}
}
table! {
nodes (id) {
id -> Uuid,
listener_id -> Uuid,
nodeinfo -> Nullable<Jsonb>,
instance -> Nullable<Jsonb>,
contact -> Nullable<Jsonb>,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
settings (id) {
id -> Uuid,
@ -50,10 +74,15 @@ table! {
}
}
joinable!(actors -> listeners (listener_id));
joinable!(nodes -> listeners (listener_id));
allow_tables_to_appear_in_same_query!(
actors,
blocks,
jobs,
listeners,
nodes,
settings,
whitelists,
);

View file

@ -1,4 +1,4 @@
@use crate::{config::{Config, UrlKind}, templates::statics::index_css, node::Node};
@use crate::{config::{Config, UrlKind}, templates::statics::index_css, data::Node};
@(nodes: &[Node], config: &Config)