tokio/runtime/scheduler/multi_thread/
worker.rs

1//! A scheduler is initialized with a fixed number of workers. Each worker is
2//! driven by a thread. Each worker has a "core" which contains data such as the
3//! run queue and other state. When `block_in_place` is called, the worker's
4//! "core" is handed off to a new thread allowing the scheduler to continue to
5//! make progress while the originating thread blocks.
6//!
7//! # Shutdown
8//!
9//! Shutting down the runtime involves the following steps:
10//!
11//!  1. The Shared::close method is called. This closes the inject queue and
12//!     `OwnedTasks` instance and wakes up all worker threads.
13//!
14//!  2. Each worker thread observes the close signal next time it runs
15//!     Core::maintenance by checking whether the inject queue is closed.
16//!     The `Core::is_shutdown` flag is set to true.
17//!
18//!  3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
19//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
20//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
21//!     was closed in step 1.
22//!
23//!  5. The workers call Shared::shutdown to enter the single-threaded phase of
24//!     shutdown. These calls will push their core to `Shared::shutdown_cores`,
25//!     and the last thread to push its core will finish the shutdown procedure.
26//!
27//!  6. The local run queue of each core is emptied, then the inject queue is
28//!     emptied.
29//!
30//! At this point, shutdown has completed. It is not possible for any of the
31//! collections to contain any tasks at this point, as each collection was
32//! closed first, then emptied afterwards.
33//!
34//! ## Spawns during shutdown
35//!
36//! When spawning tasks during shutdown, there are two cases:
37//!
38//!  * The spawner observes the `OwnedTasks` being open, and the inject queue is
39//!    closed.
40//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
41//!    inject queue.
42//!
43//! The first case can only happen if the `OwnedTasks::bind` call happens before
44//! or during step 1 of shutdown. In this case, the runtime will clean up the
45//! task in step 3 of shutdown.
46//!
47//! In the latter case, the task was not spawned and the task is immediately
48//! cancelled by the spawner.
49//!
50//! The correctness of shutdown requires both the inject queue and `OwnedTasks`
//! collection to have a close bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
53//! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
54//! the first spawning situation could result in the notification being pushed
55//! to the inject queue after step 6 of shutdown, which would leave a task in
56//! the inject queue indefinitely. This would be a ref-count cycle and a memory
57//! leak.
58
59use crate::loom::sync::{Arc, Mutex};
60use crate::runtime;
61use crate::runtime::scheduler::multi_thread::{
62    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
63};
64use crate::runtime::scheduler::{inject, Defer, Lock};
65use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
66use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics};
67use crate::runtime::{context, TaskHooks};
68use crate::task::coop;
69use crate::util::atomic_cell::AtomicCell;
70use crate::util::rand::{FastRand, RngSeedGenerator};
71
72use std::cell::RefCell;
73use std::task::Waker;
74use std::thread;
75use std::time::Duration;
76
77mod metrics;
78
79cfg_taskdump! {
80    mod taskdump;
81}
82
83cfg_not_taskdump! {
84    mod taskdump_mock;
85}
86
/// A scheduler worker.
///
/// A worker is driven by one thread at a time. The thread that drives it
/// takes the [`Core`] out of `core` and keeps it for as long as it runs the
/// worker.
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state. The same index is used to
    /// look up this worker's entry in `Shared::worker_metrics`.
    index: usize,

    /// Used to hand-off a worker's core to another thread. The slot is empty
    /// while a thread is holding the core.
    core: AtomicCell<Core>,
}
98
/// Core data.
///
/// The per-worker scheduling state: the local run queue, the LIFO slot, the
/// parker and assorted flags. A core is owned by whichever thread is currently
/// driving the worker.
struct Core {
    /// Used to schedule bookkeeping tasks every so often. Incremented with
    /// wrapping arithmetic once per iteration of the worker loop.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shutdown
    is_shutdown: bool,

    /// True if the scheduler is being traced
    is_traced: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats
    stats: Stats,

    /// How often to check the global queue. The global queue is consulted
    /// first whenever `tick` is a multiple of this value.
    global_queue_interval: u32,

    /// Fast random number generator. Used to pick the worker from which
    /// stealing starts.
    rand: FastRand,
}
143
/// State shared across all workers
pub(crate) struct Shared {
    /// Per-worker remote state. All other workers have access to this and is
    /// how they communicate between each other. Indexed by `Worker::index`.
    remotes: Box<[Remote]>,

    /// Global task queue used for:
    ///  1. Submit work to the scheduler while **not** currently on a worker thread.
    ///  2. Submit work to the scheduler when a worker run queue is saturated
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to avoid it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    /// Per-worker metrics, indexed by `Worker::index`.
    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` `cfg` flag is set.
    _counters: Counters,
}
188
/// Data synchronized by the scheduler mutex.
///
/// Stored in `Shared::synced`; callers lock that mutex and pass the relevant
/// field to the `Idle` / inject queue operations that require it.
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,
}
197
/// Used to communicate with a worker from other threads.
///
/// One `Remote` exists per worker and is stored in `Shared::remotes`.
struct Remote {
    /// Steals tasks from this worker.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}
206
/// Thread-local context
///
/// Created by `run` for the thread that drives a worker and installed as the
/// current scheduler context for the duration of the worker loop.
pub(crate) struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    ///
    /// Holds the core while a task is being polled or the thread is parked.
    /// It is `None` while the worker loop owns the core directly, or after
    /// the core was handed off by `block_in_place`.
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}
219
/// Starts the workers
///
/// Holds every worker produced by `create` until `Launch::launch` is called.
pub(crate) struct Launch(Vec<Arc<Worker>>);
222
/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
///
/// `Ok(core)` hands the core back to the caller. `Err(())` means the calling
/// thread no longer holds the core.
type RunResult = Result<Box<Core>, ()>;

/// A task handle whose scheduler is this runtime's `Arc<Handle>`.
type Task = task::Task<Arc<Handle>>;

/// A notified task handle whose scheduler is this runtime's `Arc<Handle>`.
type Notified = task::Notified<Arc<Handle>>;
233
/// Maximum number of tasks polled from the LIFO slot within a single
/// `run_task` call before the LIFO slot is disabled for that call.
///
/// Value picked out of thin-air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. More than 3 times probably is
/// overweighing. The value can be tuned in the future with data that shows
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
239
240pub(super) fn create(
241    size: usize,
242    park: Parker,
243    driver_handle: driver::Handle,
244    blocking_spawner: blocking::Spawner,
245    seed_generator: RngSeedGenerator,
246    config: Config,
247) -> (Arc<Handle>, Launch) {
248    let mut cores = Vec::with_capacity(size);
249    let mut remotes = Vec::with_capacity(size);
250    let mut worker_metrics = Vec::with_capacity(size);
251
252    // Create the local queues
253    for _ in 0..size {
254        let (steal, run_queue) = queue::local();
255
256        let park = park.clone();
257        let unpark = park.unpark();
258        let metrics = WorkerMetrics::from_config(&config);
259        let stats = Stats::new(&metrics);
260
261        cores.push(Box::new(Core {
262            tick: 0,
263            lifo_slot: None,
264            lifo_enabled: !config.disable_lifo_slot,
265            run_queue,
266            is_searching: false,
267            is_shutdown: false,
268            is_traced: false,
269            park: Some(park),
270            global_queue_interval: stats.tuned_global_queue_interval(&config),
271            stats,
272            rand: FastRand::from_seed(config.seed_generator.next_seed()),
273        }));
274
275        remotes.push(Remote { steal, unpark });
276        worker_metrics.push(metrics);
277    }
278
279    let (idle, idle_synced) = Idle::new(size);
280    let (inject, inject_synced) = inject::Shared::new();
281
282    let remotes_len = remotes.len();
283    let handle = Arc::new(Handle {
284        task_hooks: TaskHooks::from_config(&config),
285        shared: Shared {
286            remotes: remotes.into_boxed_slice(),
287            inject,
288            idle,
289            owned: OwnedTasks::new(size),
290            synced: Mutex::new(Synced {
291                idle: idle_synced,
292                inject: inject_synced,
293            }),
294            shutdown_cores: Mutex::new(vec![]),
295            trace_status: TraceStatus::new(remotes_len),
296            config,
297            scheduler_metrics: SchedulerMetrics::new(),
298            worker_metrics: worker_metrics.into_boxed_slice(),
299            _counters: Counters,
300        },
301        driver: driver_handle,
302        blocking_spawner,
303        seed_generator,
304    });
305
306    let mut launch = Launch(vec![]);
307
308    for (index, core) in cores.drain(..).enumerate() {
309        launch.0.push(Arc::new(Worker {
310            handle: handle.clone(),
311            index,
312            core: AtomicCell::new(Some(core)),
313        }));
314    }
315
316    (handle, launch)
317}
318
/// Runs the blocking closure `f` on the current thread and returns its result.
///
/// When the current thread is a worker that holds a core, the core is first
/// placed back in the worker's shared slot and a new blocking thread is
/// spawned to keep driving the worker. Once `f` returns, the `Reset` guard
/// tries to take the core back and restores the task budget.
///
/// When called outside of a runtime, or in a nested `block_in_place` call,
/// `f` is simply invoked.
///
/// # Panics
///
/// Panics with "can call blocking only when running on the multi-threaded
/// runtime" when the thread has entered a runtime, is not a worker thread of
/// this scheduler, and the entered context does not allow `block_in_place`.
#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset {
        /// Whether the core was handed off and should be taken back on drop.
        take_core: bool,
        /// The task budget that was active before blocking started.
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        // The slot may be empty if another thread is still
                        // holding the core; in that case `core` is `None`.
                        let core = cx.worker.core.take();

                        if core.is_some() {
                            cx.worker.handle.shared.worker_metrics[cx.worker.index]
                                .set_thread_id(thread::current().id());
                        }

                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    // Set by the closure below: whether the runtime must be exited around `f`,
    // and whether the core was handed off to another thread.
    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool.  That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        // Only the `(Entered, true)` arm falls through to this point.
        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let mut core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // If we heavily call `spawn_blocking`, there might be no available thread to
        // run this core. Except for the task in the lifo_slot, all tasks can be
        // stolen, so we move the task out of the lifo_slot to the run_queue.
        if let Some(task) = core.lifo_slot.take() {
            core.run_queue
                .push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
        }

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}
456
457impl Launch {
458    pub(crate) fn launch(mut self) {
459        for worker in self.0.drain(..) {
460            runtime::spawn_blocking(move || run(worker));
461        }
462    }
463}
464
/// Drives `worker` on the current thread until its core is lost or the
/// scheduler shuts down.
///
/// The function takes the core out of the worker's shared slot, enters the
/// runtime, installs a multi-thread scheduler [`Context`] for this thread and
/// runs the worker loop (`Context::run`). If the slot is empty, another thread
/// is already running this worker and the function returns immediately.
fn run(worker: Arc<Worker>) {
    /// Guard that aborts the process when dropped during a panic.
    #[allow(dead_code)]
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Record which thread is now driving this worker.
    worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());

    crate::runtime::context::enter_runtime(&handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThread(Context {
            worker,
            core: RefCell::new(None),
            defer: Defer::new(),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread();

            // This should always be an error. It only returns a `Result` to support
            // using `?` to short circuit.
            assert!(cx.run(core).is_err());

            // Check if there are any deferred tasks to notify. This can happen when
            // the worker core is lost due to `block_in_place()` being called from
            // within the task.
            cx.defer.wake();
        });
    });
}
516
517impl Context {
    /// The worker loop.
    ///
    /// Until the core observes shutdown, each iteration increments the tick,
    /// runs maintenance when due, then runs the next available task. When no
    /// local or remote task is available it tries to steal from other workers,
    /// and parks when stealing finds nothing.
    ///
    /// Always returns `Err(())`: either the core was lost while running a
    /// task (propagated through `?`), or shutdown was observed and the core
    /// was passed to `shutdown_core`.
    fn run(&self, mut core: Box<Core>) -> RunResult {
        // Reset `lifo_enabled` here in case the core was previously stolen from
        // a task that had the LIFO slot disabled.
        self.reset_lifo_enabled(&mut core);

        // Start as "processing" tasks as polling tasks from the local queue
        // will be one of the first things we do.
        core.stats.start_processing_scheduled_tasks();

        while !core.is_shutdown {
            self.assert_lifo_enabled_is_correct(&core);

            if core.is_traced {
                core = self.worker.handle.trace_core(core);
            }

            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                // `?` returns early if the core was lost while the task ran.
                core = self.run_task(task, core)?;
                continue;
            }

            // We consumed all work in the queues and will start searching for work.
            core.stats.end_processing_scheduled_tasks();

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                // Found work, switch back to processing
                core.stats.start_processing_scheduled_tasks();
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                //
                // If there are deferred wakers pending, park with a zero
                // timeout instead of parking without a timeout.
                core = if !self.defer.is_empty() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
                core.stats.start_processing_scheduled_tasks();
            }
        }

        core.pre_shutdown(&self.worker);
        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }
571
    /// Polls `task`, then keeps polling tasks found in the LIFO slot.
    ///
    /// The core is stored in the context while tasks run so that code running
    /// inside a task can reach it. After each poll the core is taken back out
    /// of the context.
    ///
    /// Returns `Ok(core)` once the LIFO slot is empty or the coop budget is
    /// exhausted. Returns `Err(())` if the core is no longer in the context
    /// after a poll, in which case this thread must stop driving the worker.
    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        #[cfg(tokio_unstable)]
        let task_id = task.task_id();

        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            // Unlike the poll time above, poll start callback is attached to the task id,
            // so it is tightly associated with the actual poll invocation.
            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_start_callback(task_id);

            task.run();

            #[cfg(tokio_unstable)]
            self.worker.handle.task_hooks.poll_stop_callback(task_id);

            // Number of LIFO-slot tasks polled so far during this call.
            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);

                #[cfg(tokio_unstable)]
                let task_id = task.task_id();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_start_callback(task_id);

                task.run();

                #[cfg(tokio_unstable)]
                self.worker.handle.task_hooks.poll_stop_callback(task_id);
            }
        })
    }
681
682    fn reset_lifo_enabled(&self, core: &mut Core) {
683        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
684    }
685
686    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
687        debug_assert_eq!(
688            core.lifo_enabled,
689            !self.worker.handle.shared.config.disable_lifo_slot
690        );
691    }
692
693    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
694        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
695            super::counters::inc_num_maintenance();
696
697            core.stats.end_processing_scheduled_tasks();
698
699            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
700            // to run without actually putting the thread to sleep.
701            core = self.park_timeout(core, Some(Duration::from_millis(0)));
702
703            // Run regularly scheduled maintenance
704            core.maintenance(&self.worker);
705
706            core.stats.start_processing_scheduled_tasks();
707        }
708
709        core
710    }
711
712    /// Parks the worker thread while waiting for tasks to execute.
713    ///
714    /// This function checks if indeed there's no more work left to be done before parking.
715    /// Also important to notice that, before parking, the worker thread will try to take
716    /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
717    /// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
718    /// in its own local queue until the queue saturates (ntasks > `LOCAL_QUEUE_CAPACITY`).
719    /// When the local queue is saturated, the overflow tasks are added to the injection queue
720    /// from where other workers can pick them up.
721    /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
722    /// after all the IOs get dispatched
723    fn park(&self, mut core: Box<Core>) -> Box<Core> {
724        if let Some(f) = &self.worker.handle.shared.config.before_park {
725            f();
726        }
727
728        if core.transition_to_parked(&self.worker) {
729            while !core.is_shutdown && !core.is_traced {
730                core.stats.about_to_park();
731                core.stats
732                    .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]);
733
734                core = self.park_timeout(core, None);
735
736                core.stats.unparked();
737
738                // Run regularly scheduled maintenance
739                core.maintenance(&self.worker);
740
741                if core.transition_from_parked(&self.worker) {
742                    break;
743                }
744            }
745        }
746
747        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
748            f();
749        }
750        core
751    }
752
753    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
754        self.assert_lifo_enabled_is_correct(&core);
755
756        // Take the parker out of core
757        let mut park = core.park.take().expect("park missing");
758
759        // Store `core` in context
760        *self.core.borrow_mut() = Some(core);
761
762        // Park thread
763        if let Some(timeout) = duration {
764            park.park_timeout(&self.worker.handle.driver, timeout);
765        } else {
766            park.park(&self.worker.handle.driver);
767        }
768
769        self.defer.wake();
770
771        // Remove `core` from context
772        core = self.core.borrow_mut().take().expect("core missing");
773
774        // Place `park` back in `core`
775        core.park = Some(park);
776
777        if core.should_notify_others() {
778            self.worker.handle.notify_parked_local();
779        }
780
781        core
782    }
783
784    pub(crate) fn defer(&self, waker: &Waker) {
785        if self.core.borrow().is_none() {
786            // If there is no core, then the worker is currently in a block_in_place. In this case,
787            // we cannot use the defer queue as we aren't really in the current runtime.
788            waker.wake_by_ref();
789        } else {
790            self.defer.defer(waker);
791        }
792    }
793
794    #[allow(dead_code)]
795    pub(crate) fn get_worker_index(&self) -> usize {
796        self.worker.index
797    }
798}
799
800impl Core {
801    /// Increment the tick
802    fn tick(&mut self) {
803        self.tick = self.tick.wrapping_add(1);
804    }
805
806    /// Return the next notified task available to this worker.
807    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
808        if self.tick % self.global_queue_interval == 0 {
809            // Update the global queue interval, if needed
810            self.tune_global_queue_interval(worker);
811
812            worker
813                .handle
814                .next_remote_task()
815                .or_else(|| self.next_local_task())
816        } else {
817            let maybe_task = self.next_local_task();
818
819            if maybe_task.is_some() {
820                return maybe_task;
821            }
822
823            if worker.inject().is_empty() {
824                return None;
825            }
826
827            // Other threads can only **remove** tasks from the current worker's
828            // `run_queue`. So, we can be confident that by the time we call
829            // `run_queue.push_back` below, there will be *at least* `cap`
830            // available slots in the queue.
831            let cap = usize::min(
832                self.run_queue.remaining_slots(),
833                self.run_queue.max_capacity() / 2,
834            );
835
836            // The worker is currently idle, pull a batch of work from the
837            // injection queue. We don't want to pull *all* the work so other
838            // workers can also get some.
839            let n = usize::min(
840                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
841                cap,
842            );
843
844            // Take at least one task since the first task is returned directly
845            // and not pushed onto the local queue.
846            let n = usize::max(1, n);
847
848            let mut synced = worker.handle.shared.synced.lock();
849            // safety: passing in the correct `inject::Synced`.
850            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
851
852            // Pop the first task to return immediately
853            let ret = tasks.next();
854
855            // Push the rest of the on the run queue
856            self.run_queue.push_back(tasks);
857
858            ret
859        }
860    }
861
862    fn next_local_task(&mut self) -> Option<Notified> {
863        self.lifo_slot.take().or_else(|| self.run_queue.pop())
864    }
865
866    /// Function responsible for stealing tasks from another worker
867    ///
868    /// Note: Only if less than half the workers are searching for tasks to steal
869    /// a new worker will actually try to steal. The idea is to make sure not all
870    /// workers will be trying to steal at the same time.
871    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
872        if !self.transition_to_searching(worker) {
873            return None;
874        }
875
876        let num = worker.handle.shared.remotes.len();
877        // Start from a random worker
878        let start = self.rand.fastrand_n(num as u32) as usize;
879
880        for i in 0..num {
881            let i = (start + i) % num;
882
883            // Don't steal from ourself! We know we don't have work.
884            if i == worker.index {
885                continue;
886            }
887
888            let target = &worker.handle.shared.remotes[i];
889            if let Some(task) = target
890                .steal
891                .steal_into(&mut self.run_queue, &mut self.stats)
892            {
893                return Some(task);
894            }
895        }
896
897        // Fallback on checking the global queue
898        worker.handle.next_remote_task()
899    }
900
901    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
902        if !self.is_searching {
903            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
904        }
905
906        self.is_searching
907    }
908
909    fn transition_from_searching(&mut self, worker: &Worker) {
910        if !self.is_searching {
911            return;
912        }
913
914        self.is_searching = false;
915        worker.handle.transition_worker_from_searching();
916    }
917
918    fn has_tasks(&self) -> bool {
919        self.lifo_slot.is_some() || self.run_queue.has_tasks()
920    }
921
922    fn should_notify_others(&self) -> bool {
923        // If there are tasks available to steal, but this worker is not
924        // looking for tasks to steal, notify another worker.
925        if self.is_searching {
926            return false;
927        }
928        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
929    }
930
931    /// Prepares the worker state for parking.
932    ///
933    /// Returns true if the transition happened, false if there is work to do first.
934    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
935        // Workers should not park if they have work to do
936        if self.has_tasks() || self.is_traced {
937            return false;
938        }
939
940        // When the final worker transitions **out** of searching to parked, it
941        // must check all the queues one last time in case work materialized
942        // between the last work scan and transitioning out of searching.
943        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
944            &worker.handle.shared,
945            worker.index,
946            self.is_searching,
947        );
948
949        // The worker is no longer searching. Setting this is the local cache
950        // only.
951        self.is_searching = false;
952
953        if is_last_searcher {
954            worker.handle.notify_if_work_pending();
955        }
956
957        true
958    }
959
960    /// Returns `true` if the transition happened.
961    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
962        // If a task is in the lifo slot/run queue, then we must unpark regardless of
963        // being notified
964        if self.has_tasks() {
965            // When a worker wakes, it should only transition to the "searching"
966            // state when the wake originates from another worker *or* a new task
967            // is pushed. We do *not* want the worker to transition to "searching"
968            // when it wakes when the I/O driver receives new events.
969            self.is_searching = !worker
970                .handle
971                .shared
972                .idle
973                .unpark_worker_by_id(&worker.handle.shared, worker.index);
974            return true;
975        }
976
977        if worker
978            .handle
979            .shared
980            .idle
981            .is_parked(&worker.handle.shared, worker.index)
982        {
983            return false;
984        }
985
986        // When unparked, the worker is in the searching state.
987        self.is_searching = true;
988        true
989    }
990
991    /// Runs maintenance work such as checking the pool's state.
992    fn maintenance(&mut self, worker: &Worker) {
993        self.stats
994            .submit(&worker.handle.shared.worker_metrics[worker.index]);
995
996        if !self.is_shutdown {
997            // Check if the scheduler has been shutdown
998            let synced = worker.handle.shared.synced.lock();
999            self.is_shutdown = worker.inject().is_closed(&synced.inject);
1000        }
1001
1002        if !self.is_traced {
1003            // Check if the worker should be tracing.
1004            self.is_traced = worker.handle.shared.trace_status.trace_requested();
1005        }
1006    }
1007
1008    /// Signals all tasks to shut down, and waits for them to complete. Must run
1009    /// before we enter the single-threaded phase of shutdown processing.
1010    fn pre_shutdown(&mut self, worker: &Worker) {
1011        // Start from a random inner list
1012        let start = self
1013            .rand
1014            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
1015        // Signal to all tasks to shut down.
1016        worker
1017            .handle
1018            .shared
1019            .owned
1020            .close_and_shutdown_all(start as usize);
1021
1022        self.stats
1023            .submit(&worker.handle.shared.worker_metrics[worker.index]);
1024    }
1025
1026    /// Shuts down the core.
1027    fn shutdown(&mut self, handle: &Handle) {
1028        // Take the core
1029        let mut park = self.park.take().expect("park missing");
1030
1031        // Drain the queue
1032        while self.next_local_task().is_some() {}
1033
1034        park.shutdown(&handle.driver);
1035    }
1036
1037    fn tune_global_queue_interval(&mut self, worker: &Worker) {
1038        let next = self
1039            .stats
1040            .tuned_global_queue_interval(&worker.handle.shared.config);
1041
1042        // Smooth out jitter
1043        if u32::abs_diff(self.global_queue_interval, next) > 2 {
1044            self.global_queue_interval = next;
1045        }
1046    }
1047}
1048
1049impl Worker {
1050    /// Returns a reference to the scheduler's injection queue.
1051    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
1052        &self.handle.shared.inject
1053    }
1054}
1055
1056// TODO: Move `Handle` impls into handle.rs
1057impl task::Schedule for Arc<Handle> {
1058    fn release(&self, task: &Task) -> Option<Task> {
1059        self.shared.owned.remove(task)
1060    }
1061
1062    fn schedule(&self, task: Notified) {
1063        self.schedule_task(task, false);
1064    }
1065
1066    fn hooks(&self) -> TaskHarnessScheduleHooks {
1067        TaskHarnessScheduleHooks {
1068            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
1069        }
1070    }
1071
1072    fn yield_now(&self, task: Notified) {
1073        self.schedule_task(task, true);
1074    }
1075}
1076
1077impl Handle {
1078    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1079        with_current(|maybe_cx| {
1080            if let Some(cx) = maybe_cx {
1081                // Make sure the task is part of the **current** scheduler.
1082                if self.ptr_eq(&cx.worker.handle) {
1083                    // And the current thread still holds a core
1084                    if let Some(core) = cx.core.borrow_mut().as_mut() {
1085                        self.schedule_local(core, task, is_yield);
1086                        return;
1087                    }
1088                }
1089            }
1090
1091            // Otherwise, use the inject queue.
1092            self.push_remote_task(task);
1093            self.notify_parked_remote();
1094        });
1095    }
1096
1097    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1098        if let Some(task) = task {
1099            self.schedule_task(task, false);
1100        }
1101    }
1102
1103    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1104        core.stats.inc_local_schedule_count();
1105
1106        // Spawning from the worker thread. If scheduling a "yield" then the
1107        // task must always be pushed to the back of the queue, enabling other
1108        // tasks to be executed. If **not** a yield, then there is more
1109        // flexibility and the task may go to the front of the queue.
1110        let should_notify = if is_yield || !core.lifo_enabled {
1111            core.run_queue
1112                .push_back_or_overflow(task, self, &mut core.stats);
1113            true
1114        } else {
1115            // Push to the LIFO slot
1116            let prev = core.lifo_slot.take();
1117            let ret = prev.is_some();
1118
1119            if let Some(prev) = prev {
1120                core.run_queue
1121                    .push_back_or_overflow(prev, self, &mut core.stats);
1122            }
1123
1124            core.lifo_slot = Some(task);
1125
1126            ret
1127        };
1128
1129        // Only notify if not currently parked. If `park` is `None`, then the
1130        // scheduling is from a resource driver. As notifications often come in
1131        // batches, the notification is delayed until the park is complete.
1132        if should_notify && core.park.is_some() {
1133            self.notify_parked_local();
1134        }
1135    }
1136
1137    fn next_remote_task(&self) -> Option<Notified> {
1138        if self.shared.inject.is_empty() {
1139            return None;
1140        }
1141
1142        let mut synced = self.shared.synced.lock();
1143        // safety: passing in correct `idle::Synced`
1144        unsafe { self.shared.inject.pop(&mut synced.inject) }
1145    }
1146
1147    fn push_remote_task(&self, task: Notified) {
1148        self.shared.scheduler_metrics.inc_remote_schedule_count();
1149
1150        let mut synced = self.shared.synced.lock();
1151        // safety: passing in correct `idle::Synced`
1152        unsafe {
1153            self.shared.inject.push(&mut synced.inject, task);
1154        }
1155    }
1156
1157    pub(super) fn close(&self) {
1158        if self
1159            .shared
1160            .inject
1161            .close(&mut self.shared.synced.lock().inject)
1162        {
1163            self.notify_all();
1164        }
1165    }
1166
1167    fn notify_parked_local(&self) {
1168        super::counters::inc_num_inc_notify_local();
1169
1170        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1171            super::counters::inc_num_unparks_local();
1172            self.shared.remotes[index].unpark.unpark(&self.driver);
1173        }
1174    }
1175
1176    fn notify_parked_remote(&self) {
1177        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1178            self.shared.remotes[index].unpark.unpark(&self.driver);
1179        }
1180    }
1181
1182    pub(super) fn notify_all(&self) {
1183        for remote in &self.shared.remotes[..] {
1184            remote.unpark.unpark(&self.driver);
1185        }
1186    }
1187
1188    fn notify_if_work_pending(&self) {
1189        for remote in &self.shared.remotes[..] {
1190            if !remote.steal.is_empty() {
1191                self.notify_parked_local();
1192                return;
1193            }
1194        }
1195
1196        if !self.shared.inject.is_empty() {
1197            self.notify_parked_local();
1198        }
1199    }
1200
1201    fn transition_worker_from_searching(&self) {
1202        if self.shared.idle.transition_worker_from_searching() {
1203            // We are the final searching worker. Because work was found, we
1204            // need to notify another worker.
1205            self.notify_parked_local();
1206        }
1207    }
1208
1209    /// Signals that a worker has observed the shutdown signal and has replaced
1210    /// its core back into its handle.
1211    ///
1212    /// If all workers have reached this point, the final cleanup is performed.
1213    fn shutdown_core(&self, core: Box<Core>) {
1214        let mut cores = self.shared.shutdown_cores.lock();
1215        cores.push(core);
1216
1217        if cores.len() != self.shared.remotes.len() {
1218            return;
1219        }
1220
1221        debug_assert!(self.shared.owned.is_empty());
1222
1223        for mut core in cores.drain(..) {
1224            core.shutdown(self);
1225        }
1226
1227        // Drain the injection queue
1228        //
1229        // We already shut down every task, so we can simply drop the tasks.
1230        while let Some(task) = self.next_remote_task() {
1231            drop(task);
1232        }
1233    }
1234
1235    fn ptr_eq(&self, other: &Handle) -> bool {
1236        std::ptr::eq(self, other)
1237    }
1238}
1239
1240impl Overflow<Arc<Handle>> for Handle {
1241    fn push(&self, task: task::Notified<Arc<Handle>>) {
1242        self.push_remote_task(task);
1243    }
1244
1245    fn push_batch<I>(&self, iter: I)
1246    where
1247        I: Iterator<Item = task::Notified<Arc<Handle>>>,
1248    {
1249        unsafe {
1250            self.shared.inject.push_batch(self, iter);
1251        }
1252    }
1253}
1254
1255pub(crate) struct InjectGuard<'a> {
1256    lock: crate::loom::sync::MutexGuard<'a, Synced>,
1257}
1258
1259impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1260    fn as_mut(&mut self) -> &mut inject::Synced {
1261        &mut self.lock.inject
1262    }
1263}
1264
1265impl<'a> Lock<inject::Synced> for &'a Handle {
1266    type Handle = InjectGuard<'a>;
1267
1268    fn lock(self) -> Self::Handle {
1269        InjectGuard {
1270            lock: self.shared.synced.lock(),
1271        }
1272    }
1273}
1274
1275#[track_caller]
1276fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1277    use scheduler::Context::MultiThread;
1278
1279    context::with_scheduler(|ctx| match ctx {
1280        Some(MultiThread(ctx)) => f(Some(ctx)),
1281        _ => f(None),
1282    })
1283}