1//! A shutdown channel.
2//!
3//! Each worker holds the `Sender` half. When all the `Sender` halves are
4//! dropped, the `Receiver` receives a notification.
56use crate::loom::sync::Arc;
7use crate::sync::oneshot;
89use std::time::Duration;
1011#[derive(Debug, Clone)]
12pub(super) struct Sender {
13 _tx: Arc<oneshot::Sender<()>>,
14}
1516#[derive(Debug)]
17pub(super) struct Receiver {
18 rx: oneshot::Receiver<()>,
19}
2021pub(super) fn channel() -> (Sender, Receiver) {
22let (tx, rx) = oneshot::channel();
23let tx = Sender { _tx: Arc::new(tx) };
24let rx = Receiver { rx };
2526 (tx, rx)
27}
2829impl Receiver {
30/// Blocks the current thread until all `Sender` handles drop.
31 ///
32 /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
33 /// duration. If `timeout` is `None`, then the thread is blocked until the
34 /// shutdown signal is received.
35 ///
36 /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
37pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
38use crate::runtime::context::try_enter_blocking_region;
3940if timeout == Some(Duration::from_nanos(0)) {
41return false;
42 }
4344let mut e = match try_enter_blocking_region() {
45Some(enter) => enter,
46_ => {
47if std::thread::panicking() {
48// Don't panic in a panic
49return false;
50 } else {
51panic!(
52"Cannot drop a runtime in a context where blocking is not allowed. \
53 This happens when a runtime is dropped from within an asynchronous context."
54);
55 }
56 }
57 };
5859// The oneshot completes with an Err
60 //
61 // If blocking fails to wait, this indicates a problem parking the
62 // current thread (usually, shutting down a runtime stored in a
63 // thread-local).
64if let Some(timeout) = timeout {
65 e.block_on_timeout(&mut self.rx, timeout).is_ok()
66 } else {
67let _ = e.block_on(&mut self.rx);
68true
69}
70 }
71}