添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

I am trying to use flume in a toy project of mine and I'm seeing some odd behaviour. A summary is:

  • Create 3 send/rx pair incoming to processing-thread
  • Create another pair for an ack channel
  • Spin up a processing thread and a "user" thread
  • Send a,a,b,a from user thread with some sleeps between
  • See on processing thread a,a,a,b with the sleeps showing that it is blocking waiting on the third a
  • This seems odd to me. What am I doing wrong?

    Trivial example below, running on nightly:

    Cargo.toml:
    [dependencies]
    flume = { version = "0.10.1" }
    main.rs
    
    #![allow(unused_imports)]
    #![allow(unused_variables)]
    #![allow(dead_code)]
    // use minalog::*;
    use std::env;
    use std::fs;
    use flume::{Receiver, Sender, SendError, Selector};
    use std::thread;
    use std::thread::sleep;
    use std::time::{Instant, Duration};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};
    fn main() {
        let (a_send, a_rx) = flume::unbounded::<u8>();
        let (b_send, b_rx) = flume::unbounded::<u8>();
        let (c_send, c_rx) = flume::unbounded::<u8>();
        let (processed_send, processed_rx) = flume::unbounded::<u8>();
        let last_seen = Arc::new(Mutex::new(Instant::now()));
        let a_running = Arc::new(AtomicBool::new(true));
        thread::spawn(move || {
            let done = |name| {
                processed_send.send(1).unwrap();
            loop {
                let sel = Selector::new().
                    recv(&a_rx, |msg| {
                        let ty = "a";
                        let since = {
                            let mut last_seen = last_seen.lock().unwrap();
                            let now = Instant::now();
                            let s = now.duration_since(*last_seen);
                            *last_seen = now;
                            s.as_millis()
                        println!("{} Got incoming of {:?}, took {:?}ms", ty, msg, since);
                        done(ty)
                    recv(&b_rx, |msg| {
                        let ty = "b";
                        let since = {
                            let mut last_seen = last_seen.lock().unwrap();
                            let now = Instant::now();
                            let s = now.duration_since(*last_seen);
                            *last_seen = now;
                            s.as_millis()
                        println!("{} Got incoming of {:?}, took {:?}ms", ty, msg, since);
                        done(ty)
                    recv(&c_rx, |msg| {
                        println!("Should never get here");
                        done("c")
                match sel.wait_timeout(Duration::from_millis(300)) {
                    Ok(event) => {
                        println!("INFO Just finished event {}", event);
                    Err(e) => {
                        println!("WARN Got nothing for a while in main-loop, this should be impossible");
        thread::spawn(move || {
            println!("Starting sender");
            for _ in 1..10 {
                a_send.send(0).unwrap();
                sleep(Duration::from_millis(20));
                a_send.send(1).unwrap();
                sleep(Duration::from_millis(20));
                b_send.send(2).unwrap();
                sleep(Duration::from_millis(200));
                a_send.send(3).unwrap();
                sleep(Duration::from_millis(20));
            println!("Done sender");
            sleep(Duration::from_secs(5));
        sleep(Duration::from_secs(5));
        println!("Done");
    

    rust versions etc

    $ rustup show Default host: x86_64-pc-windows-msvc rustup home: C:\Users\dave\.rustup installed toolchains -------------------- stable-x86_64-pc-windows-msvc nightly-x86_64-pc-windows-msvc (default) active toolchain ---------------- nightly-x86_64-pc-windows-msvc (default) rustc 1.51.0-nightly (04caa632d 2021-01-30)

    Running your example with some debug logging, it looks like whenever the selector thread gets woken up by a "b" message, it hits this "spurious wakeup" case and goes back to sleep until woken by the next "a" message: select.rs - source

    I haven't figured out why the token isn't set on these wakeups.

    More info: After each successful message is handled, the loop in your select-receive threat creates a new Selector. However, for some reason, the tokens from "b" messages are pushed to a queue owned by a selector from an earlier iteration of the loop (which has since been destroyed).

    This appears to happen because old hooks are not successfully removed in RecvSelection::deinit. I suspect this is because it is comparing the addresses of trait object pointers using Arc::ptr_eq, and running into this issue: https://github.com/rust-lang/rust/issues/46139

    I submitted a fix for this here:

    https://github.com/zesterer/flume/pull/69

    Edit: For now, you can also work around this bug by setting codegen-units = 1 in your Cargo.toml. For example:

    [profile.dev]
    codegen-units = 1
    [profile.release]
    codegen-units = 1
                  

    To avoid similar issues in the future, I think that Arc::ptr_eq should require Self: Sized to avoid people accidentally falling into this trap. To my knowledge, there's no real case in which the current behaviour with trait objects would be desirable (correct me if I'm wrong). This would be a breaking change, but breakages would only be highlighting likely bugs.