tokio/runtime/task/
state.rs

1use crate::loom::sync::atomic::AtomicUsize;
2
3use std::fmt;
4use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5
6pub(super) struct State {
7    val: AtomicUsize,
8}
9
10/// Current state value.
11#[derive(Copy, Clone)]
12pub(super) struct Snapshot(usize);
13
14type UpdateResult = Result<Snapshot, Snapshot>;
15
16/// The task is currently being run.
17const RUNNING: usize = 0b0001;
18
19/// The task is complete.
20///
21/// Once this bit is set, it is never unset.
22const COMPLETE: usize = 0b0010;
23
24/// Extracts the task's lifecycle value from the state.
25const LIFECYCLE_MASK: usize = 0b11;
26
27/// Flag tracking if the task has been pushed into a run queue.
28const NOTIFIED: usize = 0b100;
29
30/// The join handle is still around.
31const JOIN_INTEREST: usize = 0b1_000;
32
33/// A join handle waker has been set.
34const JOIN_WAKER: usize = 0b10_000;
35
36/// The task has been forcibly cancelled.
37const CANCELLED: usize = 0b100_000;
38
39/// All bits.
40const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
41
42/// Bits used by the ref count portion of the state.
43const REF_COUNT_MASK: usize = !STATE_MASK;
44
45/// Number of positions to shift the ref count.
46const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
47
48/// One ref count.
49const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
50
51/// State a task is initialized with.
52///
53/// A task is initialized with three references:
54///
55///  * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
56///  * A reference that will be sent to the scheduler as an ordinary notification.
57///  * A reference for the `JoinHandle`.
58///
59/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
60/// As the task starts with a `Notified`, `NOTIFIED` is set.
61const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
62
63#[must_use]
64pub(super) enum TransitionToRunning {
65    Success,
66    Cancelled,
67    Failed,
68    Dealloc,
69}
70
71#[must_use]
72pub(super) enum TransitionToIdle {
73    Ok,
74    OkNotified,
75    OkDealloc,
76    Cancelled,
77}
78
79#[must_use]
80pub(super) enum TransitionToNotifiedByVal {
81    DoNothing,
82    Submit,
83    Dealloc,
84}
85
86#[must_use]
87pub(crate) enum TransitionToNotifiedByRef {
88    DoNothing,
89    Submit,
90}
91
92#[must_use]
93pub(super) struct TransitionToJoinHandleDrop {
94    pub(super) drop_waker: bool,
95    pub(super) drop_output: bool,
96}
97
98/// All transitions are performed via RMW operations. This establishes an
99/// unambiguous modification order.
100impl State {
101    /// Returns a task's initial state.
102    pub(super) fn new() -> State {
103        // The raw task returned by this method has a ref-count of three. See
104        // the comment on INITIAL_STATE for more.
105        State {
106            val: AtomicUsize::new(INITIAL_STATE),
107        }
108    }
109
110    /// Loads the current state, establishes `Acquire` ordering.
111    pub(super) fn load(&self) -> Snapshot {
112        Snapshot(self.val.load(Acquire))
113    }
114
115    /// Attempts to transition the lifecycle to `Running`. This sets the
116    /// notified bit to false so notifications during the poll can be detected.
117    pub(super) fn transition_to_running(&self) -> TransitionToRunning {
118        self.fetch_update_action(|mut next| {
119            let action;
120            assert!(next.is_notified());
121
122            if !next.is_idle() {
123                // This happens if the task is either currently running or if it
124                // has already completed, e.g. if it was cancelled during
125                // shutdown. Consume the ref-count and return.
126                next.ref_dec();
127                if next.ref_count() == 0 {
128                    action = TransitionToRunning::Dealloc;
129                } else {
130                    action = TransitionToRunning::Failed;
131                }
132            } else {
133                // We are able to lock the RUNNING bit.
134                next.set_running();
135                next.unset_notified();
136
137                if next.is_cancelled() {
138                    action = TransitionToRunning::Cancelled;
139                } else {
140                    action = TransitionToRunning::Success;
141                }
142            }
143            (action, Some(next))
144        })
145    }
146
147    /// Transitions the task from `Running` -> `Idle`.
148    ///
149    /// The transition to `Idle` fails if the task has been flagged to be
150    /// cancelled.
151    pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
152        self.fetch_update_action(|curr| {
153            assert!(curr.is_running());
154
155            if curr.is_cancelled() {
156                return (TransitionToIdle::Cancelled, None);
157            }
158
159            let mut next = curr;
160            let action;
161            next.unset_running();
162
163            if !next.is_notified() {
164                // Polling the future consumes the ref-count of the Notified.
165                next.ref_dec();
166                if next.ref_count() == 0 {
167                    action = TransitionToIdle::OkDealloc;
168                } else {
169                    action = TransitionToIdle::Ok;
170                }
171            } else {
172                // The caller will schedule a new notification, so we create a
173                // new ref-count for the notification. Our own ref-count is kept
174                // for now, and the caller will drop it shortly.
175                next.ref_inc();
176                action = TransitionToIdle::OkNotified;
177            }
178
179            (action, Some(next))
180        })
181    }
182
183    /// Transitions the task from `Running` -> `Complete`.
184    pub(super) fn transition_to_complete(&self) -> Snapshot {
185        const DELTA: usize = RUNNING | COMPLETE;
186
187        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
188        assert!(prev.is_running());
189        assert!(!prev.is_complete());
190
191        Snapshot(prev.0 ^ DELTA)
192    }
193
194    /// Transitions from `Complete` -> `Terminal`, decrementing the reference
195    /// count the specified number of times.
196    ///
197    /// Returns true if the task should be deallocated.
198    pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
199        let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
200        assert!(
201            prev.ref_count() >= count,
202            "current: {}, sub: {}",
203            prev.ref_count(),
204            count
205        );
206        prev.ref_count() == count
207    }
208
209    /// Transitions the state to `NOTIFIED`.
210    ///
211    /// If no task needs to be submitted, a ref-count is consumed.
212    ///
213    /// If a task needs to be submitted, the ref-count is incremented for the
214    /// new Notified.
215    pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
216        self.fetch_update_action(|mut snapshot| {
217            let action;
218
219            if snapshot.is_running() {
220                // If the task is running, we mark it as notified, but we should
221                // not submit anything as the thread currently running the
222                // future is responsible for that.
223                snapshot.set_notified();
224                snapshot.ref_dec();
225
226                // The thread that set the running bit also holds a ref-count.
227                assert!(snapshot.ref_count() > 0);
228
229                action = TransitionToNotifiedByVal::DoNothing;
230            } else if snapshot.is_complete() || snapshot.is_notified() {
231                // We do not need to submit any notifications, but we have to
232                // decrement the ref-count.
233                snapshot.ref_dec();
234
235                if snapshot.ref_count() == 0 {
236                    action = TransitionToNotifiedByVal::Dealloc;
237                } else {
238                    action = TransitionToNotifiedByVal::DoNothing;
239                }
240            } else {
241                // We create a new notified that we can submit. The caller
242                // retains ownership of the ref-count they passed in.
243                snapshot.set_notified();
244                snapshot.ref_inc();
245                action = TransitionToNotifiedByVal::Submit;
246            }
247
248            (action, Some(snapshot))
249        })
250    }
251
252    /// Transitions the state to `NOTIFIED`.
253    pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
254        self.fetch_update_action(|mut snapshot| {
255            if snapshot.is_complete() || snapshot.is_notified() {
256                // There is nothing to do in this case.
257                (TransitionToNotifiedByRef::DoNothing, None)
258            } else if snapshot.is_running() {
259                // If the task is running, we mark it as notified, but we should
260                // not submit as the thread currently running the future is
261                // responsible for that.
262                snapshot.set_notified();
263                (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
264            } else {
265                // The task is idle and not notified. We should submit a
266                // notification.
267                snapshot.set_notified();
268                snapshot.ref_inc();
269                (TransitionToNotifiedByRef::Submit, Some(snapshot))
270            }
271        })
272    }
273
274    /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
275    /// count.
276    ///
277    /// Returns `true` if the notified bit was transitioned from `0` to `1`;
278    /// otherwise `false.`
279    #[cfg(all(
280        tokio_unstable,
281        tokio_taskdump,
282        feature = "rt",
283        target_os = "linux",
284        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
285    ))]
286    pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
287        self.fetch_update_action(|mut snapshot| {
288            if snapshot.is_notified() {
289                (false, None)
290            } else {
291                snapshot.set_notified();
292                snapshot.ref_inc();
293                (true, Some(snapshot))
294            }
295        })
296    }
297
298    /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
299    ///
300    /// Returns `true` if the task needs to be submitted to the pool for
301    /// execution.
302    pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
303        self.fetch_update_action(|mut snapshot| {
304            if snapshot.is_cancelled() || snapshot.is_complete() {
305                // Aborts to completed or cancelled tasks are no-ops.
306                (false, None)
307            } else if snapshot.is_running() {
308                // If the task is running, we mark it as cancelled. The thread
309                // running the task will notice the cancelled bit when it
310                // stops polling and it will kill the task.
311                //
312                // The set_notified() call is not strictly necessary but it will
313                // in some cases let a wake_by_ref call return without having
314                // to perform a compare_exchange.
315                snapshot.set_notified();
316                snapshot.set_cancelled();
317                (false, Some(snapshot))
318            } else {
319                // The task is idle. We set the cancelled and notified bits and
320                // submit a notification if the notified bit was not already
321                // set.
322                snapshot.set_cancelled();
323                if !snapshot.is_notified() {
324                    snapshot.set_notified();
325                    snapshot.ref_inc();
326                    (true, Some(snapshot))
327                } else {
328                    (false, Some(snapshot))
329                }
330            }
331        })
332    }
333
334    /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
335    ///
336    /// Returns `true` if the transition to `Running` succeeded.
337    pub(super) fn transition_to_shutdown(&self) -> bool {
338        let mut prev = Snapshot(0);
339
340        let _ = self.fetch_update(|mut snapshot| {
341            prev = snapshot;
342
343            if snapshot.is_idle() {
344                snapshot.set_running();
345            }
346
347            // If the task was not idle, the thread currently running the task
348            // will notice the cancelled bit and cancel it once the poll
349            // completes.
350            snapshot.set_cancelled();
351            Some(snapshot)
352        });
353
354        prev.is_idle()
355    }
356
357    /// Optimistically tries to swap the state assuming the join handle is
358    /// __immediately__ dropped on spawn.
359    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
360        use std::sync::atomic::Ordering::Relaxed;
361
362        // Relaxed is acceptable as if this function is called and succeeds,
363        // then nothing has been done w/ the join handle.
364        //
365        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
366        // set, at which point the CAS will fail.
367        //
368        // Given this, there is no risk if this operation is reordered.
369        self.val
370            .compare_exchange_weak(
371                INITIAL_STATE,
372                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
373                Release,
374                Relaxed,
375            )
376            .map(|_| ())
377            .map_err(|_| ())
378    }
379
380    /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER`
381    /// flag is also unset.
382    /// The returned `TransitionToJoinHandleDrop` indicates whether the `JoinHandle` should drop
383    /// the output of the future or the join waker after the transition.
384    pub(super) fn transition_to_join_handle_dropped(&self) -> TransitionToJoinHandleDrop {
385        self.fetch_update_action(|mut snapshot| {
386            assert!(snapshot.is_join_interested());
387
388            let mut transition = TransitionToJoinHandleDrop {
389                drop_waker: false,
390                drop_output: false,
391            };
392
393            snapshot.unset_join_interested();
394
395            if !snapshot.is_complete() {
396                // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the
397                // `JoinHandle` exclusive access to the waker following rule 6 in task/mod.rs.
398                // The `JoinHandle` will drop the waker if it has exclusive access
399                // to drop it.
400                snapshot.unset_join_waker();
401            } else {
402                // If `COMPLETE` is set the task is completed so the `JoinHandle` is responsible
403                // for dropping the output.
404                transition.drop_output = true;
405            }
406
407            if !snapshot.is_join_waker_set() {
408                // If the `JOIN_WAKER` bit is unset and the `JOIN_HANDLE` has exclusive access to
409                // the join waker and should drop it following this transition.
410                // This might happen in two situations:
411                //  1. The task is not completed and we just unset the `JOIN_WAKer` above in this
412                //     function.
413                //  2. The task is completed. In that case the `JOIN_WAKER` bit was already unset
414                //     by the runtime during completion.
415                transition.drop_waker = true;
416            }
417
418            (transition, Some(snapshot))
419        })
420    }
421
422    /// Sets the `JOIN_WAKER` bit.
423    ///
424    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
425    /// the task has completed.
426    pub(super) fn set_join_waker(&self) -> UpdateResult {
427        self.fetch_update(|curr| {
428            assert!(curr.is_join_interested());
429            assert!(!curr.is_join_waker_set());
430
431            if curr.is_complete() {
432                return None;
433            }
434
435            let mut next = curr;
436            next.set_join_waker();
437
438            Some(next)
439        })
440    }
441
442    /// Unsets the `JOIN_WAKER` bit.
443    ///
444    /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if
445    /// the task has completed.
446    pub(super) fn unset_waker(&self) -> UpdateResult {
447        self.fetch_update(|curr| {
448            assert!(curr.is_join_interested());
449
450            if curr.is_complete() {
451                return None;
452            }
453
454            // If the task is completed, this bit may have been unset by
455            // `unset_waker_after_complete`.
456            assert!(curr.is_join_waker_set());
457
458            let mut next = curr;
459            next.unset_join_waker();
460
461            Some(next)
462        })
463    }
464
465    /// Unsets the `JOIN_WAKER` bit unconditionally after task completion.
466    ///
467    /// This operation requires the task to be completed.
468    pub(super) fn unset_waker_after_complete(&self) -> Snapshot {
469        let prev = Snapshot(self.val.fetch_and(!JOIN_WAKER, AcqRel));
470        assert!(prev.is_complete());
471        assert!(prev.is_join_waker_set());
472        Snapshot(prev.0 & !JOIN_WAKER)
473    }
474
475    pub(super) fn ref_inc(&self) {
476        use std::process;
477        use std::sync::atomic::Ordering::Relaxed;
478
479        // Using a relaxed ordering is alright here, as knowledge of the
480        // original reference prevents other threads from erroneously deleting
481        // the object.
482        //
483        // As explained in the [Boost documentation][1], Increasing the
484        // reference counter can always be done with memory_order_relaxed: New
485        // references to an object can only be formed from an existing
486        // reference, and passing an existing reference from one thread to
487        // another must already provide any required synchronization.
488        //
489        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
490        let prev = self.val.fetch_add(REF_ONE, Relaxed);
491
492        // If the reference count overflowed, abort.
493        if prev > isize::MAX as usize {
494            process::abort();
495        }
496    }
497
498    /// Returns `true` if the task should be released.
499    pub(super) fn ref_dec(&self) -> bool {
500        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
501        assert!(prev.ref_count() >= 1);
502        prev.ref_count() == 1
503    }
504
505    /// Returns `true` if the task should be released.
506    pub(super) fn ref_dec_twice(&self) -> bool {
507        let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
508        assert!(prev.ref_count() >= 2);
509        prev.ref_count() == 2
510    }
511
512    fn fetch_update_action<F, T>(&self, mut f: F) -> T
513    where
514        F: FnMut(Snapshot) -> (T, Option<Snapshot>),
515    {
516        let mut curr = self.load();
517
518        loop {
519            let (output, next) = f(curr);
520            let next = match next {
521                Some(next) => next,
522                None => return output,
523            };
524
525            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
526
527            match res {
528                Ok(_) => return output,
529                Err(actual) => curr = Snapshot(actual),
530            }
531        }
532    }
533
534    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
535    where
536        F: FnMut(Snapshot) -> Option<Snapshot>,
537    {
538        let mut curr = self.load();
539
540        loop {
541            let next = match f(curr) {
542                Some(next) => next,
543                None => return Err(curr),
544            };
545
546            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
547
548            match res {
549                Ok(_) => return Ok(next),
550                Err(actual) => curr = Snapshot(actual),
551            }
552        }
553    }
554}
555
556// ===== impl Snapshot =====
557
558impl Snapshot {
559    /// Returns `true` if the task is in an idle state.
560    pub(super) fn is_idle(self) -> bool {
561        self.0 & (RUNNING | COMPLETE) == 0
562    }
563
564    /// Returns `true` if the task has been flagged as notified.
565    pub(super) fn is_notified(self) -> bool {
566        self.0 & NOTIFIED == NOTIFIED
567    }
568
569    fn unset_notified(&mut self) {
570        self.0 &= !NOTIFIED;
571    }
572
573    fn set_notified(&mut self) {
574        self.0 |= NOTIFIED;
575    }
576
577    pub(super) fn is_running(self) -> bool {
578        self.0 & RUNNING == RUNNING
579    }
580
581    fn set_running(&mut self) {
582        self.0 |= RUNNING;
583    }
584
585    fn unset_running(&mut self) {
586        self.0 &= !RUNNING;
587    }
588
589    pub(super) fn is_cancelled(self) -> bool {
590        self.0 & CANCELLED == CANCELLED
591    }
592
593    fn set_cancelled(&mut self) {
594        self.0 |= CANCELLED;
595    }
596
597    /// Returns `true` if the task's future has completed execution.
598    pub(super) fn is_complete(self) -> bool {
599        self.0 & COMPLETE == COMPLETE
600    }
601
602    pub(super) fn is_join_interested(self) -> bool {
603        self.0 & JOIN_INTEREST == JOIN_INTEREST
604    }
605
606    fn unset_join_interested(&mut self) {
607        self.0 &= !JOIN_INTEREST;
608    }
609
610    pub(super) fn is_join_waker_set(self) -> bool {
611        self.0 & JOIN_WAKER == JOIN_WAKER
612    }
613
614    fn set_join_waker(&mut self) {
615        self.0 |= JOIN_WAKER;
616    }
617
618    fn unset_join_waker(&mut self) {
619        self.0 &= !JOIN_WAKER;
620    }
621
622    pub(super) fn ref_count(self) -> usize {
623        (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
624    }
625
626    fn ref_inc(&mut self) {
627        assert!(self.0 <= isize::MAX as usize);
628        self.0 += REF_ONE;
629    }
630
631    pub(super) fn ref_dec(&mut self) {
632        assert!(self.ref_count() > 0);
633        self.0 -= REF_ONE;
634    }
635}
636
637impl fmt::Debug for State {
638    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
639        let snapshot = self.load();
640        snapshot.fmt(fmt)
641    }
642}
643
644impl fmt::Debug for Snapshot {
645    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
646        fmt.debug_struct("Snapshot")
647            .field("is_running", &self.is_running())
648            .field("is_complete", &self.is_complete())
649            .field("is_notified", &self.is_notified())
650            .field("is_cancelled", &self.is_cancelled())
651            .field("is_join_interested", &self.is_join_interested())
652            .field("is_join_waker_set", &self.is_join_waker_set())
653            .field("ref_count", &self.ref_count())
654            .finish()
655    }
656}