Update to tokio 1

This commit is contained in:
asonix 2021-05-16 12:42:00 -05:00
parent 586c6b2a58
commit 4f5ceb0eaf
4 changed files with 255 additions and 382 deletions

575
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -13,16 +13,16 @@ edition = "2018"
[dependencies]
anyhow = "1.0"
async-lock = "1.1.2"
async-mutex = "1.1.2"
bytes = "0.5"
async-mutex = "1.4.0"
bytes = "1"
log = "0.4"
env_logger = "0.7.1"
futures = "0.3.4"
rand = "0.7"
structopt = "0.3.14"
tokio = { version = "0.2.20", features = ["full"] }
tokio-rustls = "0.13.0"
tokio-util = { version = "0.3.1", features = ["codec", "udp"] }
env_logger = "0.8.0"
futures = "0.3.15"
rand = "0.8"
structopt = "0.3.21"
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.22.0"
tokio-stream = { version = "0.1.6", features = ["net", "io-util"] }
tokio-util = { version = "0.6.7", features = ["codec", "net"] }
ttl_cache = "0.5.1"
webpki-roots = "0.19.0"
webpki-roots = "0.21.0"

View file

@ -1,5 +1,4 @@
use crate::cache::RequestCache;
use async_lock::Lock;
use bytes::Bytes;
use futures::{sink::SinkExt, stream::StreamExt, try_join};
use log::{debug, error};
@ -67,18 +66,18 @@ impl Config {
pub async fn run(
config: Config,
rx: Lock<Receiver<(Bytes, SocketAddr)>>,
mut rx: Receiver<(Bytes, SocketAddr)>,
tx: Sender<(Bytes, SocketAddr)>,
) {
while let Err(e) = do_run(config.clone(), rx.clone(), tx.clone()).await {
while let Err(e) = do_run(config.clone(), &mut rx, tx.clone()).await {
error!("Error in forwarder, {}", e);
}
}
async fn do_run(
config: Config,
rx: Lock<Receiver<(Bytes, SocketAddr)>>,
mut tx: Sender<(Bytes, SocketAddr)>,
rx: &mut Receiver<(Bytes, SocketAddr)>,
tx: Sender<(Bytes, SocketAddr)>,
) -> Result<(), io::Error> {
let any_addr = "0.0.0.0:0".parse::<SocketAddr>().unwrap();
@ -88,16 +87,12 @@ async fn do_run(
let fallback = config.fallback;
let conn = config.connect().await?;
let (mut sender, mut receiver) = conn.split();
let cache = config.cache.clone();
let use_fallback = config.use_fallback.clone();
let f1 = async move {
debug!("Locking");
let mut rx_guard = rx.lock().await;
while let Some((bytes, addr)) = rx_guard.next().await {
while let Some((bytes, addr)) = rx.recv().await {
debug!(
"Inserting {} for {}{}",
addr,
@ -116,7 +111,7 @@ async fn do_run(
};
let cache = config.cache.clone();
let mut tx2 = tx.clone();
let tx2 = tx.clone();
let f2 = async move {
while let Some(res) = receiver.next().await {
let bytes = res?.freeze();

View file

@ -1,4 +1,3 @@
use async_lock::Lock;
use bytes::BytesMut;
use futures::{
future::try_join,
@ -17,6 +16,7 @@ use std::{
use structopt::StructOpt;
use tokio::{net::UdpSocket, sync::mpsc::channel};
use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{codec::BytesCodec, udp::UdpFramed};
use webpki_roots::TLS_SERVER_ROOTS;
@ -86,8 +86,8 @@ async fn main() -> Result<(), anyhow::Error> {
fallback,
);
let (mut ctrl_cmd_tx, ctrl_cmd_rx) = channel(8);
let (mut ctrl_dns_tx, mut ctrl_dns_rx) = channel(8);
let (ctrl_cmd_tx, ctrl_cmd_rx) = channel(8);
let (ctrl_dns_tx, mut ctrl_dns_rx) = channel(8);
let mut unordered = FuturesUnordered::new();
let mut txs = vec![];
@ -96,19 +96,15 @@ async fn main() -> Result<(), anyhow::Error> {
for _ in 0..16 {
let (tx1, rx1) = channel(32);
let (tx2, rx2) = channel(32);
unordered.push(tokio::spawn(self::conn::run(
config.clone(),
Lock::new(rx1),
tx2,
)));
unordered.push(tokio::spawn(self::conn::run(config.clone(), rx1, tx2)));
txs.push(tx1);
rxs.push(rx2);
rxs.push(ReceiverStream::new(rx2));
}
let any_addr = "0.0.0.0:0".parse::<SocketAddr>().unwrap();
let ctrl_fut = tokio::spawn(async move {
use tokio::time::{delay_for, interval, timeout, Duration};
use tokio::time::{interval, sleep, timeout, Duration};
let mut i = interval(Duration::from_secs(5));
@ -121,13 +117,13 @@ async fn main() -> Result<(), anyhow::Error> {
} else if fail_count == max_fails {
error!("Reached max failures, falling back to plaintext");
use_fallback.store(true, Ordering::Relaxed);
delay_for(Duration::from_secs(60)).await;
sleep(Duration::from_secs(60)).await;
i = interval(Duration::from_secs(5));
}
let _ = ctrl_cmd_tx.send(Ok((build_test_query(), any_addr))).await;
if let Err(_) = timeout(Duration::from_secs(2), ctrl_dns_rx.next()).await {
if let Err(_) = timeout(Duration::from_secs(2), ctrl_dns_rx.recv()).await {
warn!("Failed to get response for test query");
fail_count += 1;
} else {
@ -166,7 +162,8 @@ async fn main() -> Result<(), anyhow::Error> {
let request_fut = async move {
let mut idx = 0;
let max_idx = txs.len();
let mut stream = select(udp_stream, ctrl_cmd_rx);
let ctrl_cmd_rx_stream = ReceiverStream::new(ctrl_cmd_rx);
let mut stream = select(udp_stream, ctrl_cmd_rx_stream);
while let Some(res) = stream.next().await {
let (bytes, addr) = res?;