1//! Coordinates idling workers
23use crate::loom::sync::atomic::AtomicUsize;
4use crate::runtime::scheduler::multi_thread::Shared;
56use std::fmt;
7use std::sync::atomic::Ordering::{self, SeqCst};
89pub(super) struct Idle {
10/// Tracks both the number of searching workers and the number of unparked
11 /// workers.
12 ///
13 /// Used as a fast-path to avoid acquiring the lock when needed.
14state: AtomicUsize,
1516/// Total number of workers.
17num_workers: usize,
18}
1920/// Data synchronized by the scheduler mutex
21pub(super) struct Synced {
22/// Sleeping workers
23sleepers: Vec<usize>,
24}
2526const UNPARK_SHIFT: usize = 16;
27const UNPARK_MASK: usize = !SEARCH_MASK;
28const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
2930#[derive(Copy, Clone)]
31struct State(usize);
3233impl Idle {
34pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
35let init = State::new(num_workers);
3637let idle = Idle {
38 state: AtomicUsize::new(init.into()),
39 num_workers,
40 };
4142let synced = Synced {
43 sleepers: Vec::with_capacity(num_workers),
44 };
4546 (idle, synced)
47 }
4849/// If there are no workers actively searching, returns the index of a
50 /// worker currently sleeping.
51pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
52// If at least one worker is spinning, work being notified will
53 // eventually be found. A searching thread will find **some** work and
54 // notify another worker, eventually leading to our work being found.
55 //
56 // For this to happen, this load must happen before the thread
57 // transitioning `num_searching` to zero. Acquire / Release does not
58 // provide sufficient guarantees, so this load is done with `SeqCst` and
59 // will pair with the `fetch_sub(1)` when transitioning out of
60 // searching.
61if !self.notify_should_wakeup() {
62return None;
63 }
6465// Acquire the lock
66let mut lock = shared.synced.lock();
6768// Check again, now that the lock is acquired
69if !self.notify_should_wakeup() {
70return None;
71 }
7273// A worker should be woken up, atomically increment the number of
74 // searching workers as well as the number of unparked workers.
75State::unpark_one(&self.state, 1);
7677// Get the worker to unpark
78let ret = lock.idle.sleepers.pop();
79debug_assert!(ret.is_some());
8081 ret
82 }
8384/// Returns `true` if the worker needs to do a final check for submitted
85 /// work.
86pub(super) fn transition_worker_to_parked(
87&self,
88 shared: &Shared,
89 worker: usize,
90 is_searching: bool,
91 ) -> bool {
92// Acquire the lock
93let mut lock = shared.synced.lock();
9495// Decrement the number of unparked threads
96let ret = State::dec_num_unparked(&self.state, is_searching);
9798// Track the sleeping worker
99lock.idle.sleepers.push(worker);
100101 ret
102 }
103104pub(super) fn transition_worker_to_searching(&self) -> bool {
105let state = State::load(&self.state, SeqCst);
106if 2 * state.num_searching() >= self.num_workers {
107return false;
108 }
109110// It is possible for this routine to allow more than 50% of the workers
111 // to search. That is OK. Limiting searchers is only an optimization to
112 // prevent too much contention.
113State::inc_num_searching(&self.state, SeqCst);
114true
115}
116117/// A lightweight transition from searching -> running.
118 ///
119 /// Returns `true` if this is the final searching worker. The caller
120 /// **must** notify a new worker.
121pub(super) fn transition_worker_from_searching(&self) -> bool {
122 State::dec_num_searching(&self.state)
123 }
124125/// Unpark a specific worker. This happens if tasks are submitted from
126 /// within the worker's park routine.
127 ///
128 /// Returns `true` if the worker was parked before calling the method.
129pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
130let mut lock = shared.synced.lock();
131let sleepers = &mut lock.idle.sleepers;
132133for index in 0..sleepers.len() {
134if sleepers[index] == worker_id {
135 sleepers.swap_remove(index);
136137// Update the state accordingly while the lock is held.
138State::unpark_one(&self.state, 0);
139140return true;
141 }
142 }
143144false
145}
146147/// Returns `true` if `worker_id` is contained in the sleep set.
148pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
149let lock = shared.synced.lock();
150 lock.idle.sleepers.contains(&worker_id)
151 }
152153fn notify_should_wakeup(&self) -> bool {
154let state = State(self.state.fetch_add(0, SeqCst));
155 state.num_searching() == 0 && state.num_unparked() < self.num_workers
156 }
157}
158159impl State {
160fn new(num_workers: usize) -> State {
161// All workers start in the unparked state
162let ret = State(num_workers << UNPARK_SHIFT);
163debug_assert_eq!(num_workers, ret.num_unparked());
164debug_assert_eq!(0, ret.num_searching());
165 ret
166 }
167168fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
169 State(cell.load(ordering))
170 }
171172fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
173 cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
174 }
175176fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
177 cell.fetch_add(1, ordering);
178 }
179180/// Returns `true` if this is the final searching worker
181fn dec_num_searching(cell: &AtomicUsize) -> bool {
182let state = State(cell.fetch_sub(1, SeqCst));
183 state.num_searching() == 1
184}
185186/// Track a sleeping worker
187 ///
188 /// Returns `true` if this is the final searching worker.
189fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
190let mut dec = 1 << UNPARK_SHIFT;
191192if is_searching {
193 dec += 1;
194 }
195196let prev = State(cell.fetch_sub(dec, SeqCst));
197 is_searching && prev.num_searching() == 1
198}
199200/// Number of workers currently searching
201fn num_searching(self) -> usize {
202self.0 & SEARCH_MASK
203 }
204205/// Number of workers currently unparked
206fn num_unparked(self) -> usize {
207 (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
208 }
209}
210211impl From<usize> for State {
212fn from(src: usize) -> State {
213 State(src)
214 }
215}
216217impl From<State> for usize {
218fn from(src: State) -> usize {
219 src.0
220}
221}
222223impl fmt::Debug for State {
224fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
225 fmt.debug_struct("worker::State")
226 .field("num_unparked", &self.num_unparked())
227 .field("num_searching", &self.num_searching())
228 .finish()
229 }
230}
231232#[test]
233fn test_state() {
234assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
235assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
236237let state = State::new(10);
238assert_eq!(10, state.num_unparked());
239assert_eq!(0, state.num_searching());
240}