tokio/runtime/scheduler/
inject.rs

1//! Inject queue used to send wakeups to a work-stealing scheduler
2
3use crate::loom::sync::Mutex;
4use crate::runtime::task;
5
6mod pop;
7pub(crate) use pop::Pop;
8
9mod shared;
10pub(crate) use shared::Shared;
11
12mod synced;
13pub(crate) use synced::Synced;
14
15cfg_rt_multi_thread! {
16    mod rt_multi_thread;
17}
18
19mod metrics;
20
21/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
22/// overflow queue when the local, fixed-size, array queue overflows.
23pub(crate) struct Inject<T: 'static> {
24    shared: Shared<T>,
25    synced: Mutex<Synced>,
26}
27
28impl<T: 'static> Inject<T> {
29    pub(crate) fn new() -> Inject<T> {
30        let (shared, synced) = Shared::new();
31
32        Inject {
33            shared,
34            synced: Mutex::new(synced),
35        }
36    }
37
38    // Kind of annoying to have to include the cfg here
39    #[cfg(tokio_taskdump)]
40    pub(crate) fn is_closed(&self) -> bool {
41        let synced = self.synced.lock();
42        self.shared.is_closed(&synced)
43    }
44
45    /// Closes the injection queue, returns `true` if the queue is open when the
46    /// transition is made.
47    pub(crate) fn close(&self) -> bool {
48        let mut synced = self.synced.lock();
49        self.shared.close(&mut synced)
50    }
51
52    /// Pushes a value into the queue.
53    ///
54    /// This does nothing if the queue is closed.
55    pub(crate) fn push(&self, task: task::Notified<T>) {
56        let mut synced = self.synced.lock();
57        // safety: passing correct `Synced`
58        unsafe { self.shared.push(&mut synced, task) }
59    }
60
61    pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
62        if self.shared.is_empty() {
63            return None;
64        }
65
66        let mut synced = self.synced.lock();
67        // safety: passing correct `Synced`
68        unsafe { self.shared.pop(&mut synced) }
69    }
70}