whatamidoing

asonix 2020-03-14 21:05:40 -05:00
commit b7369e2cc0
22 changed files with 2913 additions and 0 deletions

1
.env Normal file

@@ -0,0 +1 @@
DATABASE_URL=postgres://ap_actix:ap_actix@localhost:5432/ap_actix

1
.gitignore vendored Normal file

@@ -0,0 +1 @@
/target

2187
Cargo.lock generated Normal file

File diff suppressed because it is too large

23
Cargo.toml Normal file

@@ -0,0 +1,23 @@
[package]
name = "ap-actix"
version = "0.1.0"
authors = ["asonix <asonix@asonix.dog>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0"
actix = "0.10.0-alpha.2"
actix-web = { version = "3.0.0-alpha.1", features = ["openssl"] }
actix-rt = "1.0.0"
activitystreams = "0.5.0-alpha.5"
bb8-postgres = "0.4.0"
dotenv = "0.15.0"
futures = "0.3.4"
log = "0.4"
pretty_env_logger = "0.4.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "0.2.13", features = ["sync"] }

5
diesel.toml Normal file

@@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/schema.rs"

0
migrations/.gitkeep Normal file

6
migrations/00000000000000_diesel_initial_setup/down.sql Normal file

@@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();

36
migrations/00000000000000_diesel_initial_setup/up.sql Normal file

@@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;


@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP INDEX listeners_actor_id_index;
DROP TABLE listeners;


@@ -0,0 +1,11 @@
-- Your SQL goes here
CREATE TABLE listeners (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX listeners_actor_id_index ON listeners(actor_id);
SELECT diesel_manage_updated_at('listeners');


@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP INDEX blocks_actor_id_index;
DROP TABLE blocks;


@@ -0,0 +1,11 @@
-- Your SQL goes here
CREATE TABLE blocks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX blocks_actor_id_index ON blocks(actor_id);
SELECT diesel_manage_updated_at('blocks');


@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP INDEX whitelists_actor_id_index;
DROP TABLE whitelists;


@@ -0,0 +1,11 @@
-- Your SQL goes here
CREATE TABLE whitelists (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
actor_id TEXT UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX whitelists_actor_id_index ON whitelists(actor_id);
SELECT diesel_manage_updated_at('whitelists');

86
src/apub.rs Normal file

@@ -0,0 +1,86 @@
use activitystreams::{
object::{Object, ObjectBox},
primitives::XsdAnyUri,
PropRefs,
};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PropRefs)]
#[serde(rename_all = "camelCase")]
#[prop_refs(Object)]
pub struct AnyExistingObject {
pub id: XsdAnyUri,
#[serde(rename = "type")]
pub kind: String,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum ValidTypes {
Announce,
Create,
Delete,
Follow,
Undo,
}
impl Default for ValidTypes {
fn default() -> Self {
ValidTypes::Create
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
#[serde(rename_all = "camelCase")]
pub enum ValidObjects {
Id(XsdAnyUri),
Object(AnyExistingObject),
}
impl Default for ValidObjects {
fn default() -> Self {
ValidObjects::Id(Default::default())
}
}
#[derive(Clone, Default, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AcceptedObjects {
pub id: XsdAnyUri,
#[serde(rename = "type")]
pub kind: ValidTypes,
pub actor: XsdAnyUri,
pub object: ValidObjects,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AcceptedActors {
pub id: XsdAnyUri,
#[serde(rename = "type")]
pub kind: String,
pub inbox: XsdAnyUri,
pub endpoints: Endpoints,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Endpoints {
shared_inbox: Option<XsdAnyUri>,
}
impl ValidObjects {
pub fn id(&self) -> &XsdAnyUri {
match self {
ValidObjects::Id(ref id) => id,
ValidObjects::Object(ref obj) => &obj.id,
}
}
}
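
The structs above define the subset of ActivityPub this relay accepts on the wire. As a rough usage sketch (not part of this commit; the URLs and function name are made up, and it assumes the code sits in this crate), a Follow whose object is a bare IRI deserializes into AcceptedObjects like so:

use crate::apub::AcceptedObjects;

// Hypothetical example: parse an incoming Follow activity into AcceptedObjects.
fn parse_follow_example() -> Result<(), serde_json::Error> {
    let json = r#"{
        "id": "https://masto.example/activities/1",
        "type": "Follow",
        "actor": "https://masto.example/users/admin",
        "object": "https://relay.example/actor"
    }"#;

    let follow: AcceptedObjects = serde_json::from_str(json)?;
    // `object` is a bare IRI here, so the untagged ValidObjects enum picks the Id variant.
    println!("{} wants to follow {}", follow.actor, follow.object.id());
    Ok(())
}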

100
src/cache.rs Normal file

@@ -0,0 +1,100 @@
use std::collections::{BTreeMap, HashMap, LinkedList};
pub struct WeightedCache<K, V>
where
K: std::hash::Hash + Eq + Clone,
{
size: usize,
capacity: usize,
forward: HashMap<K, (V, Count)>,
backward: BTreeMap<Count, LinkedList<K>>,
}
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Count(usize);
impl<K, V> WeightedCache<K, V>
where
K: std::hash::Hash + Eq + Clone,
{
/// Create a new Weighted Cache
///
/// panics if capacity is 0
pub fn new(capacity: usize) -> Self {
if capacity == 0 {
panic!("Cache Capacity must be > 0");
}
WeightedCache {
size: 0,
capacity,
forward: HashMap::new(),
backward: BTreeMap::new(),
}
}
/// Gets a value from the weighted cache
pub fn get(&mut self, key: K) -> Option<&V> {
let (value, count) = self.forward.get_mut(&key)?;
if let Some(v) = self.backward.get_mut(count) {
v.drain_filter(|item| item == &key);
}
count.0 += 1;
let entry = self.backward.entry(*count).or_insert(LinkedList::new());
entry.push_back(key);
Some(&*value)
}
/// set a value in the weighted cache
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
if self.forward.contains_key(&key) {
return None;
}
let ret = if self.size >= self.capacity {
self.remove_least()
} else {
None
};
let count = Count(1);
self.forward.insert(key.clone(), (value, count));
let entry = self.backward.entry(count).or_insert(LinkedList::new());
entry.push_back(key);
self.size += 1;
ret
}
fn remove_least(&mut self) -> Option<V> {
let items = self.backward.values_mut().next()?;
let oldest = items.pop_front()?;
let length = items.len();
drop(items);
let (item, count) = self.forward.remove(&oldest)?;
if length == 0 {
self.backward.remove(&count);
self.backward = self
.backward
.clone()
.into_iter()
.map(|(mut k, v)| {
k.0 -= count.0;
(k, v)
})
.collect();
}
self.size -= 1;
Some(item)
}
}
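
WeightedCache keeps a hit count per key and evicts the entry with the lowest count once it is full; when the least-used bucket empties, the remaining counts are shifted back down. A minimal sketch of that behaviour (illustrative only, not part of this commit):

use crate::cache::WeightedCache;

// Illustrative usage of WeightedCache's insert/get/eviction behaviour.
fn weighted_cache_example() {
    let mut cache: WeightedCache<&str, u32> = WeightedCache::new(2);
    cache.insert("a", 1);
    cache.insert("b", 2);
    assert_eq!(cache.get("a"), Some(&1)); // bumps "a", leaving "b" least-used
    // At capacity, the next insert evicts the least-used entry and returns its value.
    assert_eq!(cache.insert("c", 3), Some(2));
    assert!(cache.get("b").is_none());
}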

115
src/db_actor.rs Normal file

@@ -0,0 +1,115 @@
use crate::label::ArbiterLabel;
use actix::prelude::*;
use bb8_postgres::{bb8, tokio_postgres, PostgresConnectionManager};
use log::{error, info};
use tokio::sync::oneshot::{channel, Receiver};
pub type Pool = bb8::Pool<PostgresConnectionManager<tokio_postgres::tls::NoTls>>;
pub enum DbActorState {
Waiting(tokio_postgres::Config),
Ready(Pool),
}
pub struct DbActor {
pool: DbActorState,
}
pub struct DbQuery<F>(pub F);
impl DbActor {
pub fn new(config: tokio_postgres::Config) -> Addr<Self> {
Supervisor::start(|_| DbActor {
pool: DbActorState::new_empty(config),
})
}
}
impl DbActorState {
pub fn new_empty(config: tokio_postgres::Config) -> Self {
DbActorState::Waiting(config)
}
pub async fn new(config: tokio_postgres::Config) -> Result<Self, tokio_postgres::error::Error> {
let manager = PostgresConnectionManager::new(config, tokio_postgres::tls::NoTls);
let pool = bb8::Pool::builder().max_size(8).build(manager).await?;
Ok(DbActorState::Ready(pool))
}
}
impl Actor for DbActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Starting DB Actor in {}", ArbiterLabel::get());
match self.pool {
DbActorState::Waiting(ref config) => {
let fut =
DbActorState::new(config.clone())
.into_actor(self)
.map(|res, actor, ctx| {
match res {
Ok(pool) => {
info!("DB pool created in {}", ArbiterLabel::get());
actor.pool = pool;
}
Err(e) => {
error!(
"Error starting DB Actor in {}, {}",
ArbiterLabel::get(),
e
);
ctx.stop();
}
};
});
ctx.wait(fut);
}
_ => (),
};
}
}
impl Supervised for DbActor {}
impl<F, Fut, R> Handler<DbQuery<F>> for DbActor
where
F: Fn(Pool) -> Fut + 'static,
Fut: Future<Output = R>,
R: Send + 'static,
{
type Result = ResponseFuture<Receiver<R>>;
fn handle(&mut self, msg: DbQuery<F>, ctx: &mut Self::Context) -> Self::Result {
let (tx, rx) = channel();
let pool = match self.pool {
DbActorState::Ready(ref pool) => pool.clone(),
_ => {
error!("Tried to query DB before ready");
return Box::pin(async move { rx });
}
};
ctx.spawn(
async move {
let result = (msg.0)(pool).await;
let _ = tx.send(result);
}
.into_actor(self),
);
Box::pin(async move { rx })
}
}
impl<F, Fut, R> Message for DbQuery<F>
where
F: Fn(Pool) -> Fut,
Fut: Future<Output = R>,
R: Send + 'static,
{
type Result = Receiver<R>;
}
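
Because the handler answers with a oneshot Receiver rather than the query result itself, callers go through two awaits: one for actix to deliver the message and one for the spawned query to finish; main.rs below does exactly this with State::hydrate. A hypothetical sketch of the pattern (the helper name and the count query are illustrative, assuming the listeners table from the migrations above):

use crate::db_actor::{DbActor, DbQuery, Pool};

// Hypothetical helper: count listener rows by sending a query to the DbActor.
async fn count_listeners(db: &actix::Addr<DbActor>) -> Result<i64, anyhow::Error> {
    let receiver = db
        .send(DbQuery(|pool: Pool| async move {
            let conn = pool.get().await?;
            let rows = conn.query("SELECT COUNT(1) FROM listeners", &[]).await?;
            Ok::<i64, anyhow::Error>(rows[0].get(0))
        }))
        .await?; // first await: the message reached the actor

    Ok(receiver.await??) // second await: the spawned query sent its result back
}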

53
src/inbox.rs Normal file

@@ -0,0 +1,53 @@
use activitystreams::primitives::XsdAnyUri;
use actix::Addr;
use actix_web::{client::Client, web, Responder};
use log::info;
use std::sync::Arc;
use crate::{
apub::{AcceptedActors, AcceptedObjects, ValidTypes},
db_actor::DbActor,
state::State,
};
#[derive(Clone, Debug, thiserror::Error)]
#[error("Something went wrong :(")]
pub struct MyError;
pub async fn inbox(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
client: web::Data<Client>,
input: web::Json<AcceptedObjects>,
) -> Result<impl Responder, MyError> {
let _state = state.into_inner();
let input = input.into_inner();
info!("Relaying {} for {}", input.object.id(), input.actor);
let actor = fetch_actor(client.into_inner(), &input.actor).await?;
info!("Actor, {:#?}", actor);
match input.kind {
ValidTypes::Announce => (),
ValidTypes::Create => (),
ValidTypes::Delete => (),
ValidTypes::Follow => (),
ValidTypes::Undo => (),
}
Ok("{}")
}
async fn fetch_actor(client: Arc<Client>, actor_id: &XsdAnyUri) -> Result<AcceptedActors, MyError> {
client
.get(actor_id.as_ref())
.header("Accept", "application/activity+json")
.send()
.await
.map_err(|_| MyError)?
.json()
.await
.map_err(|_| MyError)
}
impl actix_web::error::ResponseError for MyError {}

33
src/label.rs Normal file

@@ -0,0 +1,33 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[derive(Clone, Debug)]
pub struct ArbiterLabelFactory(Arc<AtomicUsize>);
#[derive(Clone, Debug)]
pub struct ArbiterLabel(usize);
impl ArbiterLabelFactory {
pub fn new() -> Self {
ArbiterLabelFactory(Arc::new(AtomicUsize::new(0)))
}
pub fn set_label(&self) {
let id = self.0.fetch_add(1, Ordering::SeqCst);
actix::Arbiter::set_item(ArbiterLabel(id));
}
}
impl ArbiterLabel {
pub fn get() -> ArbiterLabel {
actix::Arbiter::get_item(|label: &ArbiterLabel| label.clone())
}
}
impl std::fmt::Display for ArbiterLabel {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Arbiter #{}", self.0)
}
}

51
src/main.rs Normal file

@@ -0,0 +1,51 @@
#![feature(drain_filter)]
use actix_web::{client::Client, web, App, HttpServer, Responder};
use bb8_postgres::tokio_postgres;
mod apub;
mod cache;
mod db_actor;
mod inbox;
mod label;
mod state;
use self::{db_actor::DbActor, label::ArbiterLabelFactory, state::State};
async fn index() -> impl Responder {
"hewwo, mr obama"
}
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok();
std::env::set_var("RUST_LOG", "info");
pretty_env_logger::init();
let pg_config: tokio_postgres::Config = std::env::var("DATABASE_URL")?.parse()?;
let arbiter_labeler = ArbiterLabelFactory::new();
let db_actor = DbActor::new(pg_config.clone());
arbiter_labeler.clone().set_label();
let state: State = db_actor
.send(db_actor::DbQuery(State::hydrate))
.await?
.await??;
HttpServer::new(move || {
let actor = DbActor::new(pg_config.clone());
arbiter_labeler.clone().set_label();
let client = Client::default();
App::new()
.data(actor)
.data(state.clone())
.data(client)
.service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/inbox").route(web::post().to(inbox::inbox)))
})
.bind("127.0.0.1:8080")?
.run()
.await?;
Ok(())
}

32
src/schema.rs Normal file

@@ -0,0 +1,32 @@
table! {
blocks (id) {
id -> Uuid,
actor_id -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
listeners (id) {
id -> Uuid,
actor_id -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
table! {
whitelists (id) {
id -> Uuid,
actor_id -> Text,
created_at -> Timestamp,
updated_at -> Timestamp,
}
}
allow_tables_to_appear_in_same_query!(
blocks,
listeners,
whitelists,
);

142
src/state.rs Normal file

@@ -0,0 +1,142 @@
use activitystreams::primitives::XsdAnyUri;
use anyhow::Error;
use bb8_postgres::tokio_postgres::{row::Row, Client};
use futures::try_join;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::{Mutex, RwLock};
use crate::{cache::WeightedCache, db_actor::Pool};
#[derive(Clone)]
pub struct State {
cache: Arc<Mutex<WeightedCache<XsdAnyUri, XsdAnyUri>>>,
blocks: Arc<RwLock<HashSet<XsdAnyUri>>>,
whitelists: Arc<RwLock<HashSet<XsdAnyUri>>>,
listeners: Arc<RwLock<HashSet<XsdAnyUri>>>,
}
impl State {
pub async fn is_cached(&self, object_id: XsdAnyUri) -> bool {
let cache = self.cache.clone();
let mut lock = cache.lock().await;
lock.get(object_id).is_some()
}
pub async fn cache(&self, object_id: XsdAnyUri, actor_id: XsdAnyUri) {
let cache = self.cache.clone();
let mut lock = cache.lock().await;
lock.insert(object_id, actor_id);
}
pub async fn add_block(&self, client: &Client, block: XsdAnyUri) -> Result<(), Error> {
let blocks = self.blocks.clone();
client
.execute(
"INSERT INTO blocks (actor_id, created_at) VALUES ($1::TEXT, now);",
&[&block.as_ref()],
)
.await?;
let mut write_guard = blocks.write().await;
write_guard.insert(block);
Ok(())
}
pub async fn add_whitelist(&self, client: &Client, whitelist: XsdAnyUri) -> Result<(), Error> {
let whitelists = self.whitelists.clone();
client
.execute(
"INSERT INTO whitelists (actor_id, created_at) VALUES ($1::TEXT, now);",
&[&whitelist.as_ref()],
)
.await?;
let mut write_guard = whitelists.write().await;
write_guard.insert(whitelist);
Ok(())
}
pub async fn add_listener(&self, client: &Client, listener: XsdAnyUri) -> Result<(), Error> {
let listeners = self.listeners.clone();
client
.execute(
"INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, now);",
&[&listener.as_ref()],
)
.await?;
let mut write_guard = listeners.write().await;
write_guard.insert(listener);
Ok(())
}
pub async fn hydrate(pool: Pool) -> Result<Self, Error> {
let pool1 = pool.clone();
let pool2 = pool.clone();
let f1 = async move {
let conn = pool.get().await?;
hydrate_blocks(&conn).await
};
let f2 = async move {
let conn = pool1.get().await?;
hydrate_whitelists(&conn).await
};
let f3 = async move {
let conn = pool2.get().await?;
hydrate_listeners(&conn).await
};
let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?;
Ok(State {
cache: Arc::new(Mutex::new(WeightedCache::new(1024 * 8))),
blocks: Arc::new(RwLock::new(blocks)),
whitelists: Arc::new(RwLock::new(whitelists)),
listeners: Arc::new(RwLock::new(listeners)),
})
}
}
pub async fn hydrate_blocks(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
let rows = client.query("SELECT actor_id FROM blocks", &[]).await?;
parse_rows(rows)
}
pub async fn hydrate_whitelists(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
let rows = client.query("SELECT actor_id FROM whitelists", &[]).await?;
parse_rows(rows)
}
pub async fn hydrate_listeners(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
let rows = client.query("SELECT actor_id FROM listeners", &[]).await?;
parse_rows(rows)
}
pub fn parse_rows(rows: Vec<Row>) -> Result<HashSet<XsdAnyUri>, Error> {
let hs = rows
.into_iter()
.filter_map(move |row| {
let s: String = row.try_get("actor_id").ok()?;
s.parse().ok()
})
.collect();
Ok(hs)
}