tokio/runtime/scheduler/current_thread/mod.rs

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
    self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks to invoke for task events (spawn, terminate, poll)
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, the ID of its owning thread.
    pub(crate) local_tid: Option<ThreadId>,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the thread blocked on `block_on` was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and block on the future if
            // we can; otherwise, select on a notification that the core is
            // available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

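    /// Attempts to take ownership of the scheduler core. Returns a guard that
    /// places the core back into its slot when dropped, even if a panic occurs.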
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

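/// Shuts down the scheduler: closes and drains both task queues, submits the
/// final metrics batch, and shuts down the resource drivers.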
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop the task.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shutdown the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increment the current tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
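        // Every `global_queue_interval` ticks, poll the remote (inject) queue
        // first so that tasks scheduled from outside the runtime are not
        // starved by the local queue.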
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

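/// Wakes all deferred tasks, dropping the allocation that backed the deferred
/// list (used when taking a taskdump).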
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawns a task for us to run
        // instead of parking the thread
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            let (c, ()) = self.enter(core, || {
                driver.park(&handle.driver);
                self.defer.wake();
            });

            core = c;

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        let (mut core, ()) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            self.defer.wake();
        });

        core.driver = Some(driver);
        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

        me.task_hooks.spawn(&TaskMeta {
            id,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
    ///
    /// # Safety
    /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
    /// provably cannot be driven from or moved to different threads from the one on which the task
    /// is spawned.
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);

        me.task_hooks.spawn(&TaskMeta {
            id,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't want
            // the `yield_now` vector to be that large under normal circumstances.
            // Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set `woken` to true when entering `block_on`, ensuring the outer
        // future is polled on the first iteration of the loop
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset `woken` to false and return the original value
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }
}

cfg_unstable_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
            assert_eq!(0, worker);
            &self.shared.worker_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}

cfg_unstable! {
    use std::num::NonZeroU64;

    impl Handle {
        pub(crate) fn owned_id(&self) -> NonZeroU64 {
            self.shared.owned.id
        }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Shared =====

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
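    /// Runs the scheduler loop: drives `future` to completion on the current
    /// thread while polling tasks from the run queues and parking on the
    /// driver when there is no work to do.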
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

            'outer: loop {
                let handle = &context.handle;

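                // Only poll the `block_on` future when it has been woken since
                // the last poll (it is initially flagged as woken).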
                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_id = task.task_id();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(task_id);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(task_id);
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver; this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked and `unhandled_panic` shutdown was triggered.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Place the scheduler core back into its slot to allow
            // other threads to pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}