tokio/runtime/scheduler/multi_thread/park.rs
1//! Parks the runtime.
2//!
3//! A combination of the various resource driver park handles.
4
5use crate::loom::sync::atomic::AtomicUsize;
6use crate::loom::sync::{Arc, Condvar, Mutex};
7use crate::runtime::driver::{self, Driver};
8use crate::util::TryLock;
9
10use std::sync::atomic::Ordering::SeqCst;
11use std::time::Duration;
12
13#[cfg(loom)]
14use crate::runtime::park::CURRENT_THREAD_PARK_COUNT;
15
/// Handle used to park the calling thread, either on the shared resource
/// driver or, when the driver is in use elsewhere, on a condition variable.
///
/// Cloning a `Parker` yields a handle with its own park state that shares the
/// same driver.
pub(crate) struct Parker {
    /// Park state and synchronization primitives backing this handle.
    inner: Arc<Inner>,
}
19
/// Handle used to notify the `Parker` it was created from, waking the thread
/// parked on it if there is one.
pub(crate) struct Unparker {
    /// The same `Inner` as the `Parker` this handle was obtained from.
    inner: Arc<Inner>,
}
23
/// State backing one `Parker` and the `Unparker` handles derived from it.
struct Inner {
    /// Current park state: `EMPTY`, `PARKED_CONDVAR`, `PARKED_DRIVER` or
    /// `NOTIFIED`. Checked before blocking so that a pending notification
    /// avoids entering the park if possible.
    state: AtomicUsize,

    /// Paired with `condvar`: held while moving into `PARKED_CONDVAR` and
    /// while waiting, and briefly acquired by `unpark_condvar` before
    /// notifying.
    mutex: Mutex<()>,

    /// `Condvar` to block on if the driver is unavailable.
    condvar: Condvar,

    /// Holder of the resource (I/O, time, ...) driver, shared with every
    /// clone of the parker.
    shared: Arc<Shared>,
}
37
// Values stored in `Inner::state`.

/// Nobody is parked and no notification is pending.
const EMPTY: usize = 0;
/// The thread is parked (or about to park) on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The thread is parked (or about to park) on the resource driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` has been called; the next `park` consumes this and returns.
const NOTIFIED: usize = 3;
42
/// Shared across multiple `Parker` handles (every clone of a `Parker` points
/// at the same `Shared`).
struct Shared {
    /// Shared driver. Only one thread at a time can use this; it is only ever
    /// acquired with `try_lock`, so callers never block waiting for it.
    driver: TryLock<Driver>,
}
48
49impl Parker {
50 pub(crate) fn new(driver: Driver) -> Parker {
51 Parker {
52 inner: Arc::new(Inner {
53 state: AtomicUsize::new(EMPTY),
54 mutex: Mutex::new(()),
55 condvar: Condvar::new(),
56 shared: Arc::new(Shared {
57 driver: TryLock::new(driver),
58 }),
59 }),
60 }
61 }
62
63 pub(crate) fn unpark(&self) -> Unparker {
64 Unparker {
65 inner: self.inner.clone(),
66 }
67 }
68
69 pub(crate) fn park(&mut self, handle: &driver::Handle) {
70 self.inner.park(handle);
71 }
72
73 pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
74 // Only parking with zero is supported...
75 assert_eq!(duration, Duration::from_millis(0));
76
77 if let Some(mut driver) = self.inner.shared.driver.try_lock() {
78 driver.park_timeout(handle, duration);
79 } else {
80 // https://github.com/tokio-rs/tokio/issues/6536
81 // Hacky, but it's just for loom tests. The counter gets incremented during
82 // `park_timeout`, but we still have to increment the counter if we can't acquire the
83 // lock.
84 #[cfg(loom)]
85 CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
86 }
87 }
88
89 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
90 self.inner.shutdown(handle);
91 }
92}
93
94impl Clone for Parker {
95 fn clone(&self) -> Parker {
96 Parker {
97 inner: Arc::new(Inner {
98 state: AtomicUsize::new(EMPTY),
99 mutex: Mutex::new(()),
100 condvar: Condvar::new(),
101 shared: self.inner.shared.clone(),
102 }),
103 }
104 }
105}
106
107impl Unparker {
108 pub(crate) fn unpark(&self, driver: &driver::Handle) {
109 self.inner.unpark(driver);
110 }
111}
112
113impl Inner {
114 /// Parks the current thread for at most `dur`.
115 fn park(&self, handle: &driver::Handle) {
116 // If we were previously notified then we consume this notification and
117 // return quickly.
118 if self
119 .state
120 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
121 .is_ok()
122 {
123 return;
124 }
125
126 if let Some(mut driver) = self.shared.driver.try_lock() {
127 self.park_driver(&mut driver, handle);
128 } else {
129 self.park_condvar();
130 }
131 }
132
133 fn park_condvar(&self) {
134 // Otherwise we need to coordinate going to sleep
135 let mut m = self.mutex.lock();
136
137 match self
138 .state
139 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
140 {
141 Ok(_) => {}
142 Err(NOTIFIED) => {
143 // We must read here, even though we know it will be `NOTIFIED`.
144 // This is because `unpark` may have been called again since we read
145 // `NOTIFIED` in the `compare_exchange` above. We must perform an
146 // acquire operation that synchronizes with that `unpark` to observe
147 // any writes it made before the call to unpark. To do that we must
148 // read from the write it made to `state`.
149 let old = self.state.swap(EMPTY, SeqCst);
150 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
151
152 return;
153 }
154 Err(actual) => panic!("inconsistent park state; actual = {actual}"),
155 }
156
157 loop {
158 m = self.condvar.wait(m).unwrap();
159
160 if self
161 .state
162 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
163 .is_ok()
164 {
165 // got a notification
166 return;
167 }
168
169 // spurious wakeup, go back to sleep
170 }
171 }
172
173 fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
174 match self
175 .state
176 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
177 {
178 Ok(_) => {}
179 Err(NOTIFIED) => {
180 // We must read here, even though we know it will be `NOTIFIED`.
181 // This is because `unpark` may have been called again since we read
182 // `NOTIFIED` in the `compare_exchange` above. We must perform an
183 // acquire operation that synchronizes with that `unpark` to observe
184 // any writes it made before the call to unpark. To do that we must
185 // read from the write it made to `state`.
186 let old = self.state.swap(EMPTY, SeqCst);
187 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
188
189 return;
190 }
191 Err(actual) => panic!("inconsistent park state; actual = {actual}"),
192 }
193
194 driver.park(handle);
195
196 match self.state.swap(EMPTY, SeqCst) {
197 NOTIFIED => {} // got a notification, hurray!
198 PARKED_DRIVER => {} // no notification, alas
199 n => panic!("inconsistent park_timeout state: {n}"),
200 }
201 }
202
203 fn unpark(&self, driver: &driver::Handle) {
204 // To ensure the unparked thread will observe any writes we made before
205 // this call, we must perform a release operation that `park` can
206 // synchronize with. To do that we must write `NOTIFIED` even if `state`
207 // is already `NOTIFIED`. That is why this must be a swap rather than a
208 // compare-and-swap that returns if it reads `NOTIFIED` on failure.
209 match self.state.swap(NOTIFIED, SeqCst) {
210 EMPTY => {} // no one was waiting
211 NOTIFIED => {} // already unparked
212 PARKED_CONDVAR => self.unpark_condvar(),
213 PARKED_DRIVER => driver.unpark(),
214 actual => panic!("inconsistent state in unpark; actual = {actual}"),
215 }
216 }
217
218 fn unpark_condvar(&self) {
219 // There is a period between when the parked thread sets `state` to
220 // `PARKED` (or last checked `state` in the case of a spurious wake
221 // up) and when it actually waits on `cvar`. If we were to notify
222 // during this period it would be ignored and then when the parked
223 // thread went to sleep it would never wake up. Fortunately, it has
224 // `lock` locked at this stage so we can acquire `lock` to wait until
225 // it is ready to receive the notification.
226 //
227 // Releasing `lock` before the call to `notify_one` means that when the
228 // parked thread wakes it doesn't get woken only to have to wait for us
229 // to release `lock`.
230 drop(self.mutex.lock());
231
232 self.condvar.notify_one();
233 }
234
235 fn shutdown(&self, handle: &driver::Handle) {
236 if let Some(mut driver) = self.shared.driver.try_lock() {
237 driver.shutdown(handle);
238 }
239
240 self.condvar.notify_all();
241 }
242}