tokio/runtime/task/harness.rs

use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};

use crate::runtime::TaskMeta;
use std::any::Any;
use std::mem;
use std::mem::ManuallyDrop;
use std::panic;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};

/// Typed raw task handle.
pub(super) struct Harness<T: Future, S: 'static> {
    cell: NonNull<Cell<T, S>>,
}

impl<T, S> Harness<T, S>
where
    T: Future,
    S: 'static,
{
    pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
        Harness {
            cell: ptr.cast::<Cell<T, S>>(),
        }
    }

    fn header_ptr(&self) -> NonNull<Header> {
        self.cell.cast()
    }

    fn header(&self) -> &Header {
        unsafe { &*self.header_ptr().as_ptr() }
    }

    fn state(&self) -> &State {
        &self.header().state
    }

    fn trailer(&self) -> &Trailer {
        unsafe { &self.cell.as_ref().trailer }
    }

    fn core(&self) -> &Core<T, S> {
        unsafe { &self.cell.as_ref().core }
    }
}

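// A note on the pointer casts above (and in `get_new_task` below): they rely
// on `Header` being the first field of `Cell`, so a pointer to the whole cell
// is also a valid pointer to its header. A minimal sketch of that layout
// assumption (field names here are illustrative; see core.rs for the real
// definition):
//
//     #[repr(C)]
//     struct Cell<T: Future, S> {
//         header: Header,     // must stay first for the casts to be sound
//         core: Core<T, S>,
//         trailer: Trailer,
//     }
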
/// Task operations that can be implemented without being generic over the
/// scheduler or task. Only one version of these methods should exist in the
/// final binary.
impl RawTask {
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// This call consumes a ref-count and notifies the task. This will create a
    /// new Notified and submit it if necessary.
    ///
    /// The caller does not need to hold a ref-count besides the one that was
    /// passed to this call.
    pub(super) fn wake_by_val(&self) {
        use super::state::TransitionToNotifiedByVal;

        match self.state().transition_to_notified_by_val() {
            TransitionToNotifiedByVal::Submit => {
                // The caller has given us a ref-count, and the transition has
                // created a new ref-count, so we now hold two. We turn the new
                // ref-count into a `Notified` and pass it to the call to
                // `schedule`.
                //
                // The old ref-count is retained for now to ensure that the task
                // is not dropped during the call to `schedule` if the call
                // drops the task it was given.
                self.schedule();

                // Now that we have completed the call to schedule, we can
                // release our ref-count.
                self.drop_reference();
            }
            TransitionToNotifiedByVal::Dealloc => {
                self.dealloc();
            }
            TransitionToNotifiedByVal::DoNothing => {}
        }
    }

    /// This call notifies the task. It will not consume any ref-counts, but the
    /// caller should hold a ref-count. This will create a new Notified and
    /// submit it if necessary.
    pub(super) fn wake_by_ref(&self) {
        use super::state::TransitionToNotifiedByRef;

        match self.state().transition_to_notified_by_ref() {
            TransitionToNotifiedByRef::Submit => {
                // The transition above incremented the ref-count for a new task
                // and the caller also holds a ref-count. The caller's ref-count
                // ensures that the task is not destroyed even if the new task
                // is dropped before `schedule` returns.
                self.schedule();
            }
            TransitionToNotifiedByRef::DoNothing => {}
        }
    }

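    // How these two entry points line up with the `RawWakerVTable` contract:
    // a waker's `wake` consumes the waker (and the ref-count it carries),
    // while `wake_by_ref` borrows it. A minimal sketch of vtable shims that
    // could forward to the methods above (illustrative only; the shim and
    // constructor names are assumptions, see waker.rs for the real vtable):
    //
    //     unsafe fn vtable_wake(ptr: *const ()) {
    //         // The waker owned one ref-count; `wake_by_val` consumes it.
    //         let header = NonNull::new_unchecked(ptr as *mut Header);
    //         RawTask::from_raw(header).wake_by_val();
    //     }
    //
    //     unsafe fn vtable_wake_by_ref(ptr: *const ()) {
    //         // The waker keeps its ref-count; nothing is consumed here.
    //         let header = NonNull::new_unchecked(ptr as *mut Header);
    //         RawTask::from_raw(header).wake_by_ref();
    //     }
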
    /// Remotely aborts the task.
    ///
    /// The caller should hold a ref-count, but we do not consume it.
    ///
    /// This is similar to `shutdown` except that it asks the runtime to perform
    /// the shutdown. This is necessary to avoid the shutdown happening in the
    /// wrong thread for non-Send tasks.
    pub(super) fn remote_abort(&self) {
        if self.state().transition_to_notified_and_cancel() {
            // The transition has created a new ref-count, which we turn into
            // a `Notified` and pass to `schedule`.
            //
            // Since the caller holds a ref-count, the task cannot be destroyed
            // before the call to `schedule` returns even if the call drops the
            // `Notified` internally.
            self.schedule();
        }
    }

    /// Tries to set the waker that is notified when the task completes.
    /// Returns `true` if the task has already completed; in that case the
    /// waker is not stored and will not be notified.
    pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
        can_read_output(self.header(), self.trailer(), waker)
    }
}

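// An illustrative picture of how a join-handle-style caller could combine
// `try_set_join_waker` with reading the output (a hedged sketch with
// hypothetical names, not the real `JoinHandle` implementation; it assumes
// the caller owns a ref-count on the task):
//
//     fn poll_completion(raw: &RawTask, cx: &mut Context<'_>) -> Poll<()> {
//         if raw.try_set_join_waker(cx.waker()) {
//             // Already complete: the output can now be taken, e.g. via
//             // `Harness::try_read_output`.
//             Poll::Ready(())
//         } else {
//             // The waker was stored and will be woken on completion.
//             Poll::Pending
//         }
//     }
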
impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // until after `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }

    /// Polls the task and cancels it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is `Dealloc`, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                // Separated to reduce LLVM codegen
                fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
                    match result {
                        TransitionToIdle::Ok => PollFuture::Done,
                        TransitionToIdle::OkNotified => PollFuture::Notified,
                        TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                        TransitionToIdle::Cancelled => PollFuture::Complete,
                    }
                }
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                let transition_res = self.state().transition_to_idle();
                if let TransitionToIdle::Cancelled = transition_res {
                    // The transition to idle failed because the task was
                    // cancelled during the poll.
                    cancel_task(self.core());
                }
                transition_result_to_poll_future(transition_res)
            }
            TransitionToRunning::Cancelled => {
                cancel_task(self.core());
                PollFuture::Complete
            }
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempts to transition to `Running` in order to forcibly shut down the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

    pub(super) fn dealloc(self) {
        // Observe that we expect to have mutable access to these objects
        // because we are going to drop them. This only matters when running
        // under loom.
        self.trailer().waker.with_mut(|_| ());
        self.core().stage.with_mut(|_| ());

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Read the task output into `dst`.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in
        // case the task concurrently completed.
        let transition = self.state().transition_to_join_handle_dropped();

        if transition.drop_output {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`. i.e. if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        if transition.drop_waker {
            // If the JOIN_WAKER flag is unset at this point, the task is either
            // already terminal or not complete, so the `JoinHandle` is responsible
            // for dropping the waker.
            // Safety:
            // If the JOIN_WAKER bit is not set, the join handle has exclusive
            // access to the waker as per rule 2 in task/mod.rs.
            // This can only be the case at this point in two scenarios:
            // 1. The task completed and the runtime unset the `JOIN_WAKER` flag
            //    after accessing the waker during task completion. So the
            //    `JoinHandle` is the only one to access the join waker here.
            // 2. The task is not completed, so the `JoinHandle` was able to unset
            //    the `JOIN_WAKER` bit itself to get mutable access to the waker.
            //    The runtime will not access the waker when this flag is unset.
            unsafe { self.trailer().set_waker(None) };
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.
        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output. The join waker was already dropped by the
                // `JoinHandle` before.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();

                // Inform the `JoinHandle` that we are done waking the waker by
                // unsetting the `JOIN_WAKER` bit. If the `JoinHandle` has
                // already been dropped and `JOIN_INTEREST` is unset, then we must
                // drop the waker ourselves.
                if !self
                    .state()
                    .unset_waker_after_complete()
                    .is_join_interested()
                {
                    // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so
                    // we have exclusive access to the waker.
                    unsafe { self.trailer().set_waker(None) };
                }
            }
        }));

        // We catch panics here in case invoking a hook panics.
        //
        // We call this in a separate block so that it runs after the task appears to have
        // completed and will still run if the destructor panics.
        if let Some(f) = self.trailer().hooks.task_terminate_callback.as_ref() {
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                f(&TaskMeta {
                    id: self.core().task_id,
                    _phantom: Default::default(),
                })
            }));
        }

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

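        // If the scheduler was still holding a `Task` for this task, it is
        // returned here. That task is forgotten rather than dropped so that
        // its ref-count is released through `transition_to_terminal` instead,
        // together with the ref-count held by the caller (hence 2 vs 1).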
        if let Some(task) = self.core().scheduler.release(&me) {
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new task that holds its own ref-count.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}

fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
    // Load a snapshot of the current task state
    let snapshot = header.state.load();

    debug_assert!(snapshot.is_join_interested());

    if !snapshot.is_complete() {
        // If the task is not complete, try storing the provided waker in the
        // task's waker field.

        let res = if snapshot.is_join_waker_set() {
            // If JOIN_WAKER is set, then JoinHandle has previously stored a
            // waker in the waker field per step (iii) of rule 5 in task/mod.rs.

            // Optimization: if the stored waker and the provided waker wake the
            // same task, then return without touching the waker field. (Reading
            // the waker field below is safe per rule 3 in task/mod.rs.)
            if unsafe { trailer.will_wake(waker) } {
                return false;
            }

            // Otherwise swap the stored waker with the provided waker by
            // following rule 5 in task/mod.rs.
            header
                .state
                .unset_waker()
                .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
        } else {
            // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
            // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
            // of rule 5 and try to store the provided waker in the waker field.
            set_join_waker(header, trailer, waker.clone(), snapshot)
        };

        match res {
            Ok(_) => return false,
            Err(snapshot) => {
                assert!(snapshot.is_complete());
            }
        }
    }
    true
}

fn set_join_waker(
    header: &Header,
    trailer: &Trailer,
    waker: Waker,
    snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
    assert!(snapshot.is_join_interested());
    assert!(!snapshot.is_join_waker_set());

    // Safety: Only the `JoinHandle` may set the `waker` field. When
    // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
    unsafe {
        trailer.set_waker(Some(waker));
    }

    // Update the `JoinWaker` state accordingly
    let res = header.state.set_join_waker();

    // If the state could not be updated, then clear the join waker
    if res.is_err() {
        unsafe {
            trailer.set_waker(None);
        }
    }

    res
}

enum PollFuture {
    Complete,
    Notified,
    Done,
    Dealloc,
}

/// Cancels the task and stores the appropriate error in the stage field.
fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
    // Drop the future inside a panic guard.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.drop_future_or_output();
    }));

    core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
}

fn panic_result_to_join_error(
    task_id: Id,
    res: Result<(), Box<dyn Any + Send + 'static>>,
) -> JoinError {
    match res {
        Ok(()) => JoinError::cancelled(task_id),
        Err(panic) => JoinError::panic(task_id, panic),
    }
}

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
    // Poll the future.
    let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        struct Guard<'a, T: Future, S: Schedule> {
            core: &'a Core<T, S>,
        }
        impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
            fn drop(&mut self) {
                // If the future panics on poll, we drop it inside the panic
                // guard.
                self.core.drop_future_or_output();
            }
        }
        let guard = Guard { core };
        let res = guard.core.poll(cx);
        mem::forget(guard);
        res
    }));

    // Prepare output for being placed in the core stage.
    let output = match output {
        Ok(Poll::Pending) => return Poll::Pending,
        Ok(Poll::Ready(output)) => Ok(output),
        Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
    };

    // Catch and ignore panics if the future panics on drop.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.store_output(output);
    }));

    if res.is_err() {
        core.scheduler.unhandled_panic();
    }

    Poll::Ready(())
}

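// The `Guard`/`mem::forget` idiom in `poll_future` is a general
// "run cleanup only on unwind" pattern: the guard's destructor drops the
// future if `poll` panics, and `mem::forget` disarms it on the normal path.
// A minimal standalone sketch of the same idiom (illustrative only, the names
// are hypothetical and not used elsewhere in this file):
//
//     struct OnUnwind<F: FnMut()>(F);
//     impl<F: FnMut()> Drop for OnUnwind<F> {
//         fn drop(&mut self) {
//             (self.0)(); // runs only if the guard was not forgotten
//         }
//     }
//
//     fn with_cleanup_on_unwind<R>(cleanup: impl FnMut(), f: impl FnOnce() -> R) -> R {
//         let guard = OnUnwind(cleanup);
//         let res = f(); // if this unwinds, `guard` drops and runs `cleanup`
//         std::mem::forget(guard); // success path: skip the cleanup
//         res
//     }
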
#[cold]
fn panic_to_error<S: Schedule>(
    scheduler: &S,
    task_id: Id,
    panic: Box<dyn Any + Send + 'static>,
) -> JoinError {
    scheduler.unhandled_panic();
    JoinError::panic(task_id, panic)
}