tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5#[cfg(tokio_unstable)]
6use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
7use crate::util::rand::{RngSeed, RngSeedGenerator};
8
9use crate::runtime::blocking::BlockingPool;
10use crate::runtime::scheduler::CurrentThread;
11use std::fmt;
12use std::io;
13use std::thread::ThreadId;
14use std::time::Duration;
15
16/// Builds Tokio Runtime with custom configuration values.
17///
18/// Methods can be chained in order to set the configuration values. The
19/// Runtime is constructed by calling [`build`].
20///
21/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
22/// or [`Builder::new_current_thread`].
23///
24/// See function level documentation for details on the various configuration
25/// settings.
26///
27/// [`build`]: method@Self::build
28/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
29/// [`Builder::new_current_thread`]: method@Self::new_current_thread
30///
31/// # Examples
32///
33/// ```
34/// use tokio::runtime::Builder;
35///
36/// fn main() {
37/// // build runtime
38/// let runtime = Builder::new_multi_thread()
39/// .worker_threads(4)
40/// .thread_name("my-custom-name")
41/// .thread_stack_size(3 * 1024 * 1024)
42/// .build()
43/// .unwrap();
44///
45/// // use runtime ...
46/// }
47/// ```
pub struct Builder {
    /// Runtime type
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,

    /// Event-count setting forwarded to the driver configuration;
    /// `Builder::new` initializes it to 1024.
    // NOTE(review): presumably the maximum number of I/O events handled per
    // driver poll — confirm against the driver implementation.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on thread usage.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// To run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// To run before each poll
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// To run after each poll
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// To run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// Capacity of the scheduler's local run queue; `Builder::new` sets it to
    /// 256 (4 when built under `loom`).
    // NOTE(review): presumably a per-worker capacity — confirm against the
    // scheduler implementation.
    pub(super) local_queue_capacity: usize,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime should respond to an unhandled panic on a spawned
    /// task; `Builder::new` defaults this to `UnhandledPanic::Ignore`.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,
}
136
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shut down if a spawned task panics.
        ///
        /// The runtime will immediately shut down even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and any call to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
217
/// Reference-counted, thread-safe closure that produces the name given to a
/// thread spawned by the runtime (see the `thread_name` field of `Builder`).
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
219
/// Scheduler flavor a `Builder` has been asked to construct.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Selected by `Builder::new_current_thread`.
    CurrentThread,
    /// Selected by `Builder::new_multi_thread`; only exists with the
    /// `rt-multi-thread` feature.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
    /// Selected by `Builder::new_multi_thread_alt`; only exists with the
    /// `rt-multi-thread` feature under `tokio_unstable`.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt,
}
228
229impl Builder {
230 /// Returns a new builder with the current thread scheduler selected.
231 ///
232 /// Configuration methods can be chained on the return value.
233 ///
234 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
235 /// [`LocalSet`].
236 ///
237 /// [`LocalSet`]: crate::task::LocalSet
238 pub fn new_current_thread() -> Builder {
239 #[cfg(loom)]
240 const EVENT_INTERVAL: u32 = 4;
241 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
242 #[cfg(not(loom))]
243 const EVENT_INTERVAL: u32 = 61;
244
245 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
246 }
247
248 /// Returns a new builder with the multi thread scheduler selected.
249 ///
250 /// Configuration methods can be chained on the return value.
251 #[cfg(feature = "rt-multi-thread")]
252 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
253 pub fn new_multi_thread() -> Builder {
254 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
255 Builder::new(Kind::MultiThread, 61)
256 }
257
258 cfg_unstable! {
259 /// Returns a new builder with the alternate multi thread scheduler
260 /// selected.
261 ///
262 /// The alternate multi threaded scheduler is an in-progress
263 /// candidate to replace the existing multi threaded scheduler. It
264 /// currently does not scale as well to 16+ processors.
265 ///
266 /// This runtime flavor is currently **not considered production
267 /// ready**.
268 ///
269 /// Configuration methods can be chained on the return value.
270 #[cfg(feature = "rt-multi-thread")]
271 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
272 pub fn new_multi_thread_alt() -> Builder {
273 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
274 Builder::new(Kind::MultiThreadAlt, 61)
275 }
276 }
277
278 /// Returns a new runtime builder initialized with default configuration
279 /// values.
280 ///
281 /// Configuration methods can be chained on the return value.
282 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
283 Builder {
284 kind,
285
286 // I/O defaults to "off"
287 enable_io: false,
288 nevents: 1024,
289
290 // Time defaults to "off"
291 enable_time: false,
292
293 // The clock starts not-paused
294 start_paused: false,
295
296 // Read from environment variable first in multi-threaded mode.
297 // Default to lazy auto-detection (one thread per CPU core)
298 worker_threads: None,
299
300 max_blocking_threads: 512,
301
302 // Default thread name
303 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
304
305 // Do not set a stack size by default
306 thread_stack_size: None,
307
308 // No worker thread callbacks
309 after_start: None,
310 before_stop: None,
311 before_park: None,
312 after_unpark: None,
313
314 before_spawn: None,
315 after_termination: None,
316
317 #[cfg(tokio_unstable)]
318 before_poll: None,
319 #[cfg(tokio_unstable)]
320 after_poll: None,
321
322 keep_alive: None,
323
324 // Defaults for these values depend on the scheduler kind, so we get them
325 // as parameters.
326 global_queue_interval: None,
327 event_interval,
328
329 #[cfg(not(loom))]
330 local_queue_capacity: 256,
331
332 #[cfg(loom)]
333 local_queue_capacity: 4,
334
335 seed_generator: RngSeedGenerator::new(RngSeed::new()),
336
337 #[cfg(tokio_unstable)]
338 unhandled_panic: UnhandledPanic::Ignore,
339
340 metrics_poll_count_histogram_enable: false,
341
342 metrics_poll_count_histogram: HistogramBuilder::default(),
343
344 disable_lifo_slot: false,
345 }
346 }
347
348 /// Enables both I/O and time drivers.
349 ///
350 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
351 /// individually. If additional components are added to Tokio in the future,
352 /// `enable_all` will include these future components.
353 ///
354 /// # Examples
355 ///
356 /// ```
357 /// use tokio::runtime;
358 ///
359 /// let rt = runtime::Builder::new_multi_thread()
360 /// .enable_all()
361 /// .build()
362 /// .unwrap();
363 /// ```
364 pub fn enable_all(&mut self) -> &mut Self {
365 #[cfg(any(
366 feature = "net",
367 all(unix, feature = "process"),
368 all(unix, feature = "signal")
369 ))]
370 self.enable_io();
371 #[cfg(feature = "time")]
372 self.enable_time();
373
374 self
375 }
376
377 /// Sets the number of worker threads the `Runtime` will use.
378 ///
379 /// This can be any number above 0 though it is advised to keep this value
380 /// on the smaller side.
381 ///
382 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
383 ///
384 /// # Default
385 ///
386 /// The default value is the number of cores available to the system.
387 ///
388 /// When using the `current_thread` runtime this method has no effect.
389 ///
390 /// # Examples
391 ///
392 /// ## Multi threaded runtime with 4 threads
393 ///
394 /// ```
395 /// use tokio::runtime;
396 ///
397 /// // This will spawn a work-stealing runtime with 4 worker threads.
398 /// let rt = runtime::Builder::new_multi_thread()
399 /// .worker_threads(4)
400 /// .build()
401 /// .unwrap();
402 ///
403 /// rt.spawn(async move {});
404 /// ```
405 ///
406 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
407 ///
408 /// ```
409 /// use tokio::runtime;
410 ///
411 /// // Create a runtime that _must_ be driven from a call
412 /// // to `Runtime::block_on`.
413 /// let rt = runtime::Builder::new_current_thread()
414 /// .build()
415 /// .unwrap();
416 ///
417 /// // This will run the runtime and future on the current thread
418 /// rt.block_on(async move {});
419 /// ```
420 ///
421 /// # Panics
422 ///
423 /// This will panic if `val` is not larger than `0`.
424 #[track_caller]
425 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
426 assert!(val > 0, "Worker threads cannot be set to 0");
427 self.worker_threads = Some(val);
428 self
429 }
430
431 /// Specifies the limit for additional threads spawned by the Runtime.
432 ///
433 /// These threads are used for blocking operations like tasks spawned
434 /// through [`spawn_blocking`], this includes but is not limited to:
435 /// - [`fs`] operations
436 /// - dns resolution through [`ToSocketAddrs`]
437 /// - writing to [`Stdout`] or [`Stderr`]
438 /// - reading from [`Stdin`]
439 ///
440 /// Unlike the [`worker_threads`], they are not always active and will exit
441 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
442 ///
443 /// It's recommended to not set this limit too low in order to avoid hanging on operations
444 /// requiring [`spawn_blocking`].
445 ///
446 /// The default value is 512.
447 ///
448 /// # Panics
449 ///
450 /// This will panic if `val` is not larger than `0`.
451 ///
452 /// # Upgrading from 0.x
453 ///
454 /// In old versions `max_threads` limited both blocking and worker threads, but the
455 /// current `max_blocking_threads` does not include async worker threads in the count.
456 ///
457 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
458 /// [`fs`]: mod@crate::fs
459 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
460 /// [`Stdout`]: struct@crate::io::Stdout
461 /// [`Stdin`]: struct@crate::io::Stdin
462 /// [`Stderr`]: struct@crate::io::Stderr
463 /// [`worker_threads`]: Self::worker_threads
464 /// [`thread_keep_alive`]: Self::thread_keep_alive
465 #[track_caller]
466 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
467 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
468 assert!(val > 0, "Max blocking threads cannot be set to 0");
469 self.max_blocking_threads = val;
470 self
471 }
472
473 /// Sets name of threads spawned by the `Runtime`'s thread pool.
474 ///
475 /// The default name is "tokio-runtime-worker".
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// # use tokio::runtime;
481 ///
482 /// # pub fn main() {
483 /// let rt = runtime::Builder::new_multi_thread()
484 /// .thread_name("my-pool")
485 /// .build();
486 /// # }
487 /// ```
488 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
489 let val = val.into();
490 self.thread_name = std::sync::Arc::new(move || val.clone());
491 self
492 }
493
494 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
495 ///
496 /// The default name fn is `|| "tokio-runtime-worker".into()`.
497 ///
498 /// # Examples
499 ///
500 /// ```
501 /// # use tokio::runtime;
502 /// # use std::sync::atomic::{AtomicUsize, Ordering};
503 /// # pub fn main() {
504 /// let rt = runtime::Builder::new_multi_thread()
505 /// .thread_name_fn(|| {
506 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
507 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
508 /// format!("my-pool-{}", id)
509 /// })
510 /// .build();
511 /// # }
512 /// ```
513 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
514 where
515 F: Fn() -> String + Send + Sync + 'static,
516 {
517 self.thread_name = std::sync::Arc::new(f);
518 self
519 }
520
521 /// Sets the stack size (in bytes) for worker threads.
522 ///
523 /// The actual stack size may be greater than this value if the platform
524 /// specifies minimal stack size.
525 ///
526 /// The default stack size for spawned threads is 2 MiB, though this
527 /// particular stack size is subject to change in the future.
528 ///
529 /// # Examples
530 ///
531 /// ```
532 /// # use tokio::runtime;
533 ///
534 /// # pub fn main() {
535 /// let rt = runtime::Builder::new_multi_thread()
536 /// .thread_stack_size(32 * 1024)
537 /// .build();
538 /// # }
539 /// ```
540 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
541 self.thread_stack_size = Some(val);
542 self
543 }
544
545 /// Executes function `f` after each thread is started but before it starts
546 /// doing work.
547 ///
548 /// This is intended for bookkeeping and monitoring use cases.
549 ///
550 /// # Examples
551 ///
552 /// ```
553 /// # use tokio::runtime;
554 /// # pub fn main() {
555 /// let runtime = runtime::Builder::new_multi_thread()
556 /// .on_thread_start(|| {
557 /// println!("thread started");
558 /// })
559 /// .build();
560 /// # }
561 /// ```
562 #[cfg(not(loom))]
563 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
564 where
565 F: Fn() + Send + Sync + 'static,
566 {
567 self.after_start = Some(std::sync::Arc::new(f));
568 self
569 }
570
571 /// Executes function `f` before each thread stops.
572 ///
573 /// This is intended for bookkeeping and monitoring use cases.
574 ///
575 /// # Examples
576 ///
577 /// ```
578 /// # use tokio::runtime;
579 /// # pub fn main() {
580 /// let runtime = runtime::Builder::new_multi_thread()
581 /// .on_thread_stop(|| {
582 /// println!("thread stopping");
583 /// })
584 /// .build();
585 /// # }
586 /// ```
587 #[cfg(not(loom))]
588 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
589 where
590 F: Fn() + Send + Sync + 'static,
591 {
592 self.before_stop = Some(std::sync::Arc::new(f));
593 self
594 }
595
596 /// Executes function `f` just before a thread is parked (goes idle).
597 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
598 /// can be called, and may result in this thread being unparked immediately.
599 ///
600 /// This can be used to start work only when the executor is idle, or for bookkeeping
601 /// and monitoring purposes.
602 ///
603 /// Note: There can only be one park callback for a runtime; calling this function
604 /// more than once replaces the last callback defined, rather than adding to it.
605 ///
606 /// # Examples
607 ///
608 /// ## Multithreaded executor
609 /// ```
610 /// # use std::sync::Arc;
611 /// # use std::sync::atomic::{AtomicBool, Ordering};
612 /// # use tokio::runtime;
613 /// # use tokio::sync::Barrier;
614 /// # pub fn main() {
615 /// let once = AtomicBool::new(true);
616 /// let barrier = Arc::new(Barrier::new(2));
617 ///
618 /// let runtime = runtime::Builder::new_multi_thread()
619 /// .worker_threads(1)
620 /// .on_thread_park({
621 /// let barrier = barrier.clone();
622 /// move || {
623 /// let barrier = barrier.clone();
624 /// if once.swap(false, Ordering::Relaxed) {
625 /// tokio::spawn(async move { barrier.wait().await; });
626 /// }
627 /// }
628 /// })
629 /// .build()
630 /// .unwrap();
631 ///
632 /// runtime.block_on(async {
633 /// barrier.wait().await;
634 /// })
635 /// # }
636 /// ```
637 /// ## Current thread executor
638 /// ```
639 /// # use std::sync::Arc;
640 /// # use std::sync::atomic::{AtomicBool, Ordering};
641 /// # use tokio::runtime;
642 /// # use tokio::sync::Barrier;
643 /// # pub fn main() {
644 /// let once = AtomicBool::new(true);
645 /// let barrier = Arc::new(Barrier::new(2));
646 ///
647 /// let runtime = runtime::Builder::new_current_thread()
648 /// .on_thread_park({
649 /// let barrier = barrier.clone();
650 /// move || {
651 /// let barrier = barrier.clone();
652 /// if once.swap(false, Ordering::Relaxed) {
653 /// tokio::spawn(async move { barrier.wait().await; });
654 /// }
655 /// }
656 /// })
657 /// .build()
658 /// .unwrap();
659 ///
660 /// runtime.block_on(async {
661 /// barrier.wait().await;
662 /// })
663 /// # }
664 /// ```
665 #[cfg(not(loom))]
666 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
667 where
668 F: Fn() + Send + Sync + 'static,
669 {
670 self.before_park = Some(std::sync::Arc::new(f));
671 self
672 }
673
674 /// Executes function `f` just after a thread unparks (starts executing tasks).
675 ///
676 /// This is intended for bookkeeping and monitoring use cases; note that work
677 /// in this callback will increase latencies when the application has allowed one or
678 /// more runtime threads to go idle.
679 ///
680 /// Note: There can only be one unpark callback for a runtime; calling this function
681 /// more than once replaces the last callback defined, rather than adding to it.
682 ///
683 /// # Examples
684 ///
685 /// ```
686 /// # use tokio::runtime;
687 /// # pub fn main() {
688 /// let runtime = runtime::Builder::new_multi_thread()
689 /// .on_thread_unpark(|| {
690 /// println!("thread unparking");
691 /// })
692 /// .build();
693 ///
694 /// runtime.unwrap().block_on(async {
695 /// tokio::task::yield_now().await;
696 /// println!("Hello from Tokio!");
697 /// })
698 /// # }
699 /// ```
700 #[cfg(not(loom))]
701 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
702 where
703 F: Fn() + Send + Sync + 'static,
704 {
705 self.after_unpark = Some(std::sync::Arc::new(f));
706 self
707 }
708
709 /// Executes function `f` just before a task is spawned.
710 ///
711 /// `f` is called within the Tokio context, so functions like
712 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
713 /// invoked immediately.
714 ///
715 /// This can be used for bookkeeping or monitoring purposes.
716 ///
717 /// Note: There can only be one spawn callback for a runtime; calling this function more
718 /// than once replaces the last callback defined, rather than adding to it.
719 ///
720 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
721 ///
722 /// **Note**: This is an [unstable API][unstable]. The public API of this type
723 /// may break in 1.x releases. See [the documentation on unstable
724 /// features][unstable] for details.
725 ///
726 /// [unstable]: crate#unstable-features
727 ///
728 /// # Examples
729 ///
730 /// ```
731 /// # use tokio::runtime;
732 /// # pub fn main() {
733 /// let runtime = runtime::Builder::new_current_thread()
734 /// .on_task_spawn(|_| {
735 /// println!("spawning task");
736 /// })
737 /// .build()
738 /// .unwrap();
739 ///
740 /// runtime.block_on(async {
741 /// tokio::task::spawn(std::future::ready(()));
742 ///
743 /// for _ in 0..64 {
744 /// tokio::task::yield_now().await;
745 /// }
746 /// })
747 /// # }
748 /// ```
749 #[cfg(all(not(loom), tokio_unstable))]
750 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
751 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
752 where
753 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
754 {
755 self.before_spawn = Some(std::sync::Arc::new(f));
756 self
757 }
758
759 /// Executes function `f` just before a task is polled
760 ///
761 /// `f` is called within the Tokio context, so functions like
762 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
763 /// invoked immediately.
764 ///
765 /// **Note**: This is an [unstable API][unstable]. The public API of this type
766 /// may break in 1.x releases. See [the documentation on unstable
767 /// features][unstable] for details.
768 ///
769 /// [unstable]: crate#unstable-features
770 ///
771 /// # Examples
772 ///
773 /// ```
774 /// # use std::sync::{atomic::AtomicUsize, Arc};
775 /// # use tokio::task::yield_now;
776 /// # pub fn main() {
777 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
778 /// let poll_start = poll_start_counter.clone();
779 /// let rt = tokio::runtime::Builder::new_multi_thread()
780 /// .enable_all()
781 /// .on_before_task_poll(move |meta| {
782 /// println!("task {} is about to be polled", meta.id())
783 /// })
784 /// .build()
785 /// .unwrap();
786 /// let task = rt.spawn(async {
787 /// yield_now().await;
788 /// });
789 /// let _ = rt.block_on(task);
790 ///
791 /// # }
792 /// ```
793 #[cfg(tokio_unstable)]
794 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
795 where
796 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
797 {
798 self.before_poll = Some(std::sync::Arc::new(f));
799 self
800 }
801
802 /// Executes function `f` just after a task is polled
803 ///
804 /// `f` is called within the Tokio context, so functions like
805 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
806 /// invoked immediately.
807 ///
808 /// **Note**: This is an [unstable API][unstable]. The public API of this type
809 /// may break in 1.x releases. See [the documentation on unstable
810 /// features][unstable] for details.
811 ///
812 /// [unstable]: crate#unstable-features
813 ///
814 /// # Examples
815 ///
816 /// ```
817 /// # use std::sync::{atomic::AtomicUsize, Arc};
818 /// # use tokio::task::yield_now;
819 /// # pub fn main() {
820 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
821 /// let poll_stop = poll_stop_counter.clone();
822 /// let rt = tokio::runtime::Builder::new_multi_thread()
823 /// .enable_all()
824 /// .on_after_task_poll(move |meta| {
825 /// println!("task {} completed polling", meta.id());
826 /// })
827 /// .build()
828 /// .unwrap();
829 /// let task = rt.spawn(async {
830 /// yield_now().await;
831 /// });
832 /// let _ = rt.block_on(task);
833 ///
834 /// # }
835 /// ```
836 #[cfg(tokio_unstable)]
837 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
838 where
839 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
840 {
841 self.after_poll = Some(std::sync::Arc::new(f));
842 self
843 }
844
845 /// Executes function `f` just after a task is terminated.
846 ///
847 /// `f` is called within the Tokio context, so functions like
848 /// [`tokio::spawn`](crate::spawn) can be called.
849 ///
850 /// This can be used for bookkeeping or monitoring purposes.
851 ///
852 /// Note: There can only be one task termination callback for a runtime; calling this
853 /// function more than once replaces the last callback defined, rather than adding to it.
854 ///
855 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
856 ///
857 /// **Note**: This is an [unstable API][unstable]. The public API of this type
858 /// may break in 1.x releases. See [the documentation on unstable
859 /// features][unstable] for details.
860 ///
861 /// [unstable]: crate#unstable-features
862 ///
863 /// # Examples
864 ///
865 /// ```
866 /// # use tokio::runtime;
867 /// # pub fn main() {
868 /// let runtime = runtime::Builder::new_current_thread()
869 /// .on_task_terminate(|_| {
870 /// println!("killing task");
871 /// })
872 /// .build()
873 /// .unwrap();
874 ///
875 /// runtime.block_on(async {
876 /// tokio::task::spawn(std::future::ready(()));
877 ///
878 /// for _ in 0..64 {
879 /// tokio::task::yield_now().await;
880 /// }
881 /// })
882 /// # }
883 /// ```
884 #[cfg(all(not(loom), tokio_unstable))]
885 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
886 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
887 where
888 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
889 {
890 self.after_termination = Some(std::sync::Arc::new(f));
891 self
892 }
893
894 /// Creates the configured `Runtime`.
895 ///
896 /// The returned `Runtime` instance is ready to spawn tasks.
897 ///
898 /// # Examples
899 ///
900 /// ```
901 /// use tokio::runtime::Builder;
902 ///
903 /// let rt = Builder::new_multi_thread().build().unwrap();
904 ///
905 /// rt.block_on(async {
906 /// println!("Hello from the Tokio runtime");
907 /// });
908 /// ```
909 pub fn build(&mut self) -> io::Result<Runtime> {
910 match &self.kind {
911 Kind::CurrentThread => self.build_current_thread_runtime(),
912 #[cfg(feature = "rt-multi-thread")]
913 Kind::MultiThread => self.build_threaded_runtime(),
914 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
915 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
916 }
917 }
918
919 /// Creates the configured `LocalRuntime`.
920 ///
921 /// The returned `LocalRuntime` instance is ready to spawn tasks.
922 ///
923 /// # Panics
924 /// This will panic if `current_thread` is not the selected runtime flavor.
925 /// All other runtime flavors are unsupported by [`LocalRuntime`].
926 ///
927 /// [`LocalRuntime`]: [crate::runtime::LocalRuntime]
928 ///
929 /// # Examples
930 ///
931 /// ```
932 /// use tokio::runtime::Builder;
933 ///
934 /// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
935 ///
936 /// rt.block_on(async {
937 /// println!("Hello from the Tokio runtime");
938 /// });
939 /// ```
940 #[allow(unused_variables, unreachable_patterns)]
941 #[cfg(tokio_unstable)]
942 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
943 pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
944 match &self.kind {
945 Kind::CurrentThread => self.build_current_thread_local_runtime(),
946 _ => panic!("Only current_thread is supported when building a local runtime"),
947 }
948 }
949
950 fn get_cfg(&self, workers: usize) -> driver::Cfg {
951 driver::Cfg {
952 enable_pause_time: match self.kind {
953 Kind::CurrentThread => true,
954 #[cfg(feature = "rt-multi-thread")]
955 Kind::MultiThread => false,
956 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
957 Kind::MultiThreadAlt => false,
958 },
959 enable_io: self.enable_io,
960 enable_time: self.enable_time,
961 start_paused: self.start_paused,
962 nevents: self.nevents,
963 workers,
964 }
965 }
966
967 /// Sets a custom timeout for a thread in the blocking pool.
968 ///
969 /// By default, the timeout for a thread is set to 10 seconds. This can
970 /// be overridden using `.thread_keep_alive()`.
971 ///
972 /// # Example
973 ///
974 /// ```
975 /// # use tokio::runtime;
976 /// # use std::time::Duration;
977 /// # pub fn main() {
978 /// let rt = runtime::Builder::new_multi_thread()
979 /// .thread_keep_alive(Duration::from_millis(100))
980 /// .build();
981 /// # }
982 /// ```
983 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
984 self.keep_alive = Some(duration);
985 self
986 }
987
988 /// Sets the number of scheduler ticks after which the scheduler will poll the global
989 /// task queue.
990 ///
991 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
992 ///
993 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
994 /// [the module documentation] for the default behavior of the multi-thread scheduler.
995 ///
996 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
997 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
998 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
999 /// getting started on new work, especially if tasks frequently yield rather than complete
1000 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
1001 /// is a good choice when most tasks quickly complete polling.
1002 ///
1003 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1004 ///
1005 /// # Panics
1006 ///
1007 /// This function will panic if 0 is passed as an argument.
1008 ///
1009 /// # Examples
1010 ///
1011 /// ```
1012 /// # use tokio::runtime;
1013 /// # pub fn main() {
1014 /// let rt = runtime::Builder::new_multi_thread()
1015 /// .global_queue_interval(31)
1016 /// .build();
1017 /// # }
1018 /// ```
1019 #[track_caller]
1020 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1021 assert!(val > 0, "global_queue_interval must be greater than 0");
1022 self.global_queue_interval = Some(val);
1023 self
1024 }
1025
1026 /// Sets the number of scheduler ticks after which the scheduler will poll for
1027 /// external events (timers, I/O, and so on).
1028 ///
1029 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1030 ///
1031 /// By default, the event interval is `61` for all scheduler types.
1032 ///
1033 /// Setting the event interval determines the effective "priority" of delivering
1034 /// these external events (which may wake up additional tasks), compared to
1035 /// executing tasks that are currently ready to run. A smaller value is useful
1036 /// when tasks frequently spend a long time in polling, or frequently yield,
1037 /// which can result in overly long delays picking up I/O events. Conversely,
1038 /// picking up new events requires extra synchronization and syscall overhead,
1039 /// so if tasks generally complete their polling quickly, a higher event interval
1040 /// will minimize that overhead while still keeping the scheduler responsive to
1041 /// events.
1042 ///
1043 /// # Examples
1044 ///
1045 /// ```
1046 /// # use tokio::runtime;
1047 /// # pub fn main() {
1048 /// let rt = runtime::Builder::new_multi_thread()
1049 /// .event_interval(31)
1050 /// .build();
1051 /// # }
1052 /// ```
1053 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1054 self.event_interval = val;
1055 self
1056 }
1057
1058 cfg_unstable! {
1059 /// Configure how the runtime responds to an unhandled panic on a
1060 /// spawned task.
1061 ///
1062 /// By default, an unhandled panic (i.e. a panic not caught by
1063 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1064 /// execution. The panic's error value is forwarded to the task's
1065 /// [`JoinHandle`] and all other spawned tasks continue running.
1066 ///
1067 /// The `unhandled_panic` option enables configuring this behavior.
1068 ///
1069 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1070 /// spawned tasks have no impact on the runtime's execution.
1071 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1072 /// shutdown immediately when a spawned task panics even if that
1073 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1074 /// will immediately terminate and further calls to
1075 /// [`Runtime::block_on`] will panic.
1076 ///
1077 /// # Panics
1078 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1079 /// on a runtime other than the current thread runtime.
1080 ///
1081 /// # Unstable
1082 ///
1083 /// This option is currently unstable and its implementation is
1084 /// incomplete. The API may change or be removed in the future. See
1085 /// issue [tokio-rs/tokio#4516] for more details.
1086 ///
1087 /// # Examples
1088 ///
1089 /// The following demonstrates a runtime configured to shutdown on
1090 /// panic. The first spawned task panics and results in the runtime
1091 /// shutting down. The second spawned task never has a chance to
1092 /// execute. The call to `block_on` will panic due to the runtime being
1093 /// forcibly shutdown.
1094 ///
1095 /// ```should_panic
1096 /// use tokio::runtime::{self, UnhandledPanic};
1097 ///
1098 /// # pub fn main() {
1099 /// let rt = runtime::Builder::new_current_thread()
1100 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1101 /// .build()
1102 /// .unwrap();
1103 ///
1104 /// rt.spawn(async { panic!("boom"); });
1105 /// rt.spawn(async {
1106 /// // This task never completes.
1107 /// });
1108 ///
1109 /// rt.block_on(async {
1110 /// // Do some work
1111 /// # loop { tokio::task::yield_now().await; }
1112 /// })
1113 /// # }
1114 /// ```
1115 ///
1116 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1117 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1118 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1119 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1120 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1121 }
1122
1123 self.unhandled_panic = behavior;
1124 self
1125 }
1126
1127 /// Disables the LIFO task scheduler heuristic.
1128 ///
1129 /// The multi-threaded scheduler includes a heuristic for optimizing
1130 /// message-passing patterns. This heuristic results in the **last**
1131 /// scheduled task being polled first.
1132 ///
1133 /// To implement this heuristic, each worker thread has a slot which
1134 /// holds the task that should be polled next. However, this slot cannot
1135 /// be stolen by other worker threads, which can result in lower total
1136 /// throughput when tasks tend to have longer poll times.
1137 ///
1138 /// This configuration option will disable this heuristic resulting in
1139 /// all scheduled tasks being pushed into the worker-local queue, which
1140 /// is stealable.
1141 ///
1142 /// Consider trying this option when the task "scheduled" time is high
1143 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1144 /// collect this data.
1145 ///
1146 /// # Unstable
1147 ///
1148 /// This configuration option is considered a workaround for the LIFO
1149 /// slot not being stealable. When the slot becomes stealable, we will
1150 /// revisit whether or not this option is necessary. See
1151 /// issue [tokio-rs/tokio#4941].
1152 ///
1153 /// # Examples
1154 ///
1155 /// ```
1156 /// use tokio::runtime;
1157 ///
1158 /// let rt = runtime::Builder::new_multi_thread()
1159 /// .disable_lifo_slot()
1160 /// .build()
1161 /// .unwrap();
1162 /// ```
1163 ///
1164 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1165 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1166 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1167 self.disable_lifo_slot = true;
1168 self
1169 }
1170
1171 /// Specifies the random number generation seed to use within all
1172 /// threads associated with the runtime being built.
1173 ///
1174 /// This option is intended to make certain parts of the runtime
1175 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1176 /// [`tokio::select!`] it will ensure that the order that branches are
1177 /// polled is deterministic.
1178 ///
1179 /// In addition to the code specifying `rng_seed` and interacting with
1180 /// the runtime, the internals of Tokio and the Rust compiler may affect
1181 /// the sequences of random numbers. In order to ensure repeatable
1182 /// results, the version of Tokio, the versions of all other
1183 /// dependencies that interact with Tokio, and the Rust compiler version
1184 /// should also all remain constant.
1185 ///
1186 /// # Examples
1187 ///
1188 /// ```
1189 /// # use tokio::runtime::{self, RngSeed};
1190 /// # pub fn main() {
1191 /// let seed = RngSeed::from_bytes(b"place your seed here");
1192 /// let rt = runtime::Builder::new_current_thread()
1193 /// .rng_seed(seed)
1194 /// .build();
1195 /// # }
1196 /// ```
1197 ///
1198 /// [`tokio::select!`]: crate::select
1199 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1200 self.seed_generator = RngSeedGenerator::new(seed);
1201 self
1202 }
1203 }
1204
1205 cfg_unstable_metrics! {
1206 /// Enables tracking the distribution of task poll times.
1207 ///
1208 /// Task poll times are not instrumented by default as doing so requires
1209 /// calling [`Instant::now()`] twice per task poll, which could add
1210 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1211 /// metrics data.
1212 ///
1213 /// The histogram uses fixed bucket sizes. In other words, the histogram
1214 /// buckets are not dynamic based on input values. Use the
1215 /// `metrics_poll_time_histogram` builder methods to configure the
1216 /// histogram details.
1217 ///
1218 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1219 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1220 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1221 /// to select [`LogHistogram`] instead.
1222 ///
1223 /// # Examples
1224 ///
1225 /// ```
1226 /// use tokio::runtime;
1227 ///
1228 /// let rt = runtime::Builder::new_multi_thread()
1229 /// .enable_metrics_poll_time_histogram()
1230 /// .build()
1231 /// .unwrap();
1232 /// # // Test default values here
1233 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1234 /// # let m = rt.handle().metrics();
1235 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1236 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1237 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1238 /// ```
1239 ///
1240 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1241 /// [`Instant::now()`]: std::time::Instant::now
1242 /// [`LogHistogram`]: crate::runtime::LogHistogram
1243 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1244 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1245 self.metrics_poll_count_histogram_enable = true;
1246 self
1247 }
1248
1249 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1250 ///
1251 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1252 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1253 #[doc(hidden)]
1254 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1255 self.enable_metrics_poll_time_histogram()
1256 }
1257
1258 /// Sets the histogram scale for tracking the distribution of task poll
1259 /// times.
1260 ///
1261 /// Tracking the distribution of task poll times can be done using a
1262 /// linear or log scale. When using linear scale, each histogram bucket
1263 /// will represent the same range of poll times. When using log scale,
1264 /// each histogram bucket will cover a range twice as big as the
1265 /// previous bucket.
1266 ///
1267 /// **Default:** linear scale.
1268 ///
1269 /// # Examples
1270 ///
1271 /// ```
1272 /// use tokio::runtime::{self, HistogramScale};
1273 ///
1274 /// # #[allow(deprecated)]
1275 /// let rt = runtime::Builder::new_multi_thread()
1276 /// .enable_metrics_poll_time_histogram()
1277 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1278 /// .build()
1279 /// .unwrap();
1280 /// ```
1281 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1282 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1283 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1284 self
1285 }
1286
1287 /// Configure the histogram for tracking poll times
1288 ///
1289 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1290 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1291 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1292 ///
1293 /// # Examples
1294 /// Configure a [`LogHistogram`] with [default configuration]:
1295 /// ```
1296 /// use tokio::runtime;
1297 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1298 ///
1299 /// let rt = runtime::Builder::new_multi_thread()
1300 /// .enable_metrics_poll_time_histogram()
1301 /// .metrics_poll_time_histogram_configuration(
1302 /// HistogramConfiguration::log(LogHistogram::default())
1303 /// )
1304 /// .build()
1305 /// .unwrap();
1306 /// ```
1307 ///
1308 /// Configure a linear histogram with 100 buckets, each 10μs wide
1309 /// ```
1310 /// use tokio::runtime;
1311 /// use std::time::Duration;
1312 /// use tokio::runtime::HistogramConfiguration;
1313 ///
1314 /// let rt = runtime::Builder::new_multi_thread()
1315 /// .enable_metrics_poll_time_histogram()
1316 /// .metrics_poll_time_histogram_configuration(
1317 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1318 /// )
1319 /// .build()
1320 /// .unwrap();
1321 /// ```
1322 ///
1323 /// Configure a [`LogHistogram`] with the following settings:
1324 /// - Measure times from 100ns to 120s
1325 /// - Max error of 0.1
1326 /// - No more than 1024 buckets
1327 /// ```
1328 /// use std::time::Duration;
1329 /// use tokio::runtime;
1330 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1331 ///
1332 /// let rt = runtime::Builder::new_multi_thread()
1333 /// .enable_metrics_poll_time_histogram()
1334 /// .metrics_poll_time_histogram_configuration(
1335 /// HistogramConfiguration::log(LogHistogram::builder()
1336 /// .max_value(Duration::from_secs(120))
1337 /// .min_value(Duration::from_nanos(100))
1338 /// .max_error(0.1)
1339 /// .max_buckets(1024)
1340 /// .expect("configuration uses 488 buckets")
1341 /// )
1342 /// )
1343 /// .build()
1344 /// .unwrap();
1345 /// ```
1346 ///
1347 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1348 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1349 /// where each bucket is twice the size of the previous bucket.
1350 /// ```rust
1351 /// use std::time::Duration;
1352 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1353 /// let rt = tokio::runtime::Builder::new_current_thread()
1354 /// .enable_all()
1355 /// .enable_metrics_poll_time_histogram()
1356 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1357 /// LogHistogram::builder()
1358 /// .min_value(Duration::from_micros(20))
1359 /// .max_value(Duration::from_millis(4))
1360 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1361 /// .precision_exact(0)
1362 /// .max_buckets(10)
1363 /// .unwrap(),
1364 /// ))
1365 /// .build()
1366 /// .unwrap();
1367 /// ```
1368 ///
1369 /// [`LogHistogram`]: crate::runtime::LogHistogram
1370 /// [default configuration]: crate::runtime::LogHistogramBuilder
1371 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1372 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1373 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1374 self
1375 }
1376
1377 /// Sets the histogram resolution for tracking the distribution of task
1378 /// poll times.
1379 ///
1380 /// The resolution is the histogram's first bucket's range. When using a
1381 /// linear histogram scale, each bucket will cover the same range. When
1382 /// using a log scale, each bucket will cover a range twice as big as
1383 /// the previous bucket. In the log case, the resolution represents the
1384 /// smallest bucket range.
1385 ///
1386 /// Note that, when using log scale, the resolution is rounded up to the
1387 /// nearest power of 2 in nanoseconds.
1388 ///
1389 /// **Default:** 100 microseconds.
1390 ///
1391 /// # Examples
1392 ///
1393 /// ```
1394 /// use tokio::runtime;
1395 /// use std::time::Duration;
1396 ///
1397 /// # #[allow(deprecated)]
1398 /// let rt = runtime::Builder::new_multi_thread()
1399 /// .enable_metrics_poll_time_histogram()
1400 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1401 /// .build()
1402 /// .unwrap();
1403 /// ```
1404 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1405 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1406 assert!(resolution > Duration::from_secs(0));
1407 // Sanity check the argument and also make the cast below safe.
1408 assert!(resolution <= Duration::from_secs(1));
1409
1410 let resolution = resolution.as_nanos() as u64;
1411
1412 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1413 self
1414 }
1415
1416 /// Sets the number of buckets for the histogram tracking the
1417 /// distribution of task poll times.
1418 ///
1419 /// The last bucket tracks all greater values that fall out of other
1420 /// ranges. So, configuring the histogram using a linear scale,
1421 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1422 /// polls that take more than 450ms to complete.
1423 ///
1424 /// **Default:** 10
1425 ///
1426 /// # Examples
1427 ///
1428 /// ```
1429 /// use tokio::runtime;
1430 ///
1431 /// # #[allow(deprecated)]
1432 /// let rt = runtime::Builder::new_multi_thread()
1433 /// .enable_metrics_poll_time_histogram()
1434 /// .metrics_poll_count_histogram_buckets(15)
1435 /// .build()
1436 /// .unwrap();
1437 /// ```
1438 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1439 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1440 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1441 self
1442 }
1443 }
1444
1445 cfg_loom! {
1446 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1447 assert!(value.is_power_of_two());
1448 self.local_queue_capacity = value;
1449 self
1450 }
1451 }
1452
1453 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1454 use crate::runtime::runtime::Scheduler;
1455
1456 let (scheduler, handle, blocking_pool) =
1457 self.build_current_thread_runtime_components(None)?;
1458
1459 Ok(Runtime::from_parts(
1460 Scheduler::CurrentThread(scheduler),
1461 handle,
1462 blocking_pool,
1463 ))
1464 }
1465
/// Builds a [`LocalRuntime`] backed by the current-thread scheduler.
///
/// The id of the calling thread is captured and passed to
/// `build_current_thread_runtime_components` as the local thread id. Any
/// error from building the components is returned unchanged.
#[cfg(tokio_unstable)]
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
    use crate::runtime::local_runtime::LocalRuntimeScheduler;

    let owner_thread = std::thread::current().id();

    let parts = self.build_current_thread_runtime_components(Some(owner_thread))?;
    let (scheduler, handle, blocking_pool) = parts;
    let scheduler = LocalRuntimeScheduler::CurrentThread(scheduler);

    Ok(LocalRuntime::from_parts(scheduler, handle, blocking_pool))
}
1481
1482 fn build_current_thread_runtime_components(
1483 &mut self,
1484 local_tid: Option<ThreadId>,
1485 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1486 use crate::runtime::scheduler;
1487 use crate::runtime::Config;
1488
1489 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;
1490
1491 // Blocking pool
1492 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1493 let blocking_spawner = blocking_pool.spawner().clone();
1494
1495 // Generate a rng seed for this runtime.
1496 let seed_generator_1 = self.seed_generator.next_generator();
1497 let seed_generator_2 = self.seed_generator.next_generator();
1498
1499 // And now put a single-threaded scheduler on top of the timer. When
1500 // there are no futures ready to do something, it'll let the timer or
1501 // the reactor to generate some new stimuli for the futures to continue
1502 // in their life.
1503 let (scheduler, handle) = CurrentThread::new(
1504 driver,
1505 driver_handle,
1506 blocking_spawner,
1507 seed_generator_2,
1508 Config {
1509 before_park: self.before_park.clone(),
1510 after_unpark: self.after_unpark.clone(),
1511 before_spawn: self.before_spawn.clone(),
1512 #[cfg(tokio_unstable)]
1513 before_poll: self.before_poll.clone(),
1514 #[cfg(tokio_unstable)]
1515 after_poll: self.after_poll.clone(),
1516 after_termination: self.after_termination.clone(),
1517 global_queue_interval: self.global_queue_interval,
1518 event_interval: self.event_interval,
1519 local_queue_capacity: self.local_queue_capacity,
1520 #[cfg(tokio_unstable)]
1521 unhandled_panic: self.unhandled_panic.clone(),
1522 disable_lifo_slot: self.disable_lifo_slot,
1523 seed_generator: seed_generator_1,
1524 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1525 },
1526 local_tid,
1527 );
1528
1529 let handle = Handle {
1530 inner: scheduler::Handle::CurrentThread(handle),
1531 };
1532
1533 Ok((scheduler, handle, blocking_pool))
1534 }
1535
1536 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1537 if self.metrics_poll_count_histogram_enable {
1538 Some(self.metrics_poll_count_histogram.clone())
1539 } else {
1540 None
1541 }
1542 }
1543}
1544
1545cfg_io_driver! {
1546 impl Builder {
1547 /// Enables the I/O driver.
1548 ///
1549 /// Doing this enables using net, process, signal, and some I/O types on
1550 /// the runtime.
1551 ///
1552 /// # Examples
1553 ///
1554 /// ```
1555 /// use tokio::runtime;
1556 ///
1557 /// let rt = runtime::Builder::new_multi_thread()
1558 /// .enable_io()
1559 /// .build()
1560 /// .unwrap();
1561 /// ```
1562 pub fn enable_io(&mut self) -> &mut Self {
1563 self.enable_io = true;
1564 self
1565 }
1566
1567 /// Enables the I/O driver and configures the max number of events to be
1568 /// processed per tick.
1569 ///
1570 /// # Examples
1571 ///
1572 /// ```
1573 /// use tokio::runtime;
1574 ///
1575 /// let rt = runtime::Builder::new_current_thread()
1576 /// .enable_io()
1577 /// .max_io_events_per_tick(1024)
1578 /// .build()
1579 /// .unwrap();
1580 /// ```
1581 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1582 self.nevents = capacity;
1583 self
1584 }
1585 }
1586}
1587
1588cfg_time! {
1589 impl Builder {
1590 /// Enables the time driver.
1591 ///
1592 /// Doing this enables using `tokio::time` on the runtime.
1593 ///
1594 /// # Examples
1595 ///
1596 /// ```
1597 /// use tokio::runtime;
1598 ///
1599 /// let rt = runtime::Builder::new_multi_thread()
1600 /// .enable_time()
1601 /// .build()
1602 /// .unwrap();
1603 /// ```
1604 pub fn enable_time(&mut self) -> &mut Self {
1605 self.enable_time = true;
1606 self
1607 }
1608 }
1609}
1610
1611cfg_test_util! {
1612 impl Builder {
1613 /// Controls if the runtime's clock starts paused or advancing.
1614 ///
1615 /// Pausing time requires the current-thread runtime; construction of
1616 /// the runtime will panic otherwise.
1617 ///
1618 /// # Examples
1619 ///
1620 /// ```
1621 /// use tokio::runtime;
1622 ///
1623 /// let rt = runtime::Builder::new_current_thread()
1624 /// .enable_time()
1625 /// .start_paused(true)
1626 /// .build()
1627 /// .unwrap();
1628 /// ```
1629 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1630 self.start_paused = start_paused;
1631 self
1632 }
1633 }
1634}
1635
1636cfg_rt_multi_thread! {
1637 impl Builder {
1638 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1639 use crate::loom::sys::num_cpus;
1640 use crate::runtime::{Config, runtime::Scheduler};
1641 use crate::runtime::scheduler::{self, MultiThread};
1642
1643 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1644
1645 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?;
1646
1647 // Create the blocking pool
1648 let blocking_pool =
1649 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1650 let blocking_spawner = blocking_pool.spawner().clone();
1651
1652 // Generate a rng seed for this runtime.
1653 let seed_generator_1 = self.seed_generator.next_generator();
1654 let seed_generator_2 = self.seed_generator.next_generator();
1655
1656 let (scheduler, handle, launch) = MultiThread::new(
1657 worker_threads,
1658 driver,
1659 driver_handle,
1660 blocking_spawner,
1661 seed_generator_2,
1662 Config {
1663 before_park: self.before_park.clone(),
1664 after_unpark: self.after_unpark.clone(),
1665 before_spawn: self.before_spawn.clone(),
1666 #[cfg(tokio_unstable)]
1667 before_poll: self.before_poll.clone(),
1668 #[cfg(tokio_unstable)]
1669 after_poll: self.after_poll.clone(),
1670 after_termination: self.after_termination.clone(),
1671 global_queue_interval: self.global_queue_interval,
1672 event_interval: self.event_interval,
1673 local_queue_capacity: self.local_queue_capacity,
1674 #[cfg(tokio_unstable)]
1675 unhandled_panic: self.unhandled_panic.clone(),
1676 disable_lifo_slot: self.disable_lifo_slot,
1677 seed_generator: seed_generator_1,
1678 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1679 },
1680 );
1681
1682 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1683
1684 // Spawn the thread pool workers
1685 let _enter = handle.enter();
1686 launch.launch();
1687
1688 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1689 }
1690
1691 cfg_unstable! {
1692 fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1693 use crate::loom::sys::num_cpus;
1694 use crate::runtime::{Config, runtime::Scheduler};
1695 use crate::runtime::scheduler::MultiThreadAlt;
1696
1697 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1698 let (driver, driver_handle) = driver::Driver::new(self.get_cfg(worker_threads))?;
1699
1700 // Create the blocking pool
1701 let blocking_pool =
1702 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1703 let blocking_spawner = blocking_pool.spawner().clone();
1704
1705 // Generate a rng seed for this runtime.
1706 let seed_generator_1 = self.seed_generator.next_generator();
1707 let seed_generator_2 = self.seed_generator.next_generator();
1708
1709 let (scheduler, handle) = MultiThreadAlt::new(
1710 worker_threads,
1711 driver,
1712 driver_handle,
1713 blocking_spawner,
1714 seed_generator_2,
1715 Config {
1716 before_park: self.before_park.clone(),
1717 after_unpark: self.after_unpark.clone(),
1718 before_spawn: self.before_spawn.clone(),
1719 after_termination: self.after_termination.clone(),
1720 #[cfg(tokio_unstable)]
1721 before_poll: self.before_poll.clone(),
1722 #[cfg(tokio_unstable)]
1723 after_poll: self.after_poll.clone(),
1724 global_queue_interval: self.global_queue_interval,
1725 event_interval: self.event_interval,
1726 local_queue_capacity: self.local_queue_capacity,
1727 #[cfg(tokio_unstable)]
1728 unhandled_panic: self.unhandled_panic.clone(),
1729 disable_lifo_slot: self.disable_lifo_slot,
1730 seed_generator: seed_generator_1,
1731 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1732 },
1733 );
1734
1735 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1736 }
1737 }
1738 }
1739}
1740
1741impl fmt::Debug for Builder {
1742 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1743 fmt.debug_struct("Builder")
1744 .field("worker_threads", &self.worker_threads)
1745 .field("max_blocking_threads", &self.max_blocking_threads)
1746 .field(
1747 "thread_name",
1748 &"<dyn Fn() -> String + Send + Sync + 'static>",
1749 )
1750 .field("thread_stack_size", &self.thread_stack_size)
1751 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1752 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1753 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1754 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1755 .finish()
1756 }
1757}