tokio/runtime/
builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5    blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11use crate::runtime::blocking::BlockingPool;
12use crate::runtime::scheduler::CurrentThread;
13use std::fmt;
14use std::io;
15use std::thread::ThreadId;
16use std::time::Duration;
17
18/// Builds Tokio Runtime with custom configuration values.
19///
20/// Methods can be chained in order to set the configuration values. The
21/// Runtime is constructed by calling [`build`].
22///
23/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
24/// or [`Builder::new_current_thread`].
25///
26/// See function level documentation for details on the various configuration
27/// settings.
28///
29/// [`build`]: method@Self::build
30/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
31/// [`Builder::new_current_thread`]: method@Self::new_current_thread
32///
33/// # Examples
34///
35/// ```
36/// # #[cfg(not(target_family = "wasm"))]
37/// # {
38/// use tokio::runtime::Builder;
39///
40/// fn main() {
41///     // build runtime
42///     let runtime = Builder::new_multi_thread()
43///         .worker_threads(4)
44///         .thread_name("my-custom-name")
45///         .thread_stack_size(3 * 1024 * 1024)
46///         .build()
47///         .unwrap();
48///
49///     // use runtime ...
50/// }
51/// # }
52/// ```
53pub struct Builder {
54    /// Runtime type
55    kind: Kind,
56
57    /// Whether or not to enable the I/O driver
58    enable_io: bool,
59    nevents: usize,
60
61    /// Whether or not to enable the time driver
62    enable_time: bool,
63
64    /// Whether or not the clock should start paused.
65    start_paused: bool,
66
67    /// The number of worker threads, used by Runtime.
68    ///
69    /// Only used when not using the current-thread executor.
70    worker_threads: Option<usize>,
71
72    /// Cap on thread usage.
73    max_blocking_threads: usize,
74
75    /// Name fn used for threads spawned by the runtime.
76    pub(super) thread_name: ThreadNameFn,
77
78    /// Stack size used for threads spawned by the runtime.
79    pub(super) thread_stack_size: Option<usize>,
80
81    /// Callback to run after each thread starts.
82    pub(super) after_start: Option<Callback>,
83
84    /// To run before each worker thread stops
85    pub(super) before_stop: Option<Callback>,
86
87    /// To run before each worker thread is parked.
88    pub(super) before_park: Option<Callback>,
89
90    /// To run after each thread is unparked.
91    pub(super) after_unpark: Option<Callback>,
92
93    /// To run before each task is spawned.
94    pub(super) before_spawn: Option<TaskCallback>,
95
96    /// To run before each poll
97    #[cfg(tokio_unstable)]
98    pub(super) before_poll: Option<TaskCallback>,
99
100    /// To run after each poll
101    #[cfg(tokio_unstable)]
102    pub(super) after_poll: Option<TaskCallback>,
103
104    /// To run after each task is terminated.
105    pub(super) after_termination: Option<TaskCallback>,
106
107    /// Customizable keep alive timeout for `BlockingPool`
108    pub(super) keep_alive: Option<Duration>,
109
110    /// How many ticks before pulling a task from the global/remote queue?
111    ///
112    /// When `None`, the value is unspecified and behavior details are left to
113    /// the scheduler. Each scheduler flavor could choose to either pick its own
114    /// default value or use some other strategy to decide when to poll from the
115    /// global queue. For example, the multi-threaded scheduler uses a
116    /// self-tuning strategy based on mean task poll times.
117    pub(super) global_queue_interval: Option<u32>,
118
119    /// How many ticks before yielding to the driver for timer and I/O events?
120    pub(super) event_interval: u32,
121
122    /// When true, the multi-threade scheduler LIFO slot should not be used.
123    ///
124    /// This option should only be exposed as unstable.
125    pub(super) disable_lifo_slot: bool,
126
127    /// Specify a random number generator seed to provide deterministic results
128    pub(super) seed_generator: RngSeedGenerator,
129
130    /// When true, enables task poll count histogram instrumentation.
131    pub(super) metrics_poll_count_histogram_enable: bool,
132
133    /// Configures the task poll count histogram
134    pub(super) metrics_poll_count_histogram: HistogramBuilder,
135
136    #[cfg(tokio_unstable)]
137    pub(super) unhandled_panic: UnhandledPanic,
138
139    timer_flavor: TimerFlavor,
140}
141
142cfg_unstable! {
143    /// How the runtime should respond to unhandled panics.
144    ///
145    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
146    /// to configure the runtime behavior when a spawned task panics.
147    ///
148    /// See [`Builder::unhandled_panic`] for more details.
149    #[derive(Debug, Clone)]
150    #[non_exhaustive]
151    pub enum UnhandledPanic {
152        /// The runtime should ignore panics on spawned tasks.
153        ///
154        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
155        /// tasks continue running normally.
156        ///
157        /// This is the default behavior.
158        ///
159        /// # Examples
160        ///
161        /// ```
162        /// # #[cfg(not(target_family = "wasm"))]
163        /// # {
164        /// use tokio::runtime::{self, UnhandledPanic};
165        ///
166        /// # pub fn main() {
167        /// let rt = runtime::Builder::new_current_thread()
168        ///     .unhandled_panic(UnhandledPanic::Ignore)
169        ///     .build()
170        ///     .unwrap();
171        ///
172        /// let task1 = rt.spawn(async { panic!("boom"); });
173        /// let task2 = rt.spawn(async {
174        ///     // This task completes normally
175        ///     "done"
176        /// });
177        ///
178        /// rt.block_on(async {
179        ///     // The panic on the first task is forwarded to the `JoinHandle`
180        ///     assert!(task1.await.is_err());
181        ///
182        ///     // The second task completes normally
183        ///     assert!(task2.await.is_ok());
184        /// })
185        /// # }
186        /// # }
187        /// ```
188        ///
189        /// [`JoinHandle`]: struct@crate::task::JoinHandle
190        Ignore,
191
192        /// The runtime should immediately shutdown if a spawned task panics.
193        ///
194        /// The runtime will immediately shutdown even if the panicked task's
195        /// [`JoinHandle`] is still available. All further spawned tasks will be
196        /// immediately dropped and call to [`Runtime::block_on`] will panic.
197        ///
198        /// # Examples
199        ///
200        /// ```should_panic
201        /// use tokio::runtime::{self, UnhandledPanic};
202        ///
203        /// # pub fn main() {
204        /// let rt = runtime::Builder::new_current_thread()
205        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
206        ///     .build()
207        ///     .unwrap();
208        ///
209        /// rt.spawn(async { panic!("boom"); });
210        /// rt.spawn(async {
211        ///     // This task never completes.
212        /// });
213        ///
214        /// rt.block_on(async {
215        ///     // Do some work
216        /// # loop { tokio::task::yield_now().await; }
217        /// })
218        /// # }
219        /// ```
220        ///
221        /// [`JoinHandle`]: struct@crate::task::JoinHandle
222        ShutdownRuntime,
223    }
224}
225
226pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
228#[derive(Clone, Copy)]
229pub(crate) enum Kind {
230    CurrentThread,
231    #[cfg(feature = "rt-multi-thread")]
232    MultiThread,
233}
234
235impl Builder {
236    /// Returns a new builder with the current thread scheduler selected.
237    ///
238    /// Configuration methods can be chained on the return value.
239    ///
240    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241    /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`] (unstable).
242    ///
243    /// [`LocalSet`]: crate::task::LocalSet
244    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
245    /// [`build_local`]: crate::runtime::Builder::build_local
246    pub fn new_current_thread() -> Builder {
247        #[cfg(loom)]
248        const EVENT_INTERVAL: u32 = 4;
249        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
250        #[cfg(not(loom))]
251        const EVENT_INTERVAL: u32 = 61;
252
253        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
254    }
255
256    /// Returns a new builder with the multi thread scheduler selected.
257    ///
258    /// Configuration methods can be chained on the return value.
259    #[cfg(feature = "rt-multi-thread")]
260    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
261    pub fn new_multi_thread() -> Builder {
262        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
263        Builder::new(Kind::MultiThread, 61)
264    }
265
266    /// Returns a new runtime builder initialized with default configuration
267    /// values.
268    ///
269    /// Configuration methods can be chained on the return value.
270    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
271        Builder {
272            kind,
273
274            // I/O defaults to "off"
275            enable_io: false,
276            nevents: 1024,
277
278            // Time defaults to "off"
279            enable_time: false,
280
281            // The clock starts not-paused
282            start_paused: false,
283
284            // Read from environment variable first in multi-threaded mode.
285            // Default to lazy auto-detection (one thread per CPU core)
286            worker_threads: None,
287
288            max_blocking_threads: 512,
289
290            // Default thread name
291            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
292
293            // Do not set a stack size by default
294            thread_stack_size: None,
295
296            // No worker thread callbacks
297            after_start: None,
298            before_stop: None,
299            before_park: None,
300            after_unpark: None,
301
302            before_spawn: None,
303            after_termination: None,
304
305            #[cfg(tokio_unstable)]
306            before_poll: None,
307            #[cfg(tokio_unstable)]
308            after_poll: None,
309
310            keep_alive: None,
311
312            // Defaults for these values depend on the scheduler kind, so we get them
313            // as parameters.
314            global_queue_interval: None,
315            event_interval,
316
317            seed_generator: RngSeedGenerator::new(RngSeed::new()),
318
319            #[cfg(tokio_unstable)]
320            unhandled_panic: UnhandledPanic::Ignore,
321
322            metrics_poll_count_histogram_enable: false,
323
324            metrics_poll_count_histogram: HistogramBuilder::default(),
325
326            disable_lifo_slot: false,
327
328            timer_flavor: TimerFlavor::Traditional,
329        }
330    }
331
332    /// Enables both I/O and time drivers.
333    ///
334    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
335    /// individually. If additional components are added to Tokio in the future,
336    /// `enable_all` will include these future components.
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// # #[cfg(not(target_family = "wasm"))]
342    /// # {
343    /// use tokio::runtime;
344    ///
345    /// let rt = runtime::Builder::new_multi_thread()
346    ///     .enable_all()
347    ///     .build()
348    ///     .unwrap();
349    /// # }
350    /// ```
351    pub fn enable_all(&mut self) -> &mut Self {
352        #[cfg(any(
353            feature = "net",
354            all(unix, feature = "process"),
355            all(unix, feature = "signal")
356        ))]
357        self.enable_io();
358
359        #[cfg(all(
360            tokio_unstable,
361            feature = "io-uring",
362            feature = "rt",
363            feature = "fs",
364            target_os = "linux",
365        ))]
366        self.enable_io_uring();
367
368        #[cfg(feature = "time")]
369        self.enable_time();
370
371        self
372    }
373
374    /// Enables the alternative timer implementation, which is disabled by default.
375    ///
376    /// The alternative timer implementation is an unstable feature that may
377    /// provide better performance on multi-threaded runtimes with a large number
378    /// of worker threads.
379    ///
380    /// This option only applies to multi-threaded runtimes. Attempting to use
381    /// this option with any other runtime type will have no effect.
382    ///
383    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// # #[cfg(not(target_family = "wasm"))]
389    /// # {
390    /// use tokio::runtime;
391    ///
392    /// let rt = runtime::Builder::new_multi_thread()
393    ///   .enable_alt_timer()
394    ///   .build()
395    ///   .unwrap();
396    /// # }
397    /// ```
398    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
399    #[cfg_attr(
400        docsrs,
401        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
402    )]
403    pub fn enable_alt_timer(&mut self) -> &mut Self {
404        self.enable_time();
405        self.timer_flavor = TimerFlavor::Alternative;
406        self
407    }
408
409    /// Sets the number of worker threads the `Runtime` will use.
410    ///
411    /// This can be any number above 0 though it is advised to keep this value
412    /// on the smaller side.
413    ///
414    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
415    ///
416    /// # Default
417    ///
418    /// The default value is the number of cores available to the system.
419    ///
420    /// When using the `current_thread` runtime this method has no effect.
421    ///
422    /// # Examples
423    ///
424    /// ## Multi threaded runtime with 4 threads
425    ///
426    /// ```
427    /// # #[cfg(not(target_family = "wasm"))]
428    /// # {
429    /// use tokio::runtime;
430    ///
431    /// // This will spawn a work-stealing runtime with 4 worker threads.
432    /// let rt = runtime::Builder::new_multi_thread()
433    ///     .worker_threads(4)
434    ///     .build()
435    ///     .unwrap();
436    ///
437    /// rt.spawn(async move {});
438    /// # }
439    /// ```
440    ///
441    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
442    ///
443    /// ```
444    /// use tokio::runtime;
445    ///
446    /// // Create a runtime that _must_ be driven from a call
447    /// // to `Runtime::block_on`.
448    /// let rt = runtime::Builder::new_current_thread()
449    ///     .build()
450    ///     .unwrap();
451    ///
452    /// // This will run the runtime and future on the current thread
453    /// rt.block_on(async move {});
454    /// ```
455    ///
456    /// # Panics
457    ///
458    /// This will panic if `val` is not larger than `0`.
459    #[track_caller]
460    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
461        assert!(val > 0, "Worker threads cannot be set to 0");
462        self.worker_threads = Some(val);
463        self
464    }
465
466    /// Specifies the limit for additional threads spawned by the Runtime.
467    ///
468    /// These threads are used for blocking operations like tasks spawned
469    /// through [`spawn_blocking`], this includes but is not limited to:
470    /// - [`fs`] operations
471    /// - dns resolution through [`ToSocketAddrs`]
472    /// - writing to [`Stdout`] or [`Stderr`]
473    /// - reading from [`Stdin`]
474    ///
475    /// Unlike the [`worker_threads`], they are not always active and will exit
476    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
477    ///
478    /// It's recommended to not set this limit too low in order to avoid hanging on operations
479    /// requiring [`spawn_blocking`].
480    ///
481    /// The default value is 512.
482    ///
483    /// # Queue Behavior
484    ///
485    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
486    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
487    /// method has not been reached, a new thread will be spawned. If no idle thread is available
488    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
489    /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
490    /// it could potentially grow unbounded.
491    ///
492    /// # Panics
493    ///
494    /// This will panic if `val` is not larger than `0`.
495    ///
496    /// # Upgrading from 0.x
497    ///
498    /// In old versions `max_threads` limited both blocking and worker threads, but the
499    /// current `max_blocking_threads` does not include async worker threads in the count.
500    ///
501    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
502    /// [`fs`]: mod@crate::fs
503    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
504    /// [`Stdout`]: struct@crate::io::Stdout
505    /// [`Stdin`]: struct@crate::io::Stdin
506    /// [`Stderr`]: struct@crate::io::Stderr
507    /// [`worker_threads`]: Self::worker_threads
508    /// [`thread_keep_alive`]: Self::thread_keep_alive
509    #[track_caller]
510    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
511    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
512        assert!(val > 0, "Max blocking threads cannot be set to 0");
513        self.max_blocking_threads = val;
514        self
515    }
516
517    /// Sets name of threads spawned by the `Runtime`'s thread pool.
518    ///
519    /// The default name is "tokio-runtime-worker".
520    ///
521    /// # Examples
522    ///
523    /// ```
524    /// # #[cfg(not(target_family = "wasm"))]
525    /// # {
526    /// # use tokio::runtime;
527    ///
528    /// # pub fn main() {
529    /// let rt = runtime::Builder::new_multi_thread()
530    ///     .thread_name("my-pool")
531    ///     .build();
532    /// # }
533    /// # }
534    /// ```
535    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
536        let val = val.into();
537        self.thread_name = std::sync::Arc::new(move || val.clone());
538        self
539    }
540
541    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
542    ///
543    /// The default name fn is `|| "tokio-runtime-worker".into()`.
544    ///
545    /// # Examples
546    ///
547    /// ```
548    /// # #[cfg(not(target_family = "wasm"))]
549    /// # {
550    /// # use tokio::runtime;
551    /// # use std::sync::atomic::{AtomicUsize, Ordering};
552    /// # pub fn main() {
553    /// let rt = runtime::Builder::new_multi_thread()
554    ///     .thread_name_fn(|| {
555    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
556    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
557    ///        format!("my-pool-{}", id)
558    ///     })
559    ///     .build();
560    /// # }
561    /// # }
562    /// ```
563    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
564    where
565        F: Fn() -> String + Send + Sync + 'static,
566    {
567        self.thread_name = std::sync::Arc::new(f);
568        self
569    }
570
571    /// Sets the stack size (in bytes) for worker threads.
572    ///
573    /// The actual stack size may be greater than this value if the platform
574    /// specifies minimal stack size.
575    ///
576    /// The default stack size for spawned threads is 2 MiB, though this
577    /// particular stack size is subject to change in the future.
578    ///
579    /// # Examples
580    ///
581    /// ```
582    /// # #[cfg(not(target_family = "wasm"))]
583    /// # {
584    /// # use tokio::runtime;
585    ///
586    /// # pub fn main() {
587    /// let rt = runtime::Builder::new_multi_thread()
588    ///     .thread_stack_size(32 * 1024)
589    ///     .build();
590    /// # }
591    /// # }
592    /// ```
593    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
594        self.thread_stack_size = Some(val);
595        self
596    }
597
598    /// Executes function `f` after each thread is started but before it starts
599    /// doing work.
600    ///
601    /// This is intended for bookkeeping and monitoring use cases.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// # #[cfg(not(target_family = "wasm"))]
607    /// # {
608    /// # use tokio::runtime;
609    /// # pub fn main() {
610    /// let runtime = runtime::Builder::new_multi_thread()
611    ///     .on_thread_start(|| {
612    ///         println!("thread started");
613    ///     })
614    ///     .build();
615    /// # }
616    /// # }
617    /// ```
618    #[cfg(not(loom))]
619    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
620    where
621        F: Fn() + Send + Sync + 'static,
622    {
623        self.after_start = Some(std::sync::Arc::new(f));
624        self
625    }
626
627    /// Executes function `f` before each thread stops.
628    ///
629    /// This is intended for bookkeeping and monitoring use cases.
630    ///
631    /// # Examples
632    ///
633    /// ```
634    /// # #[cfg(not(target_family = "wasm"))]
635    /// {
636    /// # use tokio::runtime;
637    /// # pub fn main() {
638    /// let runtime = runtime::Builder::new_multi_thread()
639    ///     .on_thread_stop(|| {
640    ///         println!("thread stopping");
641    ///     })
642    ///     .build();
643    /// # }
644    /// # }
645    /// ```
646    #[cfg(not(loom))]
647    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
648    where
649        F: Fn() + Send + Sync + 'static,
650    {
651        self.before_stop = Some(std::sync::Arc::new(f));
652        self
653    }
654
655    /// Executes function `f` just before a thread is parked (goes idle).
656    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
657    /// can be called, and may result in this thread being unparked immediately.
658    ///
659    /// This can be used to start work only when the executor is idle, or for bookkeeping
660    /// and monitoring purposes.
661    ///
662    /// Note: There can only be one park callback for a runtime; calling this function
663    /// more than once replaces the last callback defined, rather than adding to it.
664    ///
665    /// # Examples
666    ///
667    /// ## Multithreaded executor
668    /// ```
669    /// # #[cfg(not(target_family = "wasm"))]
670    /// # {
671    /// # use std::sync::Arc;
672    /// # use std::sync::atomic::{AtomicBool, Ordering};
673    /// # use tokio::runtime;
674    /// # use tokio::sync::Barrier;
675    /// # pub fn main() {
676    /// let once = AtomicBool::new(true);
677    /// let barrier = Arc::new(Barrier::new(2));
678    ///
679    /// let runtime = runtime::Builder::new_multi_thread()
680    ///     .worker_threads(1)
681    ///     .on_thread_park({
682    ///         let barrier = barrier.clone();
683    ///         move || {
684    ///             let barrier = barrier.clone();
685    ///             if once.swap(false, Ordering::Relaxed) {
686    ///                 tokio::spawn(async move { barrier.wait().await; });
687    ///            }
688    ///         }
689    ///     })
690    ///     .build()
691    ///     .unwrap();
692    ///
693    /// runtime.block_on(async {
694    ///    barrier.wait().await;
695    /// })
696    /// # }
697    /// # }
698    /// ```
699    /// ## Current thread executor
700    /// ```
701    /// # use std::sync::Arc;
702    /// # use std::sync::atomic::{AtomicBool, Ordering};
703    /// # use tokio::runtime;
704    /// # use tokio::sync::Barrier;
705    /// # pub fn main() {
706    /// let once = AtomicBool::new(true);
707    /// let barrier = Arc::new(Barrier::new(2));
708    ///
709    /// let runtime = runtime::Builder::new_current_thread()
710    ///     .on_thread_park({
711    ///         let barrier = barrier.clone();
712    ///         move || {
713    ///             let barrier = barrier.clone();
714    ///             if once.swap(false, Ordering::Relaxed) {
715    ///                 tokio::spawn(async move { barrier.wait().await; });
716    ///            }
717    ///         }
718    ///     })
719    ///     .build()
720    ///     .unwrap();
721    ///
722    /// runtime.block_on(async {
723    ///    barrier.wait().await;
724    /// })
725    /// # }
726    /// ```
727    #[cfg(not(loom))]
728    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
729    where
730        F: Fn() + Send + Sync + 'static,
731    {
732        self.before_park = Some(std::sync::Arc::new(f));
733        self
734    }
735
736    /// Executes function `f` just after a thread unparks (starts executing tasks).
737    ///
738    /// This is intended for bookkeeping and monitoring use cases; note that work
739    /// in this callback will increase latencies when the application has allowed one or
740    /// more runtime threads to go idle.
741    ///
742    /// Note: There can only be one unpark callback for a runtime; calling this function
743    /// more than once replaces the last callback defined, rather than adding to it.
744    ///
745    /// # Examples
746    ///
747    /// ```
748    /// # #[cfg(not(target_family = "wasm"))]
749    /// # {
750    /// # use tokio::runtime;
751    /// # pub fn main() {
752    /// let runtime = runtime::Builder::new_multi_thread()
753    ///     .on_thread_unpark(|| {
754    ///         println!("thread unparking");
755    ///     })
756    ///     .build();
757    ///
758    /// runtime.unwrap().block_on(async {
759    ///    tokio::task::yield_now().await;
760    ///    println!("Hello from Tokio!");
761    /// })
762    /// # }
763    /// # }
764    /// ```
765    #[cfg(not(loom))]
766    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
767    where
768        F: Fn() + Send + Sync + 'static,
769    {
770        self.after_unpark = Some(std::sync::Arc::new(f));
771        self
772    }
773
774    /// Executes function `f` just before a task is spawned.
775    ///
776    /// `f` is called within the Tokio context, so functions like
777    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
778    /// invoked immediately.
779    ///
780    /// This can be used for bookkeeping or monitoring purposes.
781    ///
782    /// Note: There can only be one spawn callback for a runtime; calling this function more
783    /// than once replaces the last callback defined, rather than adding to it.
784    ///
785    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
786    ///
787    /// **Note**: This is an [unstable API][unstable]. The public API of this type
788    /// may break in 1.x releases. See [the documentation on unstable
789    /// features][unstable] for details.
790    ///
791    /// [unstable]: crate#unstable-features
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// # use tokio::runtime;
797    /// # pub fn main() {
798    /// let runtime = runtime::Builder::new_current_thread()
799    ///     .on_task_spawn(|_| {
800    ///         println!("spawning task");
801    ///     })
802    ///     .build()
803    ///     .unwrap();
804    ///
805    /// runtime.block_on(async {
806    ///     tokio::task::spawn(std::future::ready(()));
807    ///
808    ///     for _ in 0..64 {
809    ///         tokio::task::yield_now().await;
810    ///     }
811    /// })
812    /// # }
813    /// ```
814    #[cfg(all(not(loom), tokio_unstable))]
815    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
816    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
817    where
818        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819    {
820        self.before_spawn = Some(std::sync::Arc::new(f));
821        self
822    }
823
824    /// Executes function `f` just before a task is polled
825    ///
826    /// `f` is called within the Tokio context, so functions like
827    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
828    /// invoked immediately.
829    ///
830    /// **Note**: This is an [unstable API][unstable]. The public API of this type
831    /// may break in 1.x releases. See [the documentation on unstable
832    /// features][unstable] for details.
833    ///
834    /// [unstable]: crate#unstable-features
835    ///
836    /// # Examples
837    ///
838    /// ```
839    /// # #[cfg(not(target_family = "wasm"))]
840    /// # {
841    /// # use std::sync::{atomic::AtomicUsize, Arc};
842    /// # use tokio::task::yield_now;
843    /// # pub fn main() {
844    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
845    /// let poll_start = poll_start_counter.clone();
846    /// let rt = tokio::runtime::Builder::new_multi_thread()
847    ///     .enable_all()
848    ///     .on_before_task_poll(move |meta| {
849    ///         println!("task {} is about to be polled", meta.id())
850    ///     })
851    ///     .build()
852    ///     .unwrap();
853    /// let task = rt.spawn(async {
854    ///     yield_now().await;
855    /// });
856    /// let _ = rt.block_on(task);
857    ///
858    /// # }
859    /// # }
860    /// ```
861    #[cfg(tokio_unstable)]
862    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
863    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
864    where
865        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
866    {
867        self.before_poll = Some(std::sync::Arc::new(f));
868        self
869    }
870
871    /// Executes function `f` just after a task is polled
872    ///
873    /// `f` is called within the Tokio context, so functions like
874    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
875    /// invoked immediately.
876    ///
877    /// **Note**: This is an [unstable API][unstable]. The public API of this type
878    /// may break in 1.x releases. See [the documentation on unstable
879    /// features][unstable] for details.
880    ///
881    /// [unstable]: crate#unstable-features
882    ///
883    /// # Examples
884    ///
885    /// ```
886    /// # #[cfg(not(target_family = "wasm"))]
887    /// # {
888    /// # use std::sync::{atomic::AtomicUsize, Arc};
889    /// # use tokio::task::yield_now;
890    /// # pub fn main() {
891    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
892    /// let poll_stop = poll_stop_counter.clone();
893    /// let rt = tokio::runtime::Builder::new_multi_thread()
894    ///     .enable_all()
895    ///     .on_after_task_poll(move |meta| {
896    ///         println!("task {} completed polling", meta.id());
897    ///     })
898    ///     .build()
899    ///     .unwrap();
900    /// let task = rt.spawn(async {
901    ///     yield_now().await;
902    /// });
903    /// let _ = rt.block_on(task);
904    ///
905    /// # }
906    /// # }
907    /// ```
908    #[cfg(tokio_unstable)]
909    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
910    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
911    where
912        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
913    {
914        self.after_poll = Some(std::sync::Arc::new(f));
915        self
916    }
917
918    /// Executes function `f` just after a task is terminated.
919    ///
920    /// `f` is called within the Tokio context, so functions like
921    /// [`tokio::spawn`](crate::spawn) can be called.
922    ///
923    /// This can be used for bookkeeping or monitoring purposes.
924    ///
925    /// Note: There can only be one task termination callback for a runtime; calling this
926    /// function more than once replaces the last callback defined, rather than adding to it.
927    ///
928    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
929    ///
930    /// **Note**: This is an [unstable API][unstable]. The public API of this type
931    /// may break in 1.x releases. See [the documentation on unstable
932    /// features][unstable] for details.
933    ///
934    /// [unstable]: crate#unstable-features
935    ///
936    /// # Examples
937    ///
938    /// ```
939    /// # use tokio::runtime;
940    /// # pub fn main() {
941    /// let runtime = runtime::Builder::new_current_thread()
942    ///     .on_task_terminate(|_| {
943    ///         println!("killing task");
944    ///     })
945    ///     .build()
946    ///     .unwrap();
947    ///
948    /// runtime.block_on(async {
949    ///     tokio::task::spawn(std::future::ready(()));
950    ///
951    ///     for _ in 0..64 {
952    ///         tokio::task::yield_now().await;
953    ///     }
954    /// })
955    /// # }
956    /// ```
957    #[cfg(all(not(loom), tokio_unstable))]
958    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
959    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
960    where
961        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
962    {
963        self.after_termination = Some(std::sync::Arc::new(f));
964        self
965    }
966
967    /// Creates the configured `Runtime`.
968    ///
969    /// The returned `Runtime` instance is ready to spawn tasks.
970    ///
971    /// # Examples
972    ///
973    /// ```
974    /// # #[cfg(not(target_family = "wasm"))]
975    /// # {
976    /// use tokio::runtime::Builder;
977    ///
978    /// let rt  = Builder::new_multi_thread().build().unwrap();
979    ///
980    /// rt.block_on(async {
981    ///     println!("Hello from the Tokio runtime");
982    /// });
983    /// # }
984    /// ```
985    pub fn build(&mut self) -> io::Result<Runtime> {
986        match &self.kind {
987            Kind::CurrentThread => self.build_current_thread_runtime(),
988            #[cfg(feature = "rt-multi-thread")]
989            Kind::MultiThread => self.build_threaded_runtime(),
990        }
991    }
992
993    /// Creates the configured [`LocalRuntime`].
994    ///
995    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
996    ///
997    /// # Panics
998    ///
999    /// This will panic if the runtime is configured with [`new_multi_thread()`].
1000    ///
1001    /// [`new_multi_thread()`]: Builder::new_multi_thread
1002    ///
1003    /// # Examples
1004    ///
1005    /// ```
1006    /// use tokio::runtime::{Builder, LocalOptions};
1007    ///
1008    /// let rt = Builder::new_current_thread()
1009    ///     .build_local(LocalOptions::default())
1010    ///     .unwrap();
1011    ///
1012    /// rt.spawn_local(async {
1013    ///     println!("Hello from the Tokio runtime");
1014    /// });
1015    /// ```
1016    #[allow(unused_variables, unreachable_patterns)]
1017    #[cfg(tokio_unstable)]
1018    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1019    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1020        match &self.kind {
1021            Kind::CurrentThread => self.build_current_thread_local_runtime(),
1022            #[cfg(feature = "rt-multi-thread")]
1023            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1024        }
1025    }
1026
1027    fn get_cfg(&self) -> driver::Cfg {
1028        driver::Cfg {
1029            enable_pause_time: match self.kind {
1030                Kind::CurrentThread => true,
1031                #[cfg(feature = "rt-multi-thread")]
1032                Kind::MultiThread => false,
1033            },
1034            enable_io: self.enable_io,
1035            enable_time: self.enable_time,
1036            start_paused: self.start_paused,
1037            nevents: self.nevents,
1038            timer_flavor: self.timer_flavor,
1039        }
1040    }
1041
1042    /// Sets a custom timeout for a thread in the blocking pool.
1043    ///
1044    /// By default, the timeout for a thread is set to 10 seconds. This can
1045    /// be overridden using `.thread_keep_alive()`.
1046    ///
1047    /// # Example
1048    ///
1049    /// ```
1050    /// # #[cfg(not(target_family = "wasm"))]
1051    /// # {
1052    /// # use tokio::runtime;
1053    /// # use std::time::Duration;
1054    /// # pub fn main() {
1055    /// let rt = runtime::Builder::new_multi_thread()
1056    ///     .thread_keep_alive(Duration::from_millis(100))
1057    ///     .build();
1058    /// # }
1059    /// # }
1060    /// ```
1061    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1062        self.keep_alive = Some(duration);
1063        self
1064    }
1065
1066    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1067    /// task queue.
1068    ///
1069    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1070    ///
1071    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1072    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1073    ///
1074    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1075    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1076    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1077    /// getting started on new work, especially if tasks frequently yield rather than complete
1078    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1079    /// tasks from the local queue will be executed only if the global queue is empty.
1080    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1081    /// tasks quickly complete polling.
1082    ///
1083    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1084    ///
1085    /// # Panics
1086    ///
1087    /// This function will panic if 0 is passed as an argument.
1088    ///
1089    /// # Examples
1090    ///
1091    /// ```
1092    /// # #[cfg(not(target_family = "wasm"))]
1093    /// # {
1094    /// # use tokio::runtime;
1095    /// # pub fn main() {
1096    /// let rt = runtime::Builder::new_multi_thread()
1097    ///     .global_queue_interval(31)
1098    ///     .build();
1099    /// # }
1100    /// # }
1101    /// ```
1102    #[track_caller]
1103    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1104        assert!(val > 0, "global_queue_interval must be greater than 0");
1105        self.global_queue_interval = Some(val);
1106        self
1107    }
1108
1109    /// Sets the number of scheduler ticks after which the scheduler will poll for
1110    /// external events (timers, I/O, and so on).
1111    ///
1112    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1113    ///
1114    /// By default, the event interval is `61` for all scheduler types.
1115    ///
1116    /// Setting the event interval determines the effective "priority" of delivering
1117    /// these external events (which may wake up additional tasks), compared to
1118    /// executing tasks that are currently ready to run. A smaller value is useful
1119    /// when tasks frequently spend a long time in polling, or frequently yield,
1120    /// which can result in overly long delays picking up I/O events. Conversely,
1121    /// picking up new events requires extra synchronization and syscall overhead,
1122    /// so if tasks generally complete their polling quickly, a higher event interval
1123    /// will minimize that overhead while still keeping the scheduler responsive to
1124    /// events.
1125    ///
1126    /// # Examples
1127    ///
1128    /// ```
1129    /// # #[cfg(not(target_family = "wasm"))]
1130    /// # {
1131    /// # use tokio::runtime;
1132    /// # pub fn main() {
1133    /// let rt = runtime::Builder::new_multi_thread()
1134    ///     .event_interval(31)
1135    ///     .build();
1136    /// # }
1137    /// # }
1138    /// ```
1139    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1140        self.event_interval = val;
1141        self
1142    }
1143
1144    cfg_unstable! {
1145        /// Configure how the runtime responds to an unhandled panic on a
1146        /// spawned task.
1147        ///
1148        /// By default, an unhandled panic (i.e. a panic not caught by
1149        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1150        /// execution. The panic's error value is forwarded to the task's
1151        /// [`JoinHandle`] and all other spawned tasks continue running.
1152        ///
1153        /// The `unhandled_panic` option enables configuring this behavior.
1154        ///
1155        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1156        ///   spawned tasks have no impact on the runtime's execution.
1157        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1158        ///   shutdown immediately when a spawned task panics even if that
1159        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1160        ///   will immediately terminate and further calls to
1161        ///   [`Runtime::block_on`] will panic.
1162        ///
1163        /// # Panics
1164        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1165        /// on a runtime other than the current thread runtime.
1166        ///
1167        /// # Unstable
1168        ///
1169        /// This option is currently unstable and its implementation is
1170        /// incomplete. The API may change or be removed in the future. See
1171        /// issue [tokio-rs/tokio#4516] for more details.
1172        ///
1173        /// # Examples
1174        ///
1175        /// The following demonstrates a runtime configured to shutdown on
1176        /// panic. The first spawned task panics and results in the runtime
1177        /// shutting down. The second spawned task never has a chance to
1178        /// execute. The call to `block_on` will panic due to the runtime being
1179        /// forcibly shutdown.
1180        ///
1181        /// ```should_panic
1182        /// use tokio::runtime::{self, UnhandledPanic};
1183        ///
1184        /// # pub fn main() {
1185        /// let rt = runtime::Builder::new_current_thread()
1186        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1187        ///     .build()
1188        ///     .unwrap();
1189        ///
1190        /// rt.spawn(async { panic!("boom"); });
1191        /// rt.spawn(async {
1192        ///     // This task never completes.
1193        /// });
1194        ///
1195        /// rt.block_on(async {
1196        ///     // Do some work
1197        /// # loop { tokio::task::yield_now().await; }
1198        /// })
1199        /// # }
1200        /// ```
1201        ///
1202        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1203        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1204        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1205            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1206                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1207            }
1208
1209            self.unhandled_panic = behavior;
1210            self
1211        }
1212
1213        /// Disables the LIFO task scheduler heuristic.
1214        ///
1215        /// The multi-threaded scheduler includes a heuristic for optimizing
1216        /// message-passing patterns. This heuristic results in the **last**
1217        /// scheduled task being polled first.
1218        ///
1219        /// To implement this heuristic, each worker thread has a slot which
1220        /// holds the task that should be polled next. However, this slot cannot
1221        /// be stolen by other worker threads, which can result in lower total
1222        /// throughput when tasks tend to have longer poll times.
1223        ///
1224        /// This configuration option will disable this heuristic resulting in
1225        /// all scheduled tasks being pushed into the worker-local queue, which
1226        /// is stealable.
1227        ///
1228        /// Consider trying this option when the task "scheduled" time is high
1229        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1230        /// collect this data.
1231        ///
1232        /// # Unstable
1233        ///
1234        /// This configuration option is considered a workaround for the LIFO
1235        /// slot not being stealable. When the slot becomes stealable, we will
1236        /// revisit whether or not this option is necessary. See
1237        /// issue [tokio-rs/tokio#4941].
1238        ///
1239        /// # Examples
1240        ///
1241        /// ```
1242        /// # #[cfg(not(target_family = "wasm"))]
1243        /// # {
1244        /// use tokio::runtime;
1245        ///
1246        /// let rt = runtime::Builder::new_multi_thread()
1247        ///     .disable_lifo_slot()
1248        ///     .build()
1249        ///     .unwrap();
1250        /// # }
1251        /// ```
1252        ///
1253        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1254        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1255        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1256            self.disable_lifo_slot = true;
1257            self
1258        }
1259
1260        /// Specifies the random number generation seed to use within all
1261        /// threads associated with the runtime being built.
1262        ///
1263        /// This option is intended to make certain parts of the runtime
1264        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1265        /// [`tokio::select!`] it will ensure that the order that branches are
1266        /// polled is deterministic.
1267        ///
1268        /// In addition to the code specifying `rng_seed` and interacting with
1269        /// the runtime, the internals of Tokio and the Rust compiler may affect
1270        /// the sequences of random numbers. In order to ensure repeatable
1271        /// results, the version of Tokio, the versions of all other
1272        /// dependencies that interact with Tokio, and the Rust compiler version
1273        /// should also all remain constant.
1274        ///
1275        /// # Examples
1276        ///
1277        /// ```
1278        /// # use tokio::runtime::{self, RngSeed};
1279        /// # pub fn main() {
1280        /// let seed = RngSeed::from_bytes(b"place your seed here");
1281        /// let rt = runtime::Builder::new_current_thread()
1282        ///     .rng_seed(seed)
1283        ///     .build();
1284        /// # }
1285        /// ```
1286        ///
1287        /// [`tokio::select!`]: crate::select
1288        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1289            self.seed_generator = RngSeedGenerator::new(seed);
1290            self
1291        }
1292    }
1293
1294    cfg_unstable_metrics! {
1295        /// Enables tracking the distribution of task poll times.
1296        ///
1297        /// Task poll times are not instrumented by default as doing so requires
1298        /// calling [`Instant::now()`] twice per task poll, which could add
1299        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1300        /// metrics data.
1301        ///
1302        /// The histogram uses fixed bucket sizes. In other words, the histogram
1303        /// buckets are not dynamic based on input values. Use the
1304        /// `metrics_poll_time_histogram` builder methods to configure the
1305        /// histogram details.
1306        ///
1307        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1308        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1309        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1310        /// to select [`LogHistogram`] instead.
1311        ///
1312        /// # Examples
1313        ///
1314        /// ```
1315        /// # #[cfg(not(target_family = "wasm"))]
1316        /// # {
1317        /// use tokio::runtime;
1318        ///
1319        /// let rt = runtime::Builder::new_multi_thread()
1320        ///     .enable_metrics_poll_time_histogram()
1321        ///     .build()
1322        ///     .unwrap();
1323        /// # // Test default values here
1324        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1325        /// # let m = rt.handle().metrics();
1326        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1327        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1328        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1329        /// # }
1330        /// ```
1331        ///
1332        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1333        /// [`Instant::now()`]: std::time::Instant::now
1334        /// [`LogHistogram`]: crate::runtime::LogHistogram
1335        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1336        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1337            self.metrics_poll_count_histogram_enable = true;
1338            self
1339        }
1340
1341        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1342        ///
1343        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1344        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1345        #[doc(hidden)]
1346        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1347            self.enable_metrics_poll_time_histogram()
1348        }
1349
1350        /// Sets the histogram scale for tracking the distribution of task poll
1351        /// times.
1352        ///
1353        /// Tracking the distribution of task poll times can be done using a
1354        /// linear or log scale. When using linear scale, each histogram bucket
1355        /// will represent the same range of poll times. When using log scale,
1356        /// each histogram bucket will cover a range twice as big as the
1357        /// previous bucket.
1358        ///
1359        /// **Default:** linear scale.
1360        ///
1361        /// # Examples
1362        ///
1363        /// ```
1364        /// # #[cfg(not(target_family = "wasm"))]
1365        /// # {
1366        /// use tokio::runtime::{self, HistogramScale};
1367        ///
1368        /// # #[allow(deprecated)]
1369        /// let rt = runtime::Builder::new_multi_thread()
1370        ///     .enable_metrics_poll_time_histogram()
1371        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1372        ///     .build()
1373        ///     .unwrap();
1374        /// # }
1375        /// ```
1376        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1377        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1378            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1379            self
1380        }
1381
1382        /// Configure the histogram for tracking poll times
1383        ///
1384        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1385        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1386        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1387        ///
1388        /// # Examples
1389        /// Configure a [`LogHistogram`] with [default configuration]:
1390        /// ```
1391        /// # #[cfg(not(target_family = "wasm"))]
1392        /// # {
1393        /// use tokio::runtime;
1394        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1395        ///
1396        /// let rt = runtime::Builder::new_multi_thread()
1397        ///     .enable_metrics_poll_time_histogram()
1398        ///     .metrics_poll_time_histogram_configuration(
1399        ///         HistogramConfiguration::log(LogHistogram::default())
1400        ///     )
1401        ///     .build()
1402        ///     .unwrap();
1403        /// # }
1404        /// ```
1405        ///
1406        /// Configure a linear histogram with 100 buckets, each 10μs wide
1407        /// ```
1408        /// # #[cfg(not(target_family = "wasm"))]
1409        /// # {
1410        /// use tokio::runtime;
1411        /// use std::time::Duration;
1412        /// use tokio::runtime::HistogramConfiguration;
1413        ///
1414        /// let rt = runtime::Builder::new_multi_thread()
1415        ///     .enable_metrics_poll_time_histogram()
1416        ///     .metrics_poll_time_histogram_configuration(
1417        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1418        ///     )
1419        ///     .build()
1420        ///     .unwrap();
1421        /// # }
1422        /// ```
1423        ///
1424        /// Configure a [`LogHistogram`] with the following settings:
1425        /// - Measure times from 100ns to 120s
1426        /// - Max error of 0.1
1427        /// - No more than 1024 buckets
1428        /// ```
1429        /// # #[cfg(not(target_family = "wasm"))]
1430        /// # {
1431        /// use std::time::Duration;
1432        /// use tokio::runtime;
1433        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1434        ///
1435        /// let rt = runtime::Builder::new_multi_thread()
1436        ///     .enable_metrics_poll_time_histogram()
1437        ///     .metrics_poll_time_histogram_configuration(
1438        ///         HistogramConfiguration::log(LogHistogram::builder()
1439        ///             .max_value(Duration::from_secs(120))
1440        ///             .min_value(Duration::from_nanos(100))
1441        ///             .max_error(0.1)
1442        ///             .max_buckets(1024)
1443        ///             .expect("configuration uses 488 buckets")
1444        ///         )
1445        ///     )
1446        ///     .build()
1447        ///     .unwrap();
1448        /// # }
1449        /// ```
1450        ///
1451        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1452        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1453        /// where each bucket is twice the size of the previous bucket.
1454        /// ```rust
1455        /// use std::time::Duration;
1456        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1457        /// let rt = tokio::runtime::Builder::new_current_thread()
1458        ///     .enable_all()
1459        ///     .enable_metrics_poll_time_histogram()
1460        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1461        ///         LogHistogram::builder()
1462        ///             .min_value(Duration::from_micros(20))
1463        ///             .max_value(Duration::from_millis(4))
1464        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1465        ///             .precision_exact(0)
1466        ///             .max_buckets(10)
1467        ///             .unwrap(),
1468        ///     ))
1469        ///     .build()
1470        ///     .unwrap();
1471        /// ```
1472        ///
1473        /// [`LogHistogram`]: crate::runtime::LogHistogram
1474        /// [default configuration]: crate::runtime::LogHistogramBuilder
1475        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1476        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1477            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1478            self
1479        }
1480
1481        /// Sets the histogram resolution for tracking the distribution of task
1482        /// poll times.
1483        ///
1484        /// The resolution is the histogram's first bucket's range. When using a
1485        /// linear histogram scale, each bucket will cover the same range. When
1486        /// using a log scale, each bucket will cover a range twice as big as
1487        /// the previous bucket. In the log case, the resolution represents the
1488        /// smallest bucket range.
1489        ///
1490        /// Note that, when using log scale, the resolution is rounded up to the
1491        /// nearest power of 2 in nanoseconds.
1492        ///
1493        /// **Default:** 100 microseconds.
1494        ///
1495        /// # Examples
1496        ///
1497        /// ```
1498        /// # #[cfg(not(target_family = "wasm"))]
1499        /// # {
1500        /// use tokio::runtime;
1501        /// use std::time::Duration;
1502        ///
1503        /// # #[allow(deprecated)]
1504        /// let rt = runtime::Builder::new_multi_thread()
1505        ///     .enable_metrics_poll_time_histogram()
1506        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1507        ///     .build()
1508        ///     .unwrap();
1509        /// # }
1510        /// ```
1511        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1512        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1513            assert!(resolution > Duration::from_secs(0));
1514            // Sanity check the argument and also make the cast below safe.
1515            assert!(resolution <= Duration::from_secs(1));
1516
1517            let resolution = resolution.as_nanos() as u64;
1518
1519            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1520            self
1521        }
1522
1523        /// Sets the number of buckets for the histogram tracking the
1524        /// distribution of task poll times.
1525        ///
1526        /// The last bucket tracks all greater values that fall out of other
1527        /// ranges. So, configuring the histogram using a linear scale,
1528        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1529        /// polls that take more than 450ms to complete.
1530        ///
1531        /// **Default:** 10
1532        ///
1533        /// # Examples
1534        ///
1535        /// ```
1536        /// # #[cfg(not(target_family = "wasm"))]
1537        /// # {
1538        /// use tokio::runtime;
1539        ///
1540        /// # #[allow(deprecated)]
1541        /// let rt = runtime::Builder::new_multi_thread()
1542        ///     .enable_metrics_poll_time_histogram()
1543        ///     .metrics_poll_count_histogram_buckets(15)
1544        ///     .build()
1545        ///     .unwrap();
1546        /// # }
1547        /// ```
1548        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1549        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1550            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1551            self
1552        }
1553    }
1554
1555    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1556        use crate::runtime::runtime::Scheduler;
1557
1558        let (scheduler, handle, blocking_pool) =
1559            self.build_current_thread_runtime_components(None)?;
1560
1561        Ok(Runtime::from_parts(
1562            Scheduler::CurrentThread(scheduler),
1563            handle,
1564            blocking_pool,
1565        ))
1566    }
1567
1568    #[cfg(tokio_unstable)]
1569    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1570        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1571
1572        let tid = std::thread::current().id();
1573
1574        let (scheduler, handle, blocking_pool) =
1575            self.build_current_thread_runtime_components(Some(tid))?;
1576
1577        Ok(LocalRuntime::from_parts(
1578            LocalRuntimeScheduler::CurrentThread(scheduler),
1579            handle,
1580            blocking_pool,
1581        ))
1582    }
1583
1584    fn build_current_thread_runtime_components(
1585        &mut self,
1586        local_tid: Option<ThreadId>,
1587    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1588        use crate::runtime::scheduler;
1589        use crate::runtime::Config;
1590
1591        let mut cfg = self.get_cfg();
1592        cfg.timer_flavor = TimerFlavor::Traditional;
1593        let (driver, driver_handle) = driver::Driver::new(cfg)?;
1594
1595        // Blocking pool
1596        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1597        let blocking_spawner = blocking_pool.spawner().clone();
1598
1599        // Generate a rng seed for this runtime.
1600        let seed_generator_1 = self.seed_generator.next_generator();
1601        let seed_generator_2 = self.seed_generator.next_generator();
1602
1603        // And now put a single-threaded scheduler on top of the timer. When
1604        // there are no futures ready to do something, it'll let the timer or
1605        // the reactor to generate some new stimuli for the futures to continue
1606        // in their life.
1607        let (scheduler, handle) = CurrentThread::new(
1608            driver,
1609            driver_handle,
1610            blocking_spawner,
1611            seed_generator_2,
1612            Config {
1613                before_park: self.before_park.clone(),
1614                after_unpark: self.after_unpark.clone(),
1615                before_spawn: self.before_spawn.clone(),
1616                #[cfg(tokio_unstable)]
1617                before_poll: self.before_poll.clone(),
1618                #[cfg(tokio_unstable)]
1619                after_poll: self.after_poll.clone(),
1620                after_termination: self.after_termination.clone(),
1621                global_queue_interval: self.global_queue_interval,
1622                event_interval: self.event_interval,
1623                #[cfg(tokio_unstable)]
1624                unhandled_panic: self.unhandled_panic.clone(),
1625                disable_lifo_slot: self.disable_lifo_slot,
1626                seed_generator: seed_generator_1,
1627                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1628            },
1629            local_tid,
1630        );
1631
1632        let handle = Handle {
1633            inner: scheduler::Handle::CurrentThread(handle),
1634        };
1635
1636        Ok((scheduler, handle, blocking_pool))
1637    }
1638
1639    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1640        if self.metrics_poll_count_histogram_enable {
1641            Some(self.metrics_poll_count_histogram.clone())
1642        } else {
1643            None
1644        }
1645    }
1646}
1647
1648cfg_io_driver! {
1649    impl Builder {
1650        /// Enables the I/O driver.
1651        ///
1652        /// Doing this enables using net, process, signal, and some I/O types on
1653        /// the runtime.
1654        ///
1655        /// # Examples
1656        ///
1657        /// ```
1658        /// use tokio::runtime;
1659        ///
1660        /// let rt = runtime::Builder::new_multi_thread()
1661        ///     .enable_io()
1662        ///     .build()
1663        ///     .unwrap();
1664        /// ```
1665        pub fn enable_io(&mut self) -> &mut Self {
1666            self.enable_io = true;
1667            self
1668        }
1669
1670        /// Enables the I/O driver and configures the max number of events to be
1671        /// processed per tick.
1672        ///
1673        /// # Examples
1674        ///
1675        /// ```
1676        /// use tokio::runtime;
1677        ///
1678        /// let rt = runtime::Builder::new_current_thread()
1679        ///     .enable_io()
1680        ///     .max_io_events_per_tick(1024)
1681        ///     .build()
1682        ///     .unwrap();
1683        /// ```
1684        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1685            self.nevents = capacity;
1686            self
1687        }
1688    }
1689}
1690
1691cfg_time! {
1692    impl Builder {
1693        /// Enables the time driver.
1694        ///
1695        /// Doing this enables using `tokio::time` on the runtime.
1696        ///
1697        /// # Examples
1698        ///
1699        /// ```
1700        /// # #[cfg(not(target_family = "wasm"))]
1701        /// # {
1702        /// use tokio::runtime;
1703        ///
1704        /// let rt = runtime::Builder::new_multi_thread()
1705        ///     .enable_time()
1706        ///     .build()
1707        ///     .unwrap();
1708        /// # }
1709        /// ```
1710        pub fn enable_time(&mut self) -> &mut Self {
1711            self.enable_time = true;
1712            self
1713        }
1714    }
1715}
1716
1717cfg_io_uring! {
1718    impl Builder {
1719        /// Enables the tokio's io_uring driver.
1720        ///
1721        /// Doing this enables using io_uring operations on the runtime.
1722        ///
1723        /// # Examples
1724        ///
1725        /// ```
1726        /// use tokio::runtime;
1727        ///
1728        /// let rt = runtime::Builder::new_multi_thread()
1729        ///     .enable_io_uring()
1730        ///     .build()
1731        ///     .unwrap();
1732        /// ```
1733        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1734        pub fn enable_io_uring(&mut self) -> &mut Self {
1735            // Currently, the uring flag is equivalent to `enable_io`.
1736            self.enable_io = true;
1737            self
1738        }
1739    }
1740}
1741
1742cfg_test_util! {
1743    impl Builder {
1744        /// Controls if the runtime's clock starts paused or advancing.
1745        ///
1746        /// Pausing time requires the current-thread runtime; construction of
1747        /// the runtime will panic otherwise.
1748        ///
1749        /// # Examples
1750        ///
1751        /// ```
1752        /// use tokio::runtime;
1753        ///
1754        /// let rt = runtime::Builder::new_current_thread()
1755        ///     .enable_time()
1756        ///     .start_paused(true)
1757        ///     .build()
1758        ///     .unwrap();
1759        /// ```
1760        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1761            self.start_paused = start_paused;
1762            self
1763        }
1764    }
1765}
1766
1767cfg_rt_multi_thread! {
1768    impl Builder {
1769        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1770            use crate::loom::sys::num_cpus;
1771            use crate::runtime::{Config, runtime::Scheduler};
1772            use crate::runtime::scheduler::{self, MultiThread};
1773
1774            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1775
1776            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1777
1778            // Create the blocking pool
1779            let blocking_pool =
1780                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1781            let blocking_spawner = blocking_pool.spawner().clone();
1782
1783            // Generate a rng seed for this runtime.
1784            let seed_generator_1 = self.seed_generator.next_generator();
1785            let seed_generator_2 = self.seed_generator.next_generator();
1786
1787            let (scheduler, handle, launch) = MultiThread::new(
1788                worker_threads,
1789                driver,
1790                driver_handle,
1791                blocking_spawner,
1792                seed_generator_2,
1793                Config {
1794                    before_park: self.before_park.clone(),
1795                    after_unpark: self.after_unpark.clone(),
1796                    before_spawn: self.before_spawn.clone(),
1797                    #[cfg(tokio_unstable)]
1798                    before_poll: self.before_poll.clone(),
1799                    #[cfg(tokio_unstable)]
1800                    after_poll: self.after_poll.clone(),
1801                    after_termination: self.after_termination.clone(),
1802                    global_queue_interval: self.global_queue_interval,
1803                    event_interval: self.event_interval,
1804                    #[cfg(tokio_unstable)]
1805                    unhandled_panic: self.unhandled_panic.clone(),
1806                    disable_lifo_slot: self.disable_lifo_slot,
1807                    seed_generator: seed_generator_1,
1808                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1809                },
1810                self.timer_flavor,
1811            );
1812
1813            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1814
1815            // Spawn the thread pool workers
1816            let _enter = handle.enter();
1817            launch.launch();
1818
1819            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1820        }
1821    }
1822}
1823
1824impl fmt::Debug for Builder {
1825    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1826        fmt.debug_struct("Builder")
1827            .field("worker_threads", &self.worker_threads)
1828            .field("max_blocking_threads", &self.max_blocking_threads)
1829            .field(
1830                "thread_name",
1831                &"<dyn Fn() -> String + Send + Sync + 'static>",
1832            )
1833            .field("thread_stack_size", &self.thread_stack_size)
1834            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1835            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1836            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1837            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1838            .finish()
1839    }
1840}