tokio/runtime/blocking/
shutdown.rs

1//! A shutdown channel.
2//!
3//! Each worker holds the `Sender` half. When all the `Sender` halves are
4//! dropped, the `Receiver` receives a notification.
5
6use crate::loom::sync::Arc;
7use crate::sync::oneshot;
8
9use std::time::Duration;
10
11#[derive(Debug, Clone)]
12pub(super) struct Sender {
13    _tx: Arc<oneshot::Sender<()>>,
14}
15
16#[derive(Debug)]
17pub(super) struct Receiver {
18    rx: oneshot::Receiver<()>,
19}
20
21pub(super) fn channel() -> (Sender, Receiver) {
22    let (tx, rx) = oneshot::channel();
23    let tx = Sender { _tx: Arc::new(tx) };
24    let rx = Receiver { rx };
25
26    (tx, rx)
27}
28
29impl Receiver {
30    /// Blocks the current thread until all `Sender` handles drop.
31    ///
32    /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
33    /// duration. If `timeout` is `None`, then the thread is blocked until the
34    /// shutdown signal is received.
35    ///
36    /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
37    pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
38        use crate::runtime::context::try_enter_blocking_region;
39
40        if timeout == Some(Duration::from_nanos(0)) {
41            return false;
42        }
43
44        let mut e = match try_enter_blocking_region() {
45            Some(enter) => enter,
46            _ => {
47                if std::thread::panicking() {
48                    // Don't panic in a panic
49                    return false;
50                } else {
51                    panic!(
52                        "Cannot drop a runtime in a context where blocking is not allowed. \
53                        This happens when a runtime is dropped from within an asynchronous context."
54                    );
55                }
56            }
57        };
58
59        // The oneshot completes with an Err
60        //
61        // If blocking fails to wait, this indicates a problem parking the
62        // current thread (usually, shutting down a runtime stored in a
63        // thread-local).
64        if let Some(timeout) = timeout {
65            e.block_on_timeout(&mut self.rx, timeout).is_ok()
66        } else {
67            let _ = e.block_on(&mut self.rx);
68            true
69        }
70    }
71}