1use crate::loom::sync::atomic::AtomicBool;
2use crate::loom::sync::Arc;
3use crate::runtime::driver::{self, Driver};
4use crate::runtime::scheduler::{self, Defer, Inject};
5use crate::runtime::task::{
6 self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{
9 blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10};
11use crate::sync::notify::Notify;
12use crate::util::atomic_cell::AtomicCell;
13use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::future::{poll_fn, Future};
18use std::sync::atomic::Ordering::{AcqRel, Release};
19use std::task::Poll::{Pending, Ready};
20use std::task::Waker;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::{fmt, thread};
24
25pub(crate) struct CurrentThread {
27 core: AtomicCell<Core>,
29
30 notify: Notify,
33}
34
35pub(crate) struct Handle {
37 shared: Shared,
39
40 pub(crate) driver: driver::Handle,
42
43 pub(crate) blocking_spawner: blocking::Spawner,
45
46 pub(crate) seed_generator: RngSeedGenerator,
48
49 pub(crate) task_hooks: TaskHooks,
51
52 pub(crate) local_tid: Option<ThreadId>,
54}
55
56struct Core {
59 tasks: VecDeque<Notified>,
61
62 tick: u32,
64
65 driver: Option<Driver>,
69
70 metrics: MetricsBatch,
72
73 global_queue_interval: u32,
75
76 unhandled_panic: bool,
79}
80
81struct Shared {
83 inject: Inject<Arc<Handle>>,
85
86 owned: OwnedTasks<Arc<Handle>>,
88
89 woken: AtomicBool,
91
92 config: Config,
94
95 scheduler_metrics: SchedulerMetrics,
97
98 worker_metrics: WorkerMetrics,
100}
101
102pub(crate) struct Context {
106 handle: Arc<Handle>,
108
109 core: RefCell<Option<Box<Core>>>,
112
113 pub(crate) defer: Defer,
115}
116
117type Notified = task::Notified<Arc<Handle>>;
118
119const INITIAL_CAPACITY: usize = 64;
121
122const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
126
127impl CurrentThread {
128 pub(crate) fn new(
129 driver: Driver,
130 driver_handle: driver::Handle,
131 blocking_spawner: blocking::Spawner,
132 seed_generator: RngSeedGenerator,
133 config: Config,
134 local_tid: Option<ThreadId>,
135 ) -> (CurrentThread, Arc<Handle>) {
136 let worker_metrics = WorkerMetrics::from_config(&config);
137 worker_metrics.set_thread_id(thread::current().id());
138
139 let global_queue_interval = config
141 .global_queue_interval
142 .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
143
144 let handle = Arc::new(Handle {
145 task_hooks: TaskHooks {
146 task_spawn_callback: config.before_spawn.clone(),
147 task_terminate_callback: config.after_termination.clone(),
148 #[cfg(tokio_unstable)]
149 before_poll_callback: config.before_poll.clone(),
150 #[cfg(tokio_unstable)]
151 after_poll_callback: config.after_poll.clone(),
152 },
153 shared: Shared {
154 inject: Inject::new(),
155 owned: OwnedTasks::new(1),
156 woken: AtomicBool::new(false),
157 config,
158 scheduler_metrics: SchedulerMetrics::new(),
159 worker_metrics,
160 },
161 driver: driver_handle,
162 blocking_spawner,
163 seed_generator,
164 local_tid,
165 });
166
167 let core = AtomicCell::new(Some(Box::new(Core {
168 tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
169 tick: 0,
170 driver: Some(driver),
171 metrics: MetricsBatch::new(&handle.shared.worker_metrics),
172 global_queue_interval,
173 unhandled_panic: false,
174 })));
175
176 let scheduler = CurrentThread {
177 core,
178 notify: Notify::new(),
179 };
180
181 (scheduler, handle)
182 }
183
184 #[track_caller]
185 pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
186 pin!(future);
187
188 crate::runtime::context::enter_runtime(handle, false, |blocking| {
189 let handle = handle.as_current_thread();
190
191 loop {
195 if let Some(core) = self.take_core(handle) {
196 handle
197 .shared
198 .worker_metrics
199 .set_thread_id(thread::current().id());
200 return core.block_on(future);
201 } else {
202 let notified = self.notify.notified();
203 pin!(notified);
204
205 if let Some(out) = blocking
206 .block_on(poll_fn(|cx| {
207 if notified.as_mut().poll(cx).is_ready() {
208 return Ready(None);
209 }
210
211 if let Ready(out) = future.as_mut().poll(cx) {
212 return Ready(Some(out));
213 }
214
215 Pending
216 }))
217 .expect("Failed to `Enter::block_on`")
218 {
219 return out;
220 }
221 }
222 }
223 })
224 }
225
226 fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
227 let core = self.core.take()?;
228
229 Some(CoreGuard {
230 context: scheduler::Context::CurrentThread(Context {
231 handle: handle.clone(),
232 core: RefCell::new(Some(core)),
233 defer: Defer::new(),
234 }),
235 scheduler: self,
236 })
237 }
238
239 pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
240 let handle = handle.as_current_thread();
241
242 let core = match self.take_core(handle) {
246 Some(core) => core,
247 None if std::thread::panicking() => return,
248 None => panic!("Oh no! We never placed the Core back, this is a bug!"),
249 };
250
251 let tls_available = context::with_current(|_| ()).is_ok();
253
254 if tls_available {
255 core.enter(|core, _context| {
256 let core = shutdown2(core, handle);
257 (core, ())
258 });
259 } else {
260 let context = core.context.expect_current_thread();
264 let core = context.core.borrow_mut().take().unwrap();
265
266 let core = shutdown2(core, handle);
267 *context.core.borrow_mut() = Some(core);
268 }
269 }
270}
271
272fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
273 handle.shared.owned.close_and_shutdown_all(0);
277
278 while let Some(task) = core.next_local_task(handle) {
281 drop(task);
282 }
283
284 handle.shared.inject.close();
286
287 while let Some(task) = handle.shared.inject.pop() {
289 drop(task);
290 }
291
292 assert!(handle.shared.owned.is_empty());
293
294 core.submit_metrics(handle);
296
297 if let Some(driver) = core.driver.as_mut() {
299 driver.shutdown(&handle.driver);
300 }
301
302 core
303}
304
305impl fmt::Debug for CurrentThread {
306 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
307 fmt.debug_struct("CurrentThread").finish()
308 }
309}
310
311impl Core {
314 fn tick(&mut self) {
316 self.tick = self.tick.wrapping_add(1);
317 }
318
319 fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
320 if self.tick % self.global_queue_interval == 0 {
321 handle
322 .next_remote_task()
323 .or_else(|| self.next_local_task(handle))
324 } else {
325 self.next_local_task(handle)
326 .or_else(|| handle.next_remote_task())
327 }
328 }
329
330 fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
331 let ret = self.tasks.pop_front();
332 handle
333 .shared
334 .worker_metrics
335 .set_queue_depth(self.tasks.len());
336 ret
337 }
338
339 fn push_task(&mut self, handle: &Handle, task: Notified) {
340 self.tasks.push_back(task);
341 self.metrics.inc_local_schedule_count();
342 handle
343 .shared
344 .worker_metrics
345 .set_queue_depth(self.tasks.len());
346 }
347
348 fn submit_metrics(&mut self, handle: &Handle) {
349 self.metrics.submit(&handle.shared.worker_metrics, 0);
350 }
351}
352
353#[cfg(tokio_taskdump)]
354fn wake_deferred_tasks_and_free(context: &Context) {
355 let wakers = context.defer.take_deferred();
356 for waker in wakers {
357 waker.wake();
358 }
359}
360
361impl Context {
364 fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
367 core.metrics.start_poll();
368 let mut ret = self.enter(core, || crate::task::coop::budget(f));
369 ret.0.metrics.end_poll();
370 ret
371 }
372
373 fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
376 let mut driver = core.driver.take().expect("driver missing");
377
378 if let Some(f) = &handle.shared.config.before_park {
379 let (c, ()) = self.enter(core, || f());
380 core = c;
381 }
382
383 if core.tasks.is_empty() {
386 core.metrics.about_to_park();
388 core.submit_metrics(handle);
389
390 let (c, ()) = self.enter(core, || {
391 driver.park(&handle.driver);
392 self.defer.wake();
393 });
394
395 core = c;
396
397 core.metrics.unparked();
398 core.submit_metrics(handle);
399 }
400
401 if let Some(f) = &handle.shared.config.after_unpark {
402 let (c, ()) = self.enter(core, || f());
403 core = c;
404 }
405
406 core.driver = Some(driver);
407 core
408 }
409
410 fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
412 let mut driver = core.driver.take().expect("driver missing");
413
414 core.submit_metrics(handle);
415
416 let (mut core, ()) = self.enter(core, || {
417 driver.park_timeout(&handle.driver, Duration::from_millis(0));
418 self.defer.wake();
419 });
420
421 core.driver = Some(driver);
422 core
423 }
424
425 fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
426 *self.core.borrow_mut() = Some(core);
430
431 let ret = f();
433
434 let core = self.core.borrow_mut().take().expect("core missing");
436 (core, ret)
437 }
438
439 pub(crate) fn defer(&self, waker: &Waker) {
440 self.defer.defer(waker);
441 }
442}
443
444impl Handle {
447 pub(crate) fn spawn<F>(
449 me: &Arc<Self>,
450 future: F,
451 id: crate::runtime::task::Id,
452 ) -> JoinHandle<F::Output>
453 where
454 F: crate::future::Future + Send + 'static,
455 F::Output: Send + 'static,
456 {
457 let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
458
459 me.task_hooks.spawn(&TaskMeta {
460 id,
461 _phantom: Default::default(),
462 });
463
464 if let Some(notified) = notified {
465 me.schedule(notified);
466 }
467
468 handle
469 }
470
471 pub(crate) unsafe fn spawn_local<F>(
478 me: &Arc<Self>,
479 future: F,
480 id: crate::runtime::task::Id,
481 ) -> JoinHandle<F::Output>
482 where
483 F: crate::future::Future + 'static,
484 F::Output: 'static,
485 {
486 let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);
487
488 me.task_hooks.spawn(&TaskMeta {
489 id,
490 _phantom: Default::default(),
491 });
492
493 if let Some(notified) = notified {
494 me.schedule(notified);
495 }
496
497 handle
498 }
499
500 #[cfg(all(
502 tokio_unstable,
503 tokio_taskdump,
504 target_os = "linux",
505 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
506 ))]
507 pub(crate) fn dump(&self) -> crate::runtime::Dump {
508 use crate::runtime::dump;
509 use task::trace::trace_current_thread;
510
511 let mut traces = vec![];
512
513 context::with_scheduler(|maybe_context| {
515 let context = if let Some(context) = maybe_context {
517 context.expect_current_thread()
518 } else {
519 return;
520 };
521 let mut maybe_core = context.core.borrow_mut();
522 let core = if let Some(core) = maybe_core.as_mut() {
523 core
524 } else {
525 return;
526 };
527 let local = &mut core.tasks;
528
529 if self.shared.inject.is_closed() {
530 return;
531 }
532
533 traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
534 .into_iter()
535 .map(|(id, trace)| dump::Task::new(id, trace))
536 .collect();
537
538 drop(maybe_core);
540
541 wake_deferred_tasks_and_free(context);
545 });
546
547 dump::Dump::new(traces)
548 }
549
550 fn next_remote_task(&self) -> Option<Notified> {
551 self.shared.inject.pop()
552 }
553
554 fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
555 me.shared.woken.store(true, Release);
558 waker_ref(me)
559 }
560
561 pub(crate) fn reset_woken(&self) -> bool {
563 self.shared.woken.swap(false, AcqRel)
564 }
565
566 pub(crate) fn num_alive_tasks(&self) -> usize {
567 self.shared.owned.num_alive_tasks()
568 }
569
570 pub(crate) fn injection_queue_depth(&self) -> usize {
571 self.shared.inject.len()
572 }
573}
574
575cfg_unstable_metrics! {
576 impl Handle {
577 pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
578 &self.shared.scheduler_metrics
579 }
580
581 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
582 assert_eq!(0, worker);
583 &self.shared.worker_metrics
584 }
585
586 pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
587 self.worker_metrics(worker).queue_depth()
588 }
589
590 pub(crate) fn num_blocking_threads(&self) -> usize {
591 self.blocking_spawner.num_threads()
592 }
593
594 pub(crate) fn num_idle_blocking_threads(&self) -> usize {
595 self.blocking_spawner.num_idle_threads()
596 }
597
598 pub(crate) fn blocking_queue_depth(&self) -> usize {
599 self.blocking_spawner.queue_depth()
600 }
601
602 cfg_64bit_metrics! {
603 pub(crate) fn spawned_tasks_count(&self) -> u64 {
604 self.shared.owned.spawned_tasks_count()
605 }
606 }
607 }
608}
609
610cfg_unstable! {
611 use std::num::NonZeroU64;
612
613 impl Handle {
614 pub(crate) fn owned_id(&self) -> NonZeroU64 {
615 self.shared.owned.id
616 }
617 }
618}
619
620impl fmt::Debug for Handle {
621 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
622 fmt.debug_struct("current_thread::Handle { ... }").finish()
623 }
624}
625
626impl Schedule for Arc<Handle> {
629 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
630 self.shared.owned.remove(task)
631 }
632
633 fn schedule(&self, task: task::Notified<Self>) {
634 use scheduler::Context::CurrentThread;
635
636 context::with_scheduler(|maybe_cx| match maybe_cx {
637 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
638 let mut core = cx.core.borrow_mut();
639
640 if let Some(core) = core.as_mut() {
643 core.push_task(self, task);
644 }
645 }
646 _ => {
647 self.shared.scheduler_metrics.inc_remote_schedule_count();
649
650 self.shared.inject.push(task);
652 self.driver.unpark();
653 }
654 });
655 }
656
657 fn hooks(&self) -> TaskHarnessScheduleHooks {
658 TaskHarnessScheduleHooks {
659 task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
660 }
661 }
662
663 cfg_unstable! {
664 fn unhandled_panic(&self) {
665 use crate::runtime::UnhandledPanic;
666
667 match self.shared.config.unhandled_panic {
668 UnhandledPanic::Ignore => {
669 }
671 UnhandledPanic::ShutdownRuntime => {
672 use scheduler::Context::CurrentThread;
673
674 context::with_scheduler(|maybe_cx| match maybe_cx {
679 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
680 let mut core = cx.core.borrow_mut();
681
682 if let Some(core) = core.as_mut() {
684 core.unhandled_panic = true;
685 self.shared.owned.close_and_shutdown_all(0);
686 }
687 }
688 _ => unreachable!("runtime core not set in CURRENT thread-local"),
689 })
690 }
691 }
692 }
693 }
694}
695
696impl Wake for Handle {
697 fn wake(arc_self: Arc<Self>) {
698 Wake::wake_by_ref(&arc_self);
699 }
700
701 fn wake_by_ref(arc_self: &Arc<Self>) {
703 arc_self.shared.woken.store(true, Release);
704 arc_self.driver.unpark();
705 }
706}
707
708struct CoreGuard<'a> {
713 context: scheduler::Context,
714 scheduler: &'a CurrentThread,
715}
716
717impl CoreGuard<'_> {
718 #[track_caller]
719 fn block_on<F: Future>(self, future: F) -> F::Output {
720 let ret = self.enter(|mut core, context| {
721 let waker = Handle::waker_ref(&context.handle);
722 let mut cx = std::task::Context::from_waker(&waker);
723
724 pin!(future);
725
726 core.metrics.start_processing_scheduled_tasks();
727
728 'outer: loop {
729 let handle = &context.handle;
730
731 if handle.reset_woken() {
732 let (c, res) = context.enter(core, || {
733 crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
734 });
735
736 core = c;
737
738 if let Ready(v) = res {
739 return (core, Some(v));
740 }
741 }
742
743 for _ in 0..handle.shared.config.event_interval {
744 if core.unhandled_panic {
746 return (core, None);
747 }
748
749 core.tick();
750
751 let entry = core.next_task(handle);
752
753 let task = match entry {
754 Some(entry) => entry,
755 None => {
756 core.metrics.end_processing_scheduled_tasks();
757
758 core = if !context.defer.is_empty() {
759 context.park_yield(core, handle)
760 } else {
761 context.park(core, handle)
762 };
763
764 core.metrics.start_processing_scheduled_tasks();
765
766 continue 'outer;
768 }
769 };
770
771 let task = context.handle.shared.owned.assert_owner(task);
772
773 #[cfg(tokio_unstable)]
774 let task_id = task.task_id();
775
776 let (c, ()) = context.run_task(core, || {
777 #[cfg(tokio_unstable)]
778 context.handle.task_hooks.poll_start_callback(task_id);
779
780 task.run();
781
782 #[cfg(tokio_unstable)]
783 context.handle.task_hooks.poll_stop_callback(task_id);
784 });
785
786 core = c;
787 }
788
789 core.metrics.end_processing_scheduled_tasks();
790
791 core = context.park_yield(core, handle);
794
795 core.metrics.start_processing_scheduled_tasks();
796 }
797 });
798
799 match ret {
800 Some(ret) => ret,
801 None => {
802 panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
804 }
805 }
806 }
807
808 fn enter<F, R>(self, f: F) -> R
811 where
812 F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
813 {
814 let context = self.context.expect_current_thread();
815
816 let core = context.core.borrow_mut().take().expect("core missing");
818
819 let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
821
822 *context.core.borrow_mut() = Some(core);
823
824 ret
825 }
826}
827
828impl Drop for CoreGuard<'_> {
829 fn drop(&mut self) {
830 let context = self.context.expect_current_thread();
831
832 if let Some(core) = context.core.borrow_mut().take() {
833 self.scheduler.core.set(core);
836
837 self.scheduler.notify.notify_one();
839 }
840 }
841}