commit 2f9440066f6c2b33b566d218a50024881b6fbfa1 Author: asonix Date: Sat Nov 25 14:43:11 2023 -0600 Demonstrate deadlock diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1cbd80d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +/.direnv +/.envrc diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..0a15998 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,221 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + +[[package]] +name = "flume-deadlock" +version = "0.1.0" +dependencies = [ + "flume", +] + +[[package]] +name = "futures-core" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" + +[[package]] +name = "futures-sink" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" + +[[package]] +name = "getrandom" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi", + "wasm-bindgen", +] + +[[package]] +name = "js-sys" +version = "0.3.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" + +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "proc-macro2" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "syn" +version = "2.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasm-bindgen" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..23068c1 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "flume-deadlock" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +flume = "0.11.0" diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..3a84ed9 --- /dev/null +++ b/flake.lock @@ -0,0 +1,61 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1694529238, + "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "ff7b65b44d01cf9ba6a71320833626af21126384", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1700612854, + "narHash": "sha256-yrQ8osMD+vDLGFX7pcwsY/Qr5PUd6OmDMYJZzZi0+zc=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "19cbff58383a4ae384dea4d1d0c823d72b49d614", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..4b8a515 --- /dev/null +++ b/flake.nix @@ -0,0 +1,25 @@ +{ + description = "async-cpupool"; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = import nixpkgs { + inherit system; + }; + in + { + packages.default = pkgs.hello; + + devShell = with pkgs; mkShell { + nativeBuildInputs = [ cargo cargo-outdated clippy rust-analyzer rustc rustfmt ]; + + RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}"; + }; + }); +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..cdce7c9 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,84 @@ +mod selector; + +fn main() { + let (sender, receiver) = flume::bounded(8); + + let mut signals: Vec> = Vec::new(); + + let mut launch = true; + + for i in 0u64.. { + if i % 100000 == 0 { + println!("looping"); + } + + if i % 10000 == 0 { + if signals.len() >= 3 || !launch { + launch = false; + + if let Some(signal_tx) = signals.pop() { + signal_tx.send(()).expect("Sent"); + drop(signal_tx); + } + } + + if signals.len() <= 1 || launch { + launch = true; + + let (signal_tx, signal) = flume::bounded(1); + + signals.push(signal_tx); + + let rx2 = receiver.clone(); + std::thread::spawn(move || { + println!("Launching thread"); + while !race(&rx2, &signal) { + // spin + } + println!("Closing thread"); + }); + } + } + + let (dropper_tx, dropper) = flume::bounded(1); + + sender.send(Dropper { sender: dropper_tx }).expect("sent"); + + dropper.recv().expect("received"); + } +} + +struct Dropper { + sender: flume::Sender<()>, +} + +impl Drop for Dropper { + fn drop(&mut self) { + self.sender.send(()).expect("sent"); + } +} + +/* working selector + * fn race(receiver: &flume::Receiver, signal: &flume::Receiver<()>) -> bool { + * match selector::blocking_select(receiver.recv_async(), signal.recv_async()) { + * selector::Either::Left(res) => { + * let out = res.is_err(); + * drop(res); + * out + * } + * selector::Either::Right(_res) => true, + * } + * } + */ + +// broken selector +fn race(receiver: &flume::Receiver, signal: &flume::Receiver<()>) -> bool { + flume::Selector::new() + .recv(receiver, |res| { + let out = res.is_err(); + drop(res); + out + }) + .recv(signal, |_res| true) + .wait() +} diff --git a/src/selector.rs b/src/selector.rs new file mode 100644 index 0000000..5599c9d --- /dev/null +++ b/src/selector.rs @@ -0,0 +1,146 @@ +use std::{ + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll, Wake, Waker}, +}; + +struct ThreadWaker { + thread: std::thread::Thread, +} + +impl Wake for ThreadWaker { + fn wake(self: Arc) { + self.thread.unpark(); + } + + fn wake_by_ref(self: &Arc) { + self.thread.unpark(); + } +} + +pub(super) enum Either { + Left(L), + Right(R), +} + +struct Select { + left: F1, + left_woken: Arc, + + right: F2, + right_woken: Arc, +} + +struct SelectWaker { + inner: Waker, + flag: Arc, +} + +impl Wake for SelectWaker { + fn wake_by_ref(self: &Arc) { + self.flag.store(true, Ordering::Release); + + self.inner.wake_by_ref(); + } + + fn wake(self: Arc) { + self.flag.store(true, Ordering::Release); + + match Arc::try_unwrap(self) { + Ok(this) => this.inner.wake(), + Err(this) => this.inner.wake_by_ref(), + } + } +} + +impl Future for Select +where + F1: Future + Unpin, + F2: Future + Unpin, +{ + type Output = Either; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let left_waker = Arc::new(SelectWaker { + inner: cx.waker().clone(), + flag: self.left_woken.clone(), + }) + .into(); + + let mut left_ctx = Context::from_waker(&left_waker); + + if let Poll::Ready(left_out) = Pin::new(&mut self.left).poll(&mut left_ctx) { + return Poll::Ready(Either::Left(left_out)); + } + + let right_waker = Arc::new(SelectWaker { + inner: cx.waker().clone(), + flag: self.right_woken.clone(), + }) + .into(); + + let mut right_ctx = Context::from_waker(&right_waker); + + if let Poll::Ready(right_out) = Pin::new(&mut self.right).poll(&mut right_ctx) { + return Poll::Ready(Either::Right(right_out)); + } + + Poll::Pending + } +} + +pub(super) fn blocking_select( + left: Left, + right: Right, +) -> Either +where + Left: Future, + Right: Future, +{ + block_on(select(left, right)) +} + +fn block_on(fut: F) -> F::Output +where + F: Future, +{ + let thread_waker = Arc::new(ThreadWaker { + thread: std::thread::current(), + }) + .into(); + + let mut ctx = Context::from_waker(&thread_waker); + + let mut fut = std::pin::pin!(fut); + + loop { + if let Poll::Ready(out) = fut.as_mut().poll(&mut ctx) { + return out; + } + + // doesn't race - unpark followed by park will result in park returning immediately + std::thread::park(); + } +} + +async fn select(left: Left, right: Right) -> Either +where + Left: Future, + Right: Future, +{ + let left = std::pin::pin!(left); + let right = std::pin::pin!(right); + + Select { + left, + left_woken: Arc::new(AtomicBool::new(true)), + + right, + right_woken: Arc::new(AtomicBool::new(true)), + } + .await +}