tokio/runtime/scheduler/multi_thread/
park.rs

1//! Parks the runtime.
2//!
3//! A combination of the various resource driver park handles.
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::Duration;
12
13#[cfg(loom)]
14use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15
16pub(crate) struct Parker {
17    inner: Arc<Inner>,
18}
19
20pub(crate) struct Unparker {
21    inner: Arc<Inner>,
22}
23
24struct Inner {
25    /// Avoids entering the park if possible
26    state: AtomicUsize,
27
28    /// Used to coordinate access to the driver / `condvar`
29    mutex: Mutex<()>,
30
31    /// `Condvar` to block on if the driver is unavailable.
32    condvar: Condvar,
33
34    /// Resource (I/O, time, ...) driver
35    shared: Arc<Shared>,
36}
37
38const EMPTY: usize = 0;
39const PARKED_CONDVAR: usize = 1;
40const PARKED_DRIVER: usize = 2;
41const NOTIFIED: usize = 3;
42
43/// Shared across multiple Parker handles
44struct Shared {
45    /// Shared driver. Only one thread at a time can use this
46    driver: TryLock<Driver>,
47}
48
49impl Parker {
50    pub(crate) fn new(driver: Driver) -> Parker {
51        Parker {
52            inner: Arc::new(Inner {
53                state: AtomicUsize::new(EMPTY),
54                mutex: Mutex::new(()),
55                condvar: Condvar::new(),
56                shared: Arc::new(Shared {
57                    driver: TryLock::new(driver),
58                }),
59            }),
60        }
61    }
62
63    pub(crate) fn unpark(&self) -> Unparker {
64        Unparker {
65            inner: self.inner.clone(),
66        }
67    }
68
69    pub(crate) fn park(&mut self, handle: &driver::Handle) {
70        self.inner.park(handle);
71    }
72
73    pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
74        // Only parking with zero is supported...
75        assert_eq!(duration, Duration::from_millis(0));
76
77        if let Some(mut driver) = self.inner.shared.driver.try_lock() {
78            driver.park_timeout(handle, duration);
79        } else {
80            // https://github.com/tokio-rs/tokio/issues/6536
81            // Hacky, but it's just for loom tests. The counter gets incremented during
82            // `park_timeout`, but we still have to increment the counter if we can't acquire the
83            // lock.
84            #[cfg(loom)]
85            CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
86        }
87    }
88
89    pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
90        self.inner.shutdown(handle);
91    }
92}
93
94impl Clone for Parker {
95    fn clone(&self) -> Parker {
96        Parker {
97            inner: Arc::new(Inner {
98                state: AtomicUsize::new(EMPTY),
99                mutex: Mutex::new(()),
100                condvar: Condvar::new(),
101                shared: self.inner.shared.clone(),
102            }),
103        }
104    }
105}
106
107impl Unparker {
108    pub(crate) fn unpark(&self, driver: &driver::Handle) {
109        self.inner.unpark(driver);
110    }
111}
112
113impl Inner {
114    /// Parks the current thread for at most `dur`.
115    fn park(&self, handle: &driver::Handle) {
116        // If we were previously notified then we consume this notification and
117        // return quickly.
118        if self
119            .state
120            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
121            .is_ok()
122        {
123            return;
124        }
125
126        if let Some(mut driver) = self.shared.driver.try_lock() {
127            self.park_driver(&mut driver, handle);
128        } else {
129            self.park_condvar();
130        }
131    }
132
133    fn park_condvar(&self) {
134        // Otherwise we need to coordinate going to sleep
135        let mut m = self.mutex.lock();
136
137        match self
138            .state
139            .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
140        {
141            Ok(_) => {}
142            Err(NOTIFIED) => {
143                // We must read here, even though we know it will be `NOTIFIED`.
144                // This is because `unpark` may have been called again since we read
145                // `NOTIFIED` in the `compare_exchange` above. We must perform an
146                // acquire operation that synchronizes with that `unpark` to observe
147                // any writes it made before the call to unpark. To do that we must
148                // read from the write it made to `state`.
149                let old = self.state.swap(EMPTY, SeqCst);
150                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
151
152                return;
153            }
154            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
155        }
156
157        loop {
158            m = self.condvar.wait(m).unwrap();
159
160            if self
161                .state
162                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
163                .is_ok()
164            {
165                // got a notification
166                return;
167            }
168
169            // spurious wakeup, go back to sleep
170        }
171    }
172
173    fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
174        match self
175            .state
176            .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
177        {
178            Ok(_) => {}
179            Err(NOTIFIED) => {
180                // We must read here, even though we know it will be `NOTIFIED`.
181                // This is because `unpark` may have been called again since we read
182                // `NOTIFIED` in the `compare_exchange` above. We must perform an
183                // acquire operation that synchronizes with that `unpark` to observe
184                // any writes it made before the call to unpark. To do that we must
185                // read from the write it made to `state`.
186                let old = self.state.swap(EMPTY, SeqCst);
187                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
188
189                return;
190            }
191            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
192        }
193
194        driver.park(handle);
195
196        match self.state.swap(EMPTY, SeqCst) {
197            NOTIFIED => {}      // got a notification, hurray!
198            PARKED_DRIVER => {} // no notification, alas
199            n => panic!("inconsistent park_timeout state: {n}"),
200        }
201    }
202
203    fn unpark(&self, driver: &driver::Handle) {
204        // To ensure the unparked thread will observe any writes we made before
205        // this call, we must perform a release operation that `park` can
206        // synchronize with. To do that we must write `NOTIFIED` even if `state`
207        // is already `NOTIFIED`. That is why this must be a swap rather than a
208        // compare-and-swap that returns if it reads `NOTIFIED` on failure.
209        match self.state.swap(NOTIFIED, SeqCst) {
210            EMPTY => {}    // no one was waiting
211            NOTIFIED => {} // already unparked
212            PARKED_CONDVAR => self.unpark_condvar(),
213            PARKED_DRIVER => driver.unpark(),
214            actual => panic!("inconsistent state in unpark; actual = {actual}"),
215        }
216    }
217
218    fn unpark_condvar(&self) {
219        // There is a period between when the parked thread sets `state` to
220        // `PARKED` (or last checked `state` in the case of a spurious wake
221        // up) and when it actually waits on `cvar`. If we were to notify
222        // during this period it would be ignored and then when the parked
223        // thread went to sleep it would never wake up. Fortunately, it has
224        // `lock` locked at this stage so we can acquire `lock` to wait until
225        // it is ready to receive the notification.
226        //
227        // Releasing `lock` before the call to `notify_one` means that when the
228        // parked thread wakes it doesn't get woken only to have to wait for us
229        // to release `lock`.
230        drop(self.mutex.lock());
231
232        self.condvar.notify_one();
233    }
234
235    fn shutdown(&self, handle: &driver::Handle) {
236        if let Some(mut driver) = self.shared.driver.try_lock() {
237            driver.shutdown(handle);
238        }
239
240        self.condvar.notify_all();
241    }
242}