Demonstrate deadlock

This commit is contained in:
asonix 2023-11-25 14:43:11 -06:00
commit 2f9440066f
7 changed files with 549 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
/target
/.direnv
/.envrc

221
Cargo.lock generated Normal file
View file

@ -0,0 +1,221 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bumpalo"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "flume"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"spin",
]
[[package]]
name = "flume-deadlock"
version = "0.1.0"
dependencies = [
"flume",
]
[[package]]
name = "futures-core"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-sink"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
[[package]]
name = "getrandom"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
name = "js-sys"
version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "libc"
version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "lock_api"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom",
]
[[package]]
name = "once_cell"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "proc-macro2"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [
"proc-macro2",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "syn"
version = "2.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b"

9
Cargo.toml Normal file
View file

@ -0,0 +1,9 @@
[package]
name = "flume-deadlock"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flume = "0.11.0"

61
flake.lock Normal file
View file

@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1694529238,
"narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1700612854,
"narHash": "sha256-yrQ8osMD+vDLGFX7pcwsY/Qr5PUd6OmDMYJZzZi0+zc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "19cbff58383a4ae384dea4d1d0c823d72b49d614",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

25
flake.nix Normal file
View file

@ -0,0 +1,25 @@
{
description = "async-cpupool";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs {
inherit system;
};
in
{
packages.default = pkgs.hello;
devShell = with pkgs; mkShell {
nativeBuildInputs = [ cargo cargo-outdated clippy rust-analyzer rustc rustfmt ];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
};
});
}

84
src/main.rs Normal file
View file

@ -0,0 +1,84 @@
mod selector;
fn main() {
let (sender, receiver) = flume::bounded(8);
let mut signals: Vec<flume::Sender<()>> = Vec::new();
let mut launch = true;
for i in 0u64.. {
if i % 100000 == 0 {
println!("looping");
}
if i % 10000 == 0 {
if signals.len() >= 3 || !launch {
launch = false;
if let Some(signal_tx) = signals.pop() {
signal_tx.send(()).expect("Sent");
drop(signal_tx);
}
}
if signals.len() <= 1 || launch {
launch = true;
let (signal_tx, signal) = flume::bounded(1);
signals.push(signal_tx);
let rx2 = receiver.clone();
std::thread::spawn(move || {
println!("Launching thread");
while !race(&rx2, &signal) {
// spin
}
println!("Closing thread");
});
}
}
let (dropper_tx, dropper) = flume::bounded(1);
sender.send(Dropper { sender: dropper_tx }).expect("sent");
dropper.recv().expect("received");
}
}
struct Dropper {
sender: flume::Sender<()>,
}
impl Drop for Dropper {
fn drop(&mut self) {
self.sender.send(()).expect("sent");
}
}
/* working selector
* fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
* match selector::blocking_select(receiver.recv_async(), signal.recv_async()) {
* selector::Either::Left(res) => {
* let out = res.is_err();
* drop(res);
* out
* }
* selector::Either::Right(_res) => true,
* }
* }
*/
// broken selector
fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
flume::Selector::new()
.recv(receiver, |res| {
let out = res.is_err();
drop(res);
out
})
.recv(signal, |_res| true)
.wait()
}

146
src/selector.rs Normal file
View file

@ -0,0 +1,146 @@
use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll, Wake, Waker},
};
struct ThreadWaker {
thread: std::thread::Thread,
}
impl Wake for ThreadWaker {
fn wake(self: Arc<Self>) {
self.thread.unpark();
}
fn wake_by_ref(self: &Arc<Self>) {
self.thread.unpark();
}
}
pub(super) enum Either<L, R> {
Left(L),
Right(R),
}
struct Select<F1, F2> {
left: F1,
left_woken: Arc<AtomicBool>,
right: F2,
right_woken: Arc<AtomicBool>,
}
struct SelectWaker {
inner: Waker,
flag: Arc<AtomicBool>,
}
impl Wake for SelectWaker {
fn wake_by_ref(self: &Arc<Self>) {
self.flag.store(true, Ordering::Release);
self.inner.wake_by_ref();
}
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::Release);
match Arc::try_unwrap(self) {
Ok(this) => this.inner.wake(),
Err(this) => this.inner.wake_by_ref(),
}
}
}
impl<F1, F2> Future for Select<F1, F2>
where
F1: Future + Unpin,
F2: Future + Unpin,
{
type Output = Either<F1::Output, F2::Output>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let left_waker = Arc::new(SelectWaker {
inner: cx.waker().clone(),
flag: self.left_woken.clone(),
})
.into();
let mut left_ctx = Context::from_waker(&left_waker);
if let Poll::Ready(left_out) = Pin::new(&mut self.left).poll(&mut left_ctx) {
return Poll::Ready(Either::Left(left_out));
}
let right_waker = Arc::new(SelectWaker {
inner: cx.waker().clone(),
flag: self.right_woken.clone(),
})
.into();
let mut right_ctx = Context::from_waker(&right_waker);
if let Poll::Ready(right_out) = Pin::new(&mut self.right).poll(&mut right_ctx) {
return Poll::Ready(Either::Right(right_out));
}
Poll::Pending
}
}
pub(super) fn blocking_select<Left, Right>(
left: Left,
right: Right,
) -> Either<Left::Output, Right::Output>
where
Left: Future,
Right: Future,
{
block_on(select(left, right))
}
fn block_on<F>(fut: F) -> F::Output
where
F: Future,
{
let thread_waker = Arc::new(ThreadWaker {
thread: std::thread::current(),
})
.into();
let mut ctx = Context::from_waker(&thread_waker);
let mut fut = std::pin::pin!(fut);
loop {
if let Poll::Ready(out) = fut.as_mut().poll(&mut ctx) {
return out;
}
// doesn't race - unpark followed by park will result in park returning immediately
std::thread::park();
}
}
async fn select<Left, Right>(left: Left, right: Right) -> Either<Left::Output, Right::Output>
where
Left: Future,
Right: Future,
{
let left = std::pin::pin!(left);
let right = std::pin::pin!(right);
Select {
left,
left_woken: Arc::new(AtomicBool::new(true)),
right,
right_woken: Arc::new(AtomicBool::new(true)),
}
.await
}