// tokio/runtime/task/harness.rs
1use crate::future::Future;
2use crate::runtime::task::core::{Cell, Core, Header, Trailer};
3use crate::runtime::task::state::{Snapshot, State};
4use crate::runtime::task::waker::waker_ref;
5use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
6
7use crate::runtime::TaskMeta;
8use std::any::Any;
9use std::mem;
10use std::mem::ManuallyDrop;
11use std::panic;
12use std::ptr::NonNull;
13use std::task::{Context, Poll, Waker};
14
15/// Typed raw task handle.
16pub(super) struct Harness<T: Future, S: 'static> {
17 cell: NonNull<Cell<T, S>>,
18}
19
20impl<T, S> Harness<T, S>
21where
22 T: Future,
23 S: 'static,
24{
25 pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
26 Harness {
27 cell: ptr.cast::<Cell<T, S>>(),
28 }
29 }
30
31 fn header_ptr(&self) -> NonNull<Header> {
32 self.cell.cast()
33 }
34
35 fn header(&self) -> &Header {
36 unsafe { &*self.header_ptr().as_ptr() }
37 }
38
39 fn state(&self) -> &State {
40 &self.header().state
41 }
42
43 fn trailer(&self) -> &Trailer {
44 unsafe { &self.cell.as_ref().trailer }
45 }
46
47 fn core(&self) -> &Core<T, S> {
48 unsafe { &self.cell.as_ref().core }
49 }
50}
51
52/// Task operations that can be implemented without being generic over the
53/// scheduler or task. Only one version of these methods should exist in the
54/// final binary.
55impl RawTask {
56 pub(super) fn drop_reference(self) {
57 if self.state().ref_dec() {
58 self.dealloc();
59 }
60 }
61
62 /// This call consumes a ref-count and notifies the task. This will create a
63 /// new Notified and submit it if necessary.
64 ///
65 /// The caller does not need to hold a ref-count besides the one that was
66 /// passed to this call.
67 pub(super) fn wake_by_val(&self) {
68 use super::state::TransitionToNotifiedByVal;
69
70 match self.state().transition_to_notified_by_val() {
71 TransitionToNotifiedByVal::Submit => {
72 // The caller has given us a ref-count, and the transition has
73 // created a new ref-count, so we now hold two. We turn the new
74 // ref-count Notified and pass it to the call to `schedule`.
75 //
76 // The old ref-count is retained for now to ensure that the task
77 // is not dropped during the call to `schedule` if the call
78 // drops the task it was given.
79 self.schedule();
80
81 // Now that we have completed the call to schedule, we can
82 // release our ref-count.
83 self.drop_reference();
84 }
85 TransitionToNotifiedByVal::Dealloc => {
86 self.dealloc();
87 }
88 TransitionToNotifiedByVal::DoNothing => {}
89 }
90 }
91
92 /// This call notifies the task. It will not consume any ref-counts, but the
93 /// caller should hold a ref-count. This will create a new Notified and
94 /// submit it if necessary.
95 pub(super) fn wake_by_ref(&self) {
96 use super::state::TransitionToNotifiedByRef;
97
98 match self.state().transition_to_notified_by_ref() {
99 TransitionToNotifiedByRef::Submit => {
100 // The transition above incremented the ref-count for a new task
101 // and the caller also holds a ref-count. The caller's ref-count
102 // ensures that the task is not destroyed even if the new task
103 // is dropped before `schedule` returns.
104 self.schedule();
105 }
106 TransitionToNotifiedByRef::DoNothing => {}
107 }
108 }
109
110 /// Remotely aborts the task.
111 ///
112 /// The caller should hold a ref-count, but we do not consume it.
113 ///
114 /// This is similar to `shutdown` except that it asks the runtime to perform
115 /// the shutdown. This is necessary to avoid the shutdown happening in the
116 /// wrong thread for non-Send tasks.
117 pub(super) fn remote_abort(&self) {
118 if self.state().transition_to_notified_and_cancel() {
119 // The transition has created a new ref-count, which we turn into
120 // a Notified and pass to the task.
121 //
122 // Since the caller holds a ref-count, the task cannot be destroyed
123 // before the call to `schedule` returns even if the call drops the
124 // `Notified` internally.
125 self.schedule();
126 }
127 }
128
129 /// Try to set the waker notified when the task is complete. Returns true if
130 /// the task has already completed. If this call returns false, then the
131 /// waker will not be notified.
132 pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
133 can_read_output(self.header(), self.trailer(), waker)
134 }
135}
136
impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    /// Releases one ref-count on the task.
    ///
    /// When `ref_dec` returns true the task allocation is released through
    /// `dealloc`.
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // until after `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }

    /// Polls the task and cancel it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is `Dealloc`, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                // Separated to reduce LLVM codegen
                fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
                    match result {
                        TransitionToIdle::Ok => PollFuture::Done,
                        TransitionToIdle::OkNotified => PollFuture::Notified,
                        TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                        TransitionToIdle::Cancelled => PollFuture::Complete,
                    }
                }
                // Build a task context whose waker refers to this task's
                // header, then poll the future through `poll_future`.
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                // The future is still pending: leave the running state.
                let transition_res = self.state().transition_to_idle();
                if let TransitionToIdle::Cancelled = transition_res {
                    // The transition to idle failed because the task was
                    // cancelled during the poll.
                    cancel_task(self.core());
                }
                transition_result_to_poll_future(transition_res)
            }
            TransitionToRunning::Cancelled => {
                // Cancelled before it could run: drop the future, store the
                // error, and let the caller complete the task.
                cancel_task(self.core());
                PollFuture::Complete
            }
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempt to transition to `Running` in order to forcibly shutdown the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

    /// Releases the task allocation.
    ///
    /// Reconstructs the `Box` that owns the task cell from the raw cell
    /// pointer and drops it. As the safety comment in the body states, this
    /// must only be called by whoever just transitioned the ref-count to
    /// zero.
    pub(super) fn dealloc(self) {
        // Observe that we expect to have mutable access to these objects
        // because we are going to drop them. This only matters when running
        // under loom.
        self.trailer().waker.with_mut(|_| ());
        self.core().stage.with_mut(|_| ());

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Read the task output into `dst`.
    ///
    /// `dst` is only written when `can_read_output` returns true; in that
    /// case the output is taken out of the core and stored as
    /// `Poll::Ready`. Otherwise `dst` is left untouched and `waker` is
    /// passed on to `can_read_output`.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

    /// Handles the drop of the `JoinHandle`.
    ///
    /// Performs the join-handle-dropped state transition, drops the task
    /// output and/or the join waker when the transition asks for it, and
    /// finally releases the ref-count held by the `JoinHandle`.
    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in
        // case the task concurrently completed.
        let transition = self.state().transition_to_join_handle_dropped();

        if transition.drop_output {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`. i.e. if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        if transition.drop_waker {
            // If the JOIN_WAKER flag is unset at this point, the task is either
            // already terminal or not complete so the `JoinHandle` is responsible
            // for dropping the waker.
            // Safety:
            // If the JOIN_WAKER bit is not set the join handle has exclusive
            // access to the waker as per rule 2 in task/mod.rs.
            // This can only be the case at this point in two scenarios:
            // 1. The task completed and the runtime unset `JOIN_WAKER` flag
            //    after accessing the waker during task completion. So the
            //    `JoinHandle` is the only one to access the join waker here.
            // 2. The task is not completed so the `JoinHandle` was able to unset
            //    `JOIN_WAKER` bit itself to get mutable access to the waker.
            //    The runtime will not access the waker when this flag is unset.
            unsafe { self.trailer().set_waker(None) };
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    ///
    /// The steps run in a fixed order: transition to complete, drop the
    /// output or wake the `JoinHandle`, invoke the terminate hook if one is
    /// set, release the task from the scheduler, and finally transition to
    /// terminal, deallocating when that transition returns true.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.
        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output. The join waker was already dropped by the
                // `JoinHandle` before.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();

                // Inform the `JoinHandle` that we are done waking the waker by
                // unsetting the `JOIN_WAKER` bit. If the `JoinHandle` has
                // already been dropped and `JOIN_INTEREST` is unset, then we must
                // drop the waker ourselves.
                if !self
                    .state()
                    .unset_waker_after_complete()
                    .is_join_interested()
                {
                    // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so
                    // we have exclusive access to the waker.
                    unsafe { self.trailer().set_waker(None) };
                }
            }
        }));

        // We catch panics here in case invoking a hook panics.
        //
        // We call this in a separate block so that it runs after the task appears to have
        // completed and will still run if the destructor panics.
        if let Some(f) = self.trailer().hooks.task_terminate_callback.as_ref() {
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                f(&TaskMeta {
                    id: self.core().task_id,
                    _phantom: Default::default(),
                })
            }));
        }

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    ///
    /// The result is 2 when the scheduler hands a task back from `release`
    /// and 1 when it returns `None`.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

        if let Some(task) = self.core().scheduler.release(&me) {
            // The returned task is forgotten rather than dropped; its
            // ref-count is accounted for by the value returned below.
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new task that holds its own ref-count.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}
416
417fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
418 // Load a snapshot of the current task state
419 let snapshot = header.state.load();
420
421 debug_assert!(snapshot.is_join_interested());
422
423 if !snapshot.is_complete() {
424 // If the task is not complete, try storing the provided waker in the
425 // task's waker field.
426
427 let res = if snapshot.is_join_waker_set() {
428 // If JOIN_WAKER is set, then JoinHandle has previously stored a
429 // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
430
431 // Optimization: if the stored waker and the provided waker wake the
432 // same task, then return without touching the waker field. (Reading
433 // the waker field below is safe per rule 3 in task/mod.rs.)
434 if unsafe { trailer.will_wake(waker) } {
435 return false;
436 }
437
438 // Otherwise swap the stored waker with the provided waker by
439 // following the rule 5 in task/mod.rs.
440 header
441 .state
442 .unset_waker()
443 .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
444 } else {
445 // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
446 // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
447 // of rule 5 and try to store the provided waker in the waker field.
448 set_join_waker(header, trailer, waker.clone(), snapshot)
449 };
450
451 match res {
452 Ok(_) => return false,
453 Err(snapshot) => {
454 assert!(snapshot.is_complete());
455 }
456 }
457 }
458 true
459}
460
461fn set_join_waker(
462 header: &Header,
463 trailer: &Trailer,
464 waker: Waker,
465 snapshot: Snapshot,
466) -> Result<Snapshot, Snapshot> {
467 assert!(snapshot.is_join_interested());
468 assert!(!snapshot.is_join_waker_set());
469
470 // Safety: Only the `JoinHandle` may set the `waker` field. When
471 // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
472 unsafe {
473 trailer.set_waker(Some(waker));
474 }
475
476 // Update the `JoinWaker` state accordingly
477 let res = header.state.set_join_waker();
478
479 // If the state could not be updated, then clear the join waker
480 if res.is_err() {
481 unsafe {
482 trailer.set_waker(None);
483 }
484 }
485
486 res
487}
488
/// Outcome of `Harness::poll_inner`, telling `Harness::poll` what to do next.
enum PollFuture {
    /// The task must be completed; `poll` calls `complete`.
    Complete,
    /// The task was notified while it was being polled; `poll` hands a new
    /// task to the scheduler's `yield_now` and then drops a ref-count.
    Notified,
    /// Nothing further to do.
    Done,
    /// The last ref-count was consumed; `poll` calls `dealloc`.
    Dealloc,
}
495
496/// Cancels the task and store the appropriate error in the stage field.
497fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
498 // Drop the future from a panic guard.
499 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
500 core.drop_future_or_output();
501 }));
502
503 core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
504}
505
506fn panic_result_to_join_error(
507 task_id: Id,
508 res: Result<(), Box<dyn Any + Send + 'static>>,
509) -> JoinError {
510 match res {
511 Ok(()) => JoinError::cancelled(task_id),
512 Err(panic) => JoinError::panic(task_id, panic),
513 }
514}
515
516/// Polls the future. If the future completes, the output is written to the
517/// stage field.
518fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
519 // Poll the future.
520 let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
521 struct Guard<'a, T: Future, S: Schedule> {
522 core: &'a Core<T, S>,
523 }
524 impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
525 fn drop(&mut self) {
526 // If the future panics on poll, we drop it inside the panic
527 // guard.
528 self.core.drop_future_or_output();
529 }
530 }
531 let guard = Guard { core };
532 let res = guard.core.poll(cx);
533 mem::forget(guard);
534 res
535 }));
536
537 // Prepare output for being placed in the core stage.
538 let output = match output {
539 Ok(Poll::Pending) => return Poll::Pending,
540 Ok(Poll::Ready(output)) => Ok(output),
541 Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
542 };
543
544 // Catch and ignore panics if the future panics on drop.
545 let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
546 core.store_output(output);
547 }));
548
549 if res.is_err() {
550 core.scheduler.unhandled_panic();
551 }
552
553 Poll::Ready(())
554}
555
/// Reports a panic raised while polling to the scheduler and wraps the panic
/// payload in a `JoinError` for `task_id`.
///
/// Marked `#[cold]` because it is only reached from the panic branch of
/// `poll_future`.
#[cold]
fn panic_to_error<S: Schedule>(
    scheduler: &S,
    task_id: Id,
    panic: Box<dyn Any + Send + 'static>,
) -> JoinError {
    // Notify the scheduler first, then build the error from the payload.
    scheduler.unhandled_panic();
    JoinError::panic(task_id, panic)
}