Handle notify dying, add env_logger option

This commit is contained in:
asonix 2020-03-20 10:09:42 -05:00
parent 7538041b67
commit 65ce77898a
6 changed files with 34 additions and 6 deletions

1
Cargo.lock generated
View file

@ -1759,6 +1759,7 @@ dependencies = [
"bb8-postgres", "bb8-postgres",
"config", "config",
"dotenv", "dotenv",
"env_logger",
"futures", "futures",
"http-signature-normalization-actix", "http-signature-normalization-actix",
"log", "log",

View file

@ -22,6 +22,7 @@ base64 = "0.12"
bb8-postgres = "0.4.0" bb8-postgres = "0.4.0"
config = "0.10.1" config = "0.10.1"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.7.1"
futures = "0.3.4" futures = "0.3.4"
http-signature-normalization-actix = { version = "0.3.0-alpha.7", default-features = false, features = ["sha-2"] } http-signature-normalization-actix = { version = "0.3.0-alpha.7", default-features = false, features = ["sha-2"] }
log = "0.4" log = "0.4"

View file

@ -58,6 +58,7 @@ WHITELIST_MODE=false
VALIDATE_SIGNATURES=false VALIDATE_SIGNATURES=false
HTTPS=false HTTPS=false
DATABASE_URL= DATABASE_URL=
PRETTY_LOG=true
``` ```
To run this server in production, you'll likely want to set most of them To run this server in production, you'll likely want to set most of them
```env ```env
@ -69,6 +70,7 @@ WHITELIST_MODE=false
VALIDATE_SIGNATURES=true VALIDATE_SIGNATURES=true
HTTPS=true HTTPS=true
DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database
PRETTY_LOG=false
``` ```
### Contributing ### Contributing

View file

@ -15,6 +15,7 @@ pub struct Config {
validate_signatures: bool, validate_signatures: bool,
https: bool, https: bool,
database_url: String, database_url: String,
pretty_log: bool,
} }
pub enum UrlKind { pub enum UrlKind {
@ -39,11 +40,16 @@ impl Config {
.set_default("whitelist_mode", false)? .set_default("whitelist_mode", false)?
.set_default("validate_signatures", false)? .set_default("validate_signatures", false)?
.set_default("https", false)? .set_default("https", false)?
.set_default("pretty_log", true)?
.merge(Environment::new())?; .merge(Environment::new())?;
Ok(config.try_into()?) Ok(config.try_into()?)
} }
pub fn pretty_log(&self) -> bool {
self.pretty_log
}
pub fn validate_signatures(&self) -> bool { pub fn validate_signatures(&self) -> bool {
self.validate_signatures self.validate_signatures
} }

View file

@ -56,7 +56,11 @@ async fn main() -> Result<(), anyhow::Error> {
std::env::set_var("RUST_LOG", "info") std::env::set_var("RUST_LOG", "info")
} }
pretty_env_logger::init(); if config.pretty_log() {
pretty_env_logger::init();
} else {
env_logger::init();
}
let pg_config: tokio_postgres::Config = config.database_url().parse()?; let pg_config: tokio_postgres::Config = config.database_url().parse()?;
let db = Db::build(pg_config.clone()).await?; let db = Db::build(pg_config.clone()).await?;

View file

@ -6,12 +6,15 @@ use futures::{
future::ready, future::ready,
stream::{poll_fn, StreamExt}, stream::{poll_fn, StreamExt},
}; };
use log::{debug, error, info}; use log::{debug, error, info, warn};
use tokio::sync::mpsc; use tokio::sync::mpsc;
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct Notify(Notification); pub enum Notify {
Msg(Notification),
Done,
}
pub struct NotifyHandler { pub struct NotifyHandler {
client: Option<Client>, client: Option<Client>,
@ -37,6 +40,7 @@ impl Actor for NotifyHandler {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
info!("Starting notify handler");
let config = self.config.clone(); let config = self.config.clone();
let fut = async move { let fut = async move {
@ -51,7 +55,7 @@ impl Actor for NotifyHandler {
let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m { let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m {
Ok(AsyncMessage::Notification(n)) => { Ok(AsyncMessage::Notification(n)) => {
debug!("Handling Notification, {:?}", n); debug!("Handling Notification, {:?}", n);
ready(Some(Notify(n))) ready(Some(Notify::Msg(n)))
} }
Ok(AsyncMessage::Notice(e)) => { Ok(AsyncMessage::Notice(e)) => {
debug!("Handling Notice, {:?}", e); debug!("Handling Notice, {:?}", e);
@ -77,7 +81,8 @@ impl Actor for NotifyHandler {
_ => (), _ => (),
}; };
} }
debug!("Stream handler ended"); warn!("Stream handler ended");
let _ = tx.send(Notify::Done).await;
}); });
Ok((client, rx)) Ok((client, rx))
@ -116,7 +121,16 @@ impl Actor for NotifyHandler {
} }
impl StreamHandler<Notify> for NotifyHandler { impl StreamHandler<Notify> for NotifyHandler {
fn handle(&mut self, Notify(notif): Notify, ctx: &mut Self::Context) { fn handle(&mut self, notify: Notify, ctx: &mut Self::Context) {
let notif = match notify {
Notify::Msg(notif) => notif,
Notify::Done => {
warn!("Stopping notify handler");
ctx.stop();
return;
}
};
let state = self.state.clone(); let state = self.state.clone();
let fut = async move { let fut = async move {