tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5 blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11use crate::runtime::blocking::BlockingPool;
12use crate::runtime::scheduler::CurrentThread;
13use std::fmt;
14use std::io;
15use std::thread::ThreadId;
16use std::time::Duration;
17
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
/// or [`Builder::new_current_thread`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: method@Self::build
/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
/// [`Builder::new_current_thread`]: method@Self::new_current_thread
///
/// # Examples
///
/// ```
/// # #[cfg(not(target_family = "wasm"))]
/// # {
/// use tokio::runtime::Builder;
///
/// fn main() {
///     // build runtime
///     let runtime = Builder::new_multi_thread()
///         .worker_threads(4)
///         .thread_name("my-custom-name")
///         .thread_stack_size(3 * 1024 * 1024)
///         .build()
///         .unwrap();
///
///     // use runtime ...
/// }
/// # }
/// ```
pub struct Builder {
    /// Runtime type
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    /// Forwarded unchanged to the driver configuration as `nevents`
    /// (defaults to 1024).
    // NOTE(review): presumably the I/O event batch size per driver poll — confirm
    // against the driver.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor. `None` until a
    /// value is set explicitly.
    worker_threads: Option<usize>,

    /// Cap on the number of threads used for blocking operations.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// To run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// To run before each poll
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// To run after each poll
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// To run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime should respond to unhandled panics on spawned tasks.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,

    /// Which timer implementation to use. Starts as `TimerFlavor::Traditional`;
    /// `enable_alt_timer` switches it to `TimerFlavor::Alternative`.
    timer_flavor: TimerFlavor,
}
141
cfg_unstable! {
    /// How the runtime should respond to unhandled panics.
    ///
    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
    /// to configure the runtime behavior when a spawned task panics.
    ///
    /// See [`Builder::unhandled_panic`] for more details.
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shutdown if a spawned task panics.
        ///
        /// The runtime will immediately shutdown even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and calls to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
}
225
/// Shared closure that produces the name given to threads spawned by the
/// runtime. It is `Send + Sync` so the same instance can be used from any thread.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
/// Scheduler flavor a `Builder` has been created for.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Selected by `Builder::new_current_thread`.
    CurrentThread,
    /// Selected by `Builder::new_multi_thread`; only exists when the
    /// `rt-multi-thread` feature is enabled.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
234
235impl Builder {
236 /// Returns a new builder with the current thread scheduler selected.
237 ///
238 /// Configuration methods can be chained on the return value.
239 ///
240 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241 /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`] (unstable).
242 ///
243 /// [`LocalSet`]: crate::task::LocalSet
244 /// [`LocalRuntime`]: crate::runtime::LocalRuntime
245 /// [`build_local`]: crate::runtime::Builder::build_local
246 pub fn new_current_thread() -> Builder {
247 #[cfg(loom)]
248 const EVENT_INTERVAL: u32 = 4;
249 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
250 #[cfg(not(loom))]
251 const EVENT_INTERVAL: u32 = 61;
252
253 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
254 }
255
256 /// Returns a new builder with the multi thread scheduler selected.
257 ///
258 /// Configuration methods can be chained on the return value.
259 #[cfg(feature = "rt-multi-thread")]
260 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
261 pub fn new_multi_thread() -> Builder {
262 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
263 Builder::new(Kind::MultiThread, 61)
264 }
265
266 /// Returns a new runtime builder initialized with default configuration
267 /// values.
268 ///
269 /// Configuration methods can be chained on the return value.
270 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
271 Builder {
272 kind,
273
274 // I/O defaults to "off"
275 enable_io: false,
276 nevents: 1024,
277
278 // Time defaults to "off"
279 enable_time: false,
280
281 // The clock starts not-paused
282 start_paused: false,
283
284 // Read from environment variable first in multi-threaded mode.
285 // Default to lazy auto-detection (one thread per CPU core)
286 worker_threads: None,
287
288 max_blocking_threads: 512,
289
290 // Default thread name
291 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
292
293 // Do not set a stack size by default
294 thread_stack_size: None,
295
296 // No worker thread callbacks
297 after_start: None,
298 before_stop: None,
299 before_park: None,
300 after_unpark: None,
301
302 before_spawn: None,
303 after_termination: None,
304
305 #[cfg(tokio_unstable)]
306 before_poll: None,
307 #[cfg(tokio_unstable)]
308 after_poll: None,
309
310 keep_alive: None,
311
312 // Defaults for these values depend on the scheduler kind, so we get them
313 // as parameters.
314 global_queue_interval: None,
315 event_interval,
316
317 seed_generator: RngSeedGenerator::new(RngSeed::new()),
318
319 #[cfg(tokio_unstable)]
320 unhandled_panic: UnhandledPanic::Ignore,
321
322 metrics_poll_count_histogram_enable: false,
323
324 metrics_poll_count_histogram: HistogramBuilder::default(),
325
326 disable_lifo_slot: false,
327
328 timer_flavor: TimerFlavor::Traditional,
329 }
330 }
331
332 /// Enables both I/O and time drivers.
333 ///
334 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
335 /// individually. If additional components are added to Tokio in the future,
336 /// `enable_all` will include these future components.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// # #[cfg(not(target_family = "wasm"))]
342 /// # {
343 /// use tokio::runtime;
344 ///
345 /// let rt = runtime::Builder::new_multi_thread()
346 /// .enable_all()
347 /// .build()
348 /// .unwrap();
349 /// # }
350 /// ```
351 pub fn enable_all(&mut self) -> &mut Self {
352 #[cfg(any(
353 feature = "net",
354 all(unix, feature = "process"),
355 all(unix, feature = "signal")
356 ))]
357 self.enable_io();
358
359 #[cfg(all(
360 tokio_unstable,
361 feature = "io-uring",
362 feature = "rt",
363 feature = "fs",
364 target_os = "linux",
365 ))]
366 self.enable_io_uring();
367
368 #[cfg(feature = "time")]
369 self.enable_time();
370
371 self
372 }
373
    /// Enables the alternative timer implementation, which is disabled by default.
    ///
    /// The alternative timer implementation is an unstable feature that may
    /// provide better performance on multi-threaded runtimes with a large number
    /// of worker threads.
    ///
    /// This option only applies to multi-threaded runtimes. Attempting to use
    /// this option with any other runtime type will have no effect.
    ///
    /// Note that calling this method also enables the time driver, as if
    /// `enable_time` had been called.
    ///
    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     .enable_alt_timer()
    ///     .build()
    ///     .unwrap();
    /// # }
    /// ```
    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
    #[cfg_attr(
        docsrs,
        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
    )]
    pub fn enable_alt_timer(&mut self) -> &mut Self {
        // The time driver is switched on first; the flavor is recorded afterwards.
        self.enable_time();
        self.timer_flavor = TimerFlavor::Alternative;
        self
    }
408
409 /// Sets the number of worker threads the `Runtime` will use.
410 ///
411 /// This can be any number above 0 though it is advised to keep this value
412 /// on the smaller side.
413 ///
414 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
415 ///
416 /// # Default
417 ///
418 /// The default value is the number of cores available to the system.
419 ///
420 /// When using the `current_thread` runtime this method has no effect.
421 ///
422 /// # Examples
423 ///
424 /// ## Multi threaded runtime with 4 threads
425 ///
426 /// ```
427 /// # #[cfg(not(target_family = "wasm"))]
428 /// # {
429 /// use tokio::runtime;
430 ///
431 /// // This will spawn a work-stealing runtime with 4 worker threads.
432 /// let rt = runtime::Builder::new_multi_thread()
433 /// .worker_threads(4)
434 /// .build()
435 /// .unwrap();
436 ///
437 /// rt.spawn(async move {});
438 /// # }
439 /// ```
440 ///
441 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
442 ///
443 /// ```
444 /// use tokio::runtime;
445 ///
446 /// // Create a runtime that _must_ be driven from a call
447 /// // to `Runtime::block_on`.
448 /// let rt = runtime::Builder::new_current_thread()
449 /// .build()
450 /// .unwrap();
451 ///
452 /// // This will run the runtime and future on the current thread
453 /// rt.block_on(async move {});
454 /// ```
455 ///
456 /// # Panics
457 ///
458 /// This will panic if `val` is not larger than `0`.
459 #[track_caller]
460 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
461 assert!(val > 0, "Worker threads cannot be set to 0");
462 self.worker_threads = Some(val);
463 self
464 }
465
466 /// Specifies the limit for additional threads spawned by the Runtime.
467 ///
468 /// These threads are used for blocking operations like tasks spawned
469 /// through [`spawn_blocking`], this includes but is not limited to:
470 /// - [`fs`] operations
471 /// - dns resolution through [`ToSocketAddrs`]
472 /// - writing to [`Stdout`] or [`Stderr`]
473 /// - reading from [`Stdin`]
474 ///
475 /// Unlike the [`worker_threads`], they are not always active and will exit
476 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
477 ///
478 /// It's recommended to not set this limit too low in order to avoid hanging on operations
479 /// requiring [`spawn_blocking`].
480 ///
481 /// The default value is 512.
482 ///
483 /// # Queue Behavior
484 ///
485 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
486 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
487 /// method has not been reached, a new thread will be spawned. If no idle thread is available
488 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
489 /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
490 /// it could potentially grow unbounded.
491 ///
492 /// # Panics
493 ///
494 /// This will panic if `val` is not larger than `0`.
495 ///
496 /// # Upgrading from 0.x
497 ///
498 /// In old versions `max_threads` limited both blocking and worker threads, but the
499 /// current `max_blocking_threads` does not include async worker threads in the count.
500 ///
501 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
502 /// [`fs`]: mod@crate::fs
503 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
504 /// [`Stdout`]: struct@crate::io::Stdout
505 /// [`Stdin`]: struct@crate::io::Stdin
506 /// [`Stderr`]: struct@crate::io::Stderr
507 /// [`worker_threads`]: Self::worker_threads
508 /// [`thread_keep_alive`]: Self::thread_keep_alive
509 #[track_caller]
510 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
511 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
512 assert!(val > 0, "Max blocking threads cannot be set to 0");
513 self.max_blocking_threads = val;
514 self
515 }
516
517 /// Sets name of threads spawned by the `Runtime`'s thread pool.
518 ///
519 /// The default name is "tokio-runtime-worker".
520 ///
521 /// # Examples
522 ///
523 /// ```
524 /// # #[cfg(not(target_family = "wasm"))]
525 /// # {
526 /// # use tokio::runtime;
527 ///
528 /// # pub fn main() {
529 /// let rt = runtime::Builder::new_multi_thread()
530 /// .thread_name("my-pool")
531 /// .build();
532 /// # }
533 /// # }
534 /// ```
535 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
536 let val = val.into();
537 self.thread_name = std::sync::Arc::new(move || val.clone());
538 self
539 }
540
541 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
542 ///
543 /// The default name fn is `|| "tokio-runtime-worker".into()`.
544 ///
545 /// # Examples
546 ///
547 /// ```
548 /// # #[cfg(not(target_family = "wasm"))]
549 /// # {
550 /// # use tokio::runtime;
551 /// # use std::sync::atomic::{AtomicUsize, Ordering};
552 /// # pub fn main() {
553 /// let rt = runtime::Builder::new_multi_thread()
554 /// .thread_name_fn(|| {
555 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
556 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
557 /// format!("my-pool-{}", id)
558 /// })
559 /// .build();
560 /// # }
561 /// # }
562 /// ```
563 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
564 where
565 F: Fn() -> String + Send + Sync + 'static,
566 {
567 self.thread_name = std::sync::Arc::new(f);
568 self
569 }
570
571 /// Sets the stack size (in bytes) for worker threads.
572 ///
573 /// The actual stack size may be greater than this value if the platform
574 /// specifies minimal stack size.
575 ///
576 /// The default stack size for spawned threads is 2 MiB, though this
577 /// particular stack size is subject to change in the future.
578 ///
579 /// # Examples
580 ///
581 /// ```
582 /// # #[cfg(not(target_family = "wasm"))]
583 /// # {
584 /// # use tokio::runtime;
585 ///
586 /// # pub fn main() {
587 /// let rt = runtime::Builder::new_multi_thread()
588 /// .thread_stack_size(32 * 1024)
589 /// .build();
590 /// # }
591 /// # }
592 /// ```
593 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
594 self.thread_stack_size = Some(val);
595 self
596 }
597
598 /// Executes function `f` after each thread is started but before it starts
599 /// doing work.
600 ///
601 /// This is intended for bookkeeping and monitoring use cases.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// # #[cfg(not(target_family = "wasm"))]
607 /// # {
608 /// # use tokio::runtime;
609 /// # pub fn main() {
610 /// let runtime = runtime::Builder::new_multi_thread()
611 /// .on_thread_start(|| {
612 /// println!("thread started");
613 /// })
614 /// .build();
615 /// # }
616 /// # }
617 /// ```
618 #[cfg(not(loom))]
619 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
620 where
621 F: Fn() + Send + Sync + 'static,
622 {
623 self.after_start = Some(std::sync::Arc::new(f));
624 self
625 }
626
627 /// Executes function `f` before each thread stops.
628 ///
629 /// This is intended for bookkeeping and monitoring use cases.
630 ///
631 /// # Examples
632 ///
633 /// ```
634 /// # #[cfg(not(target_family = "wasm"))]
635 /// {
636 /// # use tokio::runtime;
637 /// # pub fn main() {
638 /// let runtime = runtime::Builder::new_multi_thread()
639 /// .on_thread_stop(|| {
640 /// println!("thread stopping");
641 /// })
642 /// .build();
643 /// # }
644 /// # }
645 /// ```
646 #[cfg(not(loom))]
647 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
648 where
649 F: Fn() + Send + Sync + 'static,
650 {
651 self.before_stop = Some(std::sync::Arc::new(f));
652 self
653 }
654
655 /// Executes function `f` just before a thread is parked (goes idle).
656 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
657 /// can be called, and may result in this thread being unparked immediately.
658 ///
659 /// This can be used to start work only when the executor is idle, or for bookkeeping
660 /// and monitoring purposes.
661 ///
662 /// Note: There can only be one park callback for a runtime; calling this function
663 /// more than once replaces the last callback defined, rather than adding to it.
664 ///
665 /// # Examples
666 ///
667 /// ## Multithreaded executor
668 /// ```
669 /// # #[cfg(not(target_family = "wasm"))]
670 /// # {
671 /// # use std::sync::Arc;
672 /// # use std::sync::atomic::{AtomicBool, Ordering};
673 /// # use tokio::runtime;
674 /// # use tokio::sync::Barrier;
675 /// # pub fn main() {
676 /// let once = AtomicBool::new(true);
677 /// let barrier = Arc::new(Barrier::new(2));
678 ///
679 /// let runtime = runtime::Builder::new_multi_thread()
680 /// .worker_threads(1)
681 /// .on_thread_park({
682 /// let barrier = barrier.clone();
683 /// move || {
684 /// let barrier = barrier.clone();
685 /// if once.swap(false, Ordering::Relaxed) {
686 /// tokio::spawn(async move { barrier.wait().await; });
687 /// }
688 /// }
689 /// })
690 /// .build()
691 /// .unwrap();
692 ///
693 /// runtime.block_on(async {
694 /// barrier.wait().await;
695 /// })
696 /// # }
697 /// # }
698 /// ```
699 /// ## Current thread executor
700 /// ```
701 /// # use std::sync::Arc;
702 /// # use std::sync::atomic::{AtomicBool, Ordering};
703 /// # use tokio::runtime;
704 /// # use tokio::sync::Barrier;
705 /// # pub fn main() {
706 /// let once = AtomicBool::new(true);
707 /// let barrier = Arc::new(Barrier::new(2));
708 ///
709 /// let runtime = runtime::Builder::new_current_thread()
710 /// .on_thread_park({
711 /// let barrier = barrier.clone();
712 /// move || {
713 /// let barrier = barrier.clone();
714 /// if once.swap(false, Ordering::Relaxed) {
715 /// tokio::spawn(async move { barrier.wait().await; });
716 /// }
717 /// }
718 /// })
719 /// .build()
720 /// .unwrap();
721 ///
722 /// runtime.block_on(async {
723 /// barrier.wait().await;
724 /// })
725 /// # }
726 /// ```
727 #[cfg(not(loom))]
728 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
729 where
730 F: Fn() + Send + Sync + 'static,
731 {
732 self.before_park = Some(std::sync::Arc::new(f));
733 self
734 }
735
736 /// Executes function `f` just after a thread unparks (starts executing tasks).
737 ///
738 /// This is intended for bookkeeping and monitoring use cases; note that work
739 /// in this callback will increase latencies when the application has allowed one or
740 /// more runtime threads to go idle.
741 ///
742 /// Note: There can only be one unpark callback for a runtime; calling this function
743 /// more than once replaces the last callback defined, rather than adding to it.
744 ///
745 /// # Examples
746 ///
747 /// ```
748 /// # #[cfg(not(target_family = "wasm"))]
749 /// # {
750 /// # use tokio::runtime;
751 /// # pub fn main() {
752 /// let runtime = runtime::Builder::new_multi_thread()
753 /// .on_thread_unpark(|| {
754 /// println!("thread unparking");
755 /// })
756 /// .build();
757 ///
758 /// runtime.unwrap().block_on(async {
759 /// tokio::task::yield_now().await;
760 /// println!("Hello from Tokio!");
761 /// })
762 /// # }
763 /// # }
764 /// ```
765 #[cfg(not(loom))]
766 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
767 where
768 F: Fn() + Send + Sync + 'static,
769 {
770 self.after_unpark = Some(std::sync::Arc::new(f));
771 self
772 }
773
774 /// Executes function `f` just before a task is spawned.
775 ///
776 /// `f` is called within the Tokio context, so functions like
777 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
778 /// invoked immediately.
779 ///
780 /// This can be used for bookkeeping or monitoring purposes.
781 ///
782 /// Note: There can only be one spawn callback for a runtime; calling this function more
783 /// than once replaces the last callback defined, rather than adding to it.
784 ///
785 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
786 ///
787 /// **Note**: This is an [unstable API][unstable]. The public API of this type
788 /// may break in 1.x releases. See [the documentation on unstable
789 /// features][unstable] for details.
790 ///
791 /// [unstable]: crate#unstable-features
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # use tokio::runtime;
797 /// # pub fn main() {
798 /// let runtime = runtime::Builder::new_current_thread()
799 /// .on_task_spawn(|_| {
800 /// println!("spawning task");
801 /// })
802 /// .build()
803 /// .unwrap();
804 ///
805 /// runtime.block_on(async {
806 /// tokio::task::spawn(std::future::ready(()));
807 ///
808 /// for _ in 0..64 {
809 /// tokio::task::yield_now().await;
810 /// }
811 /// })
812 /// # }
813 /// ```
814 #[cfg(all(not(loom), tokio_unstable))]
815 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
816 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
817 where
818 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819 {
820 self.before_spawn = Some(std::sync::Arc::new(f));
821 self
822 }
823
824 /// Executes function `f` just before a task is polled
825 ///
826 /// `f` is called within the Tokio context, so functions like
827 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
828 /// invoked immediately.
829 ///
830 /// **Note**: This is an [unstable API][unstable]. The public API of this type
831 /// may break in 1.x releases. See [the documentation on unstable
832 /// features][unstable] for details.
833 ///
834 /// [unstable]: crate#unstable-features
835 ///
836 /// # Examples
837 ///
838 /// ```
839 /// # #[cfg(not(target_family = "wasm"))]
840 /// # {
841 /// # use std::sync::{atomic::AtomicUsize, Arc};
842 /// # use tokio::task::yield_now;
843 /// # pub fn main() {
844 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
845 /// let poll_start = poll_start_counter.clone();
846 /// let rt = tokio::runtime::Builder::new_multi_thread()
847 /// .enable_all()
848 /// .on_before_task_poll(move |meta| {
849 /// println!("task {} is about to be polled", meta.id())
850 /// })
851 /// .build()
852 /// .unwrap();
853 /// let task = rt.spawn(async {
854 /// yield_now().await;
855 /// });
856 /// let _ = rt.block_on(task);
857 ///
858 /// # }
859 /// # }
860 /// ```
861 #[cfg(tokio_unstable)]
862 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
863 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
864 where
865 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
866 {
867 self.before_poll = Some(std::sync::Arc::new(f));
868 self
869 }
870
871 /// Executes function `f` just after a task is polled
872 ///
873 /// `f` is called within the Tokio context, so functions like
874 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
875 /// invoked immediately.
876 ///
877 /// **Note**: This is an [unstable API][unstable]. The public API of this type
878 /// may break in 1.x releases. See [the documentation on unstable
879 /// features][unstable] for details.
880 ///
881 /// [unstable]: crate#unstable-features
882 ///
883 /// # Examples
884 ///
885 /// ```
886 /// # #[cfg(not(target_family = "wasm"))]
887 /// # {
888 /// # use std::sync::{atomic::AtomicUsize, Arc};
889 /// # use tokio::task::yield_now;
890 /// # pub fn main() {
891 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
892 /// let poll_stop = poll_stop_counter.clone();
893 /// let rt = tokio::runtime::Builder::new_multi_thread()
894 /// .enable_all()
895 /// .on_after_task_poll(move |meta| {
896 /// println!("task {} completed polling", meta.id());
897 /// })
898 /// .build()
899 /// .unwrap();
900 /// let task = rt.spawn(async {
901 /// yield_now().await;
902 /// });
903 /// let _ = rt.block_on(task);
904 ///
905 /// # }
906 /// # }
907 /// ```
908 #[cfg(tokio_unstable)]
909 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
910 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
911 where
912 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
913 {
914 self.after_poll = Some(std::sync::Arc::new(f));
915 self
916 }
917
918 /// Executes function `f` just after a task is terminated.
919 ///
920 /// `f` is called within the Tokio context, so functions like
921 /// [`tokio::spawn`](crate::spawn) can be called.
922 ///
923 /// This can be used for bookkeeping or monitoring purposes.
924 ///
925 /// Note: There can only be one task termination callback for a runtime; calling this
926 /// function more than once replaces the last callback defined, rather than adding to it.
927 ///
928 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
929 ///
930 /// **Note**: This is an [unstable API][unstable]. The public API of this type
931 /// may break in 1.x releases. See [the documentation on unstable
932 /// features][unstable] for details.
933 ///
934 /// [unstable]: crate#unstable-features
935 ///
936 /// # Examples
937 ///
938 /// ```
939 /// # use tokio::runtime;
940 /// # pub fn main() {
941 /// let runtime = runtime::Builder::new_current_thread()
942 /// .on_task_terminate(|_| {
943 /// println!("killing task");
944 /// })
945 /// .build()
946 /// .unwrap();
947 ///
948 /// runtime.block_on(async {
949 /// tokio::task::spawn(std::future::ready(()));
950 ///
951 /// for _ in 0..64 {
952 /// tokio::task::yield_now().await;
953 /// }
954 /// })
955 /// # }
956 /// ```
957 #[cfg(all(not(loom), tokio_unstable))]
958 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
959 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
960 where
961 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
962 {
963 self.after_termination = Some(std::sync::Arc::new(f));
964 self
965 }
966
967 /// Creates the configured `Runtime`.
968 ///
969 /// The returned `Runtime` instance is ready to spawn tasks.
970 ///
971 /// # Examples
972 ///
973 /// ```
974 /// # #[cfg(not(target_family = "wasm"))]
975 /// # {
976 /// use tokio::runtime::Builder;
977 ///
978 /// let rt = Builder::new_multi_thread().build().unwrap();
979 ///
980 /// rt.block_on(async {
981 /// println!("Hello from the Tokio runtime");
982 /// });
983 /// # }
984 /// ```
985 pub fn build(&mut self) -> io::Result<Runtime> {
986 match &self.kind {
987 Kind::CurrentThread => self.build_current_thread_runtime(),
988 #[cfg(feature = "rt-multi-thread")]
989 Kind::MultiThread => self.build_threaded_runtime(),
990 }
991 }
992
993 /// Creates the configured [`LocalRuntime`].
994 ///
995 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
996 ///
997 /// # Panics
998 ///
999 /// This will panic if the runtime is configured with [`new_multi_thread()`].
1000 ///
1001 /// [`new_multi_thread()`]: Builder::new_multi_thread
1002 ///
1003 /// # Examples
1004 ///
1005 /// ```
1006 /// use tokio::runtime::{Builder, LocalOptions};
1007 ///
1008 /// let rt = Builder::new_current_thread()
1009 /// .build_local(LocalOptions::default())
1010 /// .unwrap();
1011 ///
1012 /// rt.spawn_local(async {
1013 /// println!("Hello from the Tokio runtime");
1014 /// });
1015 /// ```
1016 #[allow(unused_variables, unreachable_patterns)]
1017 #[cfg(tokio_unstable)]
1018 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1019 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1020 match &self.kind {
1021 Kind::CurrentThread => self.build_current_thread_local_runtime(),
1022 #[cfg(feature = "rt-multi-thread")]
1023 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1024 }
1025 }
1026
1027 fn get_cfg(&self) -> driver::Cfg {
1028 driver::Cfg {
1029 enable_pause_time: match self.kind {
1030 Kind::CurrentThread => true,
1031 #[cfg(feature = "rt-multi-thread")]
1032 Kind::MultiThread => false,
1033 },
1034 enable_io: self.enable_io,
1035 enable_time: self.enable_time,
1036 start_paused: self.start_paused,
1037 nevents: self.nevents,
1038 timer_flavor: self.timer_flavor,
1039 }
1040 }
1041
1042 /// Sets a custom timeout for a thread in the blocking pool.
1043 ///
1044 /// By default, the timeout for a thread is set to 10 seconds. This can
1045 /// be overridden using `.thread_keep_alive()`.
1046 ///
1047 /// # Example
1048 ///
1049 /// ```
1050 /// # #[cfg(not(target_family = "wasm"))]
1051 /// # {
1052 /// # use tokio::runtime;
1053 /// # use std::time::Duration;
1054 /// # pub fn main() {
1055 /// let rt = runtime::Builder::new_multi_thread()
1056 /// .thread_keep_alive(Duration::from_millis(100))
1057 /// .build();
1058 /// # }
1059 /// # }
1060 /// ```
1061 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1062 self.keep_alive = Some(duration);
1063 self
1064 }
1065
1066 /// Sets the number of scheduler ticks after which the scheduler will poll the global
1067 /// task queue.
1068 ///
1069 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1070 ///
1071 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1072 /// [the module documentation] for the default behavior of the multi-thread scheduler.
1073 ///
1074 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1075 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1076 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1077 /// getting started on new work, especially if tasks frequently yield rather than complete
1078 /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1079 /// tasks from the local queue will be executed only if the global queue is empty.
1080 /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1081 /// tasks quickly complete polling.
1082 ///
1083 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1084 ///
1085 /// # Panics
1086 ///
1087 /// This function will panic if 0 is passed as an argument.
1088 ///
1089 /// # Examples
1090 ///
1091 /// ```
1092 /// # #[cfg(not(target_family = "wasm"))]
1093 /// # {
1094 /// # use tokio::runtime;
1095 /// # pub fn main() {
1096 /// let rt = runtime::Builder::new_multi_thread()
1097 /// .global_queue_interval(31)
1098 /// .build();
1099 /// # }
1100 /// # }
1101 /// ```
1102 #[track_caller]
1103 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1104 assert!(val > 0, "global_queue_interval must be greater than 0");
1105 self.global_queue_interval = Some(val);
1106 self
1107 }
1108
1109 /// Sets the number of scheduler ticks after which the scheduler will poll for
1110 /// external events (timers, I/O, and so on).
1111 ///
1112 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1113 ///
1114 /// By default, the event interval is `61` for all scheduler types.
1115 ///
1116 /// Setting the event interval determines the effective "priority" of delivering
1117 /// these external events (which may wake up additional tasks), compared to
1118 /// executing tasks that are currently ready to run. A smaller value is useful
1119 /// when tasks frequently spend a long time in polling, or frequently yield,
1120 /// which can result in overly long delays picking up I/O events. Conversely,
1121 /// picking up new events requires extra synchronization and syscall overhead,
1122 /// so if tasks generally complete their polling quickly, a higher event interval
1123 /// will minimize that overhead while still keeping the scheduler responsive to
1124 /// events.
1125 ///
1126 /// # Examples
1127 ///
1128 /// ```
1129 /// # #[cfg(not(target_family = "wasm"))]
1130 /// # {
1131 /// # use tokio::runtime;
1132 /// # pub fn main() {
1133 /// let rt = runtime::Builder::new_multi_thread()
1134 /// .event_interval(31)
1135 /// .build();
1136 /// # }
1137 /// # }
1138 /// ```
1139 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1140 self.event_interval = val;
1141 self
1142 }
1143
1144 cfg_unstable! {
1145 /// Configure how the runtime responds to an unhandled panic on a
1146 /// spawned task.
1147 ///
1148 /// By default, an unhandled panic (i.e. a panic not caught by
1149 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1150 /// execution. The panic's error value is forwarded to the task's
1151 /// [`JoinHandle`] and all other spawned tasks continue running.
1152 ///
1153 /// The `unhandled_panic` option enables configuring this behavior.
1154 ///
1155 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1156 /// spawned tasks have no impact on the runtime's execution.
1157 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1158 /// shutdown immediately when a spawned task panics even if that
1159 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1160 /// will immediately terminate and further calls to
1161 /// [`Runtime::block_on`] will panic.
1162 ///
1163 /// # Panics
1164 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1165 /// on a runtime other than the current thread runtime.
1166 ///
1167 /// # Unstable
1168 ///
1169 /// This option is currently unstable and its implementation is
1170 /// incomplete. The API may change or be removed in the future. See
1171 /// issue [tokio-rs/tokio#4516] for more details.
1172 ///
1173 /// # Examples
1174 ///
1175 /// The following demonstrates a runtime configured to shutdown on
1176 /// panic. The first spawned task panics and results in the runtime
1177 /// shutting down. The second spawned task never has a chance to
1178 /// execute. The call to `block_on` will panic due to the runtime being
1179 /// forcibly shutdown.
1180 ///
1181 /// ```should_panic
1182 /// use tokio::runtime::{self, UnhandledPanic};
1183 ///
1184 /// # pub fn main() {
1185 /// let rt = runtime::Builder::new_current_thread()
1186 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1187 /// .build()
1188 /// .unwrap();
1189 ///
1190 /// rt.spawn(async { panic!("boom"); });
1191 /// rt.spawn(async {
1192 /// // This task never completes.
1193 /// });
1194 ///
1195 /// rt.block_on(async {
1196 /// // Do some work
1197 /// # loop { tokio::task::yield_now().await; }
1198 /// })
1199 /// # }
1200 /// ```
1201 ///
1202 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1203 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1204 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1205 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1206 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1207 }
1208
1209 self.unhandled_panic = behavior;
1210 self
1211 }
1212
1213 /// Disables the LIFO task scheduler heuristic.
1214 ///
1215 /// The multi-threaded scheduler includes a heuristic for optimizing
1216 /// message-passing patterns. This heuristic results in the **last**
1217 /// scheduled task being polled first.
1218 ///
1219 /// To implement this heuristic, each worker thread has a slot which
1220 /// holds the task that should be polled next. However, this slot cannot
1221 /// be stolen by other worker threads, which can result in lower total
1222 /// throughput when tasks tend to have longer poll times.
1223 ///
1224 /// This configuration option will disable this heuristic resulting in
1225 /// all scheduled tasks being pushed into the worker-local queue, which
1226 /// is stealable.
1227 ///
1228 /// Consider trying this option when the task "scheduled" time is high
1229 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1230 /// collect this data.
1231 ///
1232 /// # Unstable
1233 ///
1234 /// This configuration option is considered a workaround for the LIFO
1235 /// slot not being stealable. When the slot becomes stealable, we will
1236 /// revisit whether or not this option is necessary. See
1237 /// issue [tokio-rs/tokio#4941].
1238 ///
1239 /// # Examples
1240 ///
1241 /// ```
1242 /// # #[cfg(not(target_family = "wasm"))]
1243 /// # {
1244 /// use tokio::runtime;
1245 ///
1246 /// let rt = runtime::Builder::new_multi_thread()
1247 /// .disable_lifo_slot()
1248 /// .build()
1249 /// .unwrap();
1250 /// # }
1251 /// ```
1252 ///
1253 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1254 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1255 pub fn disable_lifo_slot(&mut self) -> &mut Self {
1256 self.disable_lifo_slot = true;
1257 self
1258 }
1259
1260 /// Specifies the random number generation seed to use within all
1261 /// threads associated with the runtime being built.
1262 ///
1263 /// This option is intended to make certain parts of the runtime
1264 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1265 /// [`tokio::select!`] it will ensure that the order that branches are
1266 /// polled is deterministic.
1267 ///
1268 /// In addition to the code specifying `rng_seed` and interacting with
1269 /// the runtime, the internals of Tokio and the Rust compiler may affect
1270 /// the sequences of random numbers. In order to ensure repeatable
1271 /// results, the version of Tokio, the versions of all other
1272 /// dependencies that interact with Tokio, and the Rust compiler version
1273 /// should also all remain constant.
1274 ///
1275 /// # Examples
1276 ///
1277 /// ```
1278 /// # use tokio::runtime::{self, RngSeed};
1279 /// # pub fn main() {
1280 /// let seed = RngSeed::from_bytes(b"place your seed here");
1281 /// let rt = runtime::Builder::new_current_thread()
1282 /// .rng_seed(seed)
1283 /// .build();
1284 /// # }
1285 /// ```
1286 ///
1287 /// [`tokio::select!`]: crate::select
1288 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1289 self.seed_generator = RngSeedGenerator::new(seed);
1290 self
1291 }
1292 }
1293
1294 cfg_unstable_metrics! {
1295 /// Enables tracking the distribution of task poll times.
1296 ///
1297 /// Task poll times are not instrumented by default as doing so requires
1298 /// calling [`Instant::now()`] twice per task poll, which could add
1299 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1300 /// metrics data.
1301 ///
1302 /// The histogram uses fixed bucket sizes. In other words, the histogram
1303 /// buckets are not dynamic based on input values. Use the
1304 /// `metrics_poll_time_histogram` builder methods to configure the
1305 /// histogram details.
1306 ///
1307 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1308 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1309 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1310 /// to select [`LogHistogram`] instead.
1311 ///
1312 /// # Examples
1313 ///
1314 /// ```
1315 /// # #[cfg(not(target_family = "wasm"))]
1316 /// # {
1317 /// use tokio::runtime;
1318 ///
1319 /// let rt = runtime::Builder::new_multi_thread()
1320 /// .enable_metrics_poll_time_histogram()
1321 /// .build()
1322 /// .unwrap();
1323 /// # // Test default values here
1324 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1325 /// # let m = rt.handle().metrics();
1326 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1327 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1328 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1329 /// # }
1330 /// ```
1331 ///
1332 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1333 /// [`Instant::now()`]: std::time::Instant::now
1334 /// [`LogHistogram`]: crate::runtime::LogHistogram
1335 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1336 pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1337 self.metrics_poll_count_histogram_enable = true;
1338 self
1339 }
1340
1341 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1342 ///
1343 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1344 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1345 #[doc(hidden)]
1346 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1347 self.enable_metrics_poll_time_histogram()
1348 }
1349
1350 /// Sets the histogram scale for tracking the distribution of task poll
1351 /// times.
1352 ///
1353 /// Tracking the distribution of task poll times can be done using a
1354 /// linear or log scale. When using linear scale, each histogram bucket
1355 /// will represent the same range of poll times. When using log scale,
1356 /// each histogram bucket will cover a range twice as big as the
1357 /// previous bucket.
1358 ///
1359 /// **Default:** linear scale.
1360 ///
1361 /// # Examples
1362 ///
1363 /// ```
1364 /// # #[cfg(not(target_family = "wasm"))]
1365 /// # {
1366 /// use tokio::runtime::{self, HistogramScale};
1367 ///
1368 /// # #[allow(deprecated)]
1369 /// let rt = runtime::Builder::new_multi_thread()
1370 /// .enable_metrics_poll_time_histogram()
1371 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1372 /// .build()
1373 /// .unwrap();
1374 /// # }
1375 /// ```
1376 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1377 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1378 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1379 self
1380 }
1381
1382 /// Configure the histogram for tracking poll times
1383 ///
1384 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1385 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1386 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1387 ///
1388 /// # Examples
1389 /// Configure a [`LogHistogram`] with [default configuration]:
1390 /// ```
1391 /// # #[cfg(not(target_family = "wasm"))]
1392 /// # {
1393 /// use tokio::runtime;
1394 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1395 ///
1396 /// let rt = runtime::Builder::new_multi_thread()
1397 /// .enable_metrics_poll_time_histogram()
1398 /// .metrics_poll_time_histogram_configuration(
1399 /// HistogramConfiguration::log(LogHistogram::default())
1400 /// )
1401 /// .build()
1402 /// .unwrap();
1403 /// # }
1404 /// ```
1405 ///
1406 /// Configure a linear histogram with 100 buckets, each 10μs wide
1407 /// ```
1408 /// # #[cfg(not(target_family = "wasm"))]
1409 /// # {
1410 /// use tokio::runtime;
1411 /// use std::time::Duration;
1412 /// use tokio::runtime::HistogramConfiguration;
1413 ///
1414 /// let rt = runtime::Builder::new_multi_thread()
1415 /// .enable_metrics_poll_time_histogram()
1416 /// .metrics_poll_time_histogram_configuration(
1417 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1418 /// )
1419 /// .build()
1420 /// .unwrap();
1421 /// # }
1422 /// ```
1423 ///
1424 /// Configure a [`LogHistogram`] with the following settings:
1425 /// - Measure times from 100ns to 120s
1426 /// - Max error of 0.1
1427 /// - No more than 1024 buckets
1428 /// ```
1429 /// # #[cfg(not(target_family = "wasm"))]
1430 /// # {
1431 /// use std::time::Duration;
1432 /// use tokio::runtime;
1433 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1434 ///
1435 /// let rt = runtime::Builder::new_multi_thread()
1436 /// .enable_metrics_poll_time_histogram()
1437 /// .metrics_poll_time_histogram_configuration(
1438 /// HistogramConfiguration::log(LogHistogram::builder()
1439 /// .max_value(Duration::from_secs(120))
1440 /// .min_value(Duration::from_nanos(100))
1441 /// .max_error(0.1)
1442 /// .max_buckets(1024)
1443 /// .expect("configuration uses 488 buckets")
1444 /// )
1445 /// )
1446 /// .build()
1447 /// .unwrap();
1448 /// # }
1449 /// ```
1450 ///
1451 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1452 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1453 /// where each bucket is twice the size of the previous bucket.
1454 /// ```rust
1455 /// use std::time::Duration;
1456 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1457 /// let rt = tokio::runtime::Builder::new_current_thread()
1458 /// .enable_all()
1459 /// .enable_metrics_poll_time_histogram()
1460 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1461 /// LogHistogram::builder()
1462 /// .min_value(Duration::from_micros(20))
1463 /// .max_value(Duration::from_millis(4))
1464 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1465 /// .precision_exact(0)
1466 /// .max_buckets(10)
1467 /// .unwrap(),
1468 /// ))
1469 /// .build()
1470 /// .unwrap();
1471 /// ```
1472 ///
1473 /// [`LogHistogram`]: crate::runtime::LogHistogram
1474 /// [default configuration]: crate::runtime::LogHistogramBuilder
1475 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1476 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1477 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1478 self
1479 }
1480
1481 /// Sets the histogram resolution for tracking the distribution of task
1482 /// poll times.
1483 ///
1484 /// The resolution is the histogram's first bucket's range. When using a
1485 /// linear histogram scale, each bucket will cover the same range. When
1486 /// using a log scale, each bucket will cover a range twice as big as
1487 /// the previous bucket. In the log case, the resolution represents the
1488 /// smallest bucket range.
1489 ///
1490 /// Note that, when using log scale, the resolution is rounded up to the
1491 /// nearest power of 2 in nanoseconds.
1492 ///
1493 /// **Default:** 100 microseconds.
1494 ///
1495 /// # Examples
1496 ///
1497 /// ```
1498 /// # #[cfg(not(target_family = "wasm"))]
1499 /// # {
1500 /// use tokio::runtime;
1501 /// use std::time::Duration;
1502 ///
1503 /// # #[allow(deprecated)]
1504 /// let rt = runtime::Builder::new_multi_thread()
1505 /// .enable_metrics_poll_time_histogram()
1506 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1507 /// .build()
1508 /// .unwrap();
1509 /// # }
1510 /// ```
1511 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1512 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1513 assert!(resolution > Duration::from_secs(0));
1514 // Sanity check the argument and also make the cast below safe.
1515 assert!(resolution <= Duration::from_secs(1));
1516
1517 let resolution = resolution.as_nanos() as u64;
1518
1519 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1520 self
1521 }
1522
1523 /// Sets the number of buckets for the histogram tracking the
1524 /// distribution of task poll times.
1525 ///
1526 /// The last bucket tracks all greater values that fall out of other
1527 /// ranges. So, configuring the histogram using a linear scale,
1528 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1529 /// polls that take more than 450ms to complete.
1530 ///
1531 /// **Default:** 10
1532 ///
1533 /// # Examples
1534 ///
1535 /// ```
1536 /// # #[cfg(not(target_family = "wasm"))]
1537 /// # {
1538 /// use tokio::runtime;
1539 ///
1540 /// # #[allow(deprecated)]
1541 /// let rt = runtime::Builder::new_multi_thread()
1542 /// .enable_metrics_poll_time_histogram()
1543 /// .metrics_poll_count_histogram_buckets(15)
1544 /// .build()
1545 /// .unwrap();
1546 /// # }
1547 /// ```
1548 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1549 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1550 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1551 self
1552 }
1553 }
1554
1555 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1556 use crate::runtime::runtime::Scheduler;
1557
1558 let (scheduler, handle, blocking_pool) =
1559 self.build_current_thread_runtime_components(None)?;
1560
1561 Ok(Runtime::from_parts(
1562 Scheduler::CurrentThread(scheduler),
1563 handle,
1564 blocking_pool,
1565 ))
1566 }
1567
/// Builds a [`LocalRuntime`] backed by the current-thread scheduler, passing
/// the id of the calling thread to the shared construction routine.
#[cfg(tokio_unstable)]
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
    use crate::runtime::local_runtime::LocalRuntimeScheduler;

    // Captured before construction: this is the thread calling `build_local`.
    let owner_thread = std::thread::current().id();

    self.build_current_thread_runtime_components(Some(owner_thread))
        .map(|(scheduler, handle, blocking_pool)| {
            LocalRuntime::from_parts(
                LocalRuntimeScheduler::CurrentThread(scheduler),
                handle,
                blocking_pool,
            )
        })
}
1583
1584 fn build_current_thread_runtime_components(
1585 &mut self,
1586 local_tid: Option<ThreadId>,
1587 ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1588 use crate::runtime::scheduler;
1589 use crate::runtime::Config;
1590
1591 let mut cfg = self.get_cfg();
1592 cfg.timer_flavor = TimerFlavor::Traditional;
1593 let (driver, driver_handle) = driver::Driver::new(cfg)?;
1594
1595 // Blocking pool
1596 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1597 let blocking_spawner = blocking_pool.spawner().clone();
1598
1599 // Generate a rng seed for this runtime.
1600 let seed_generator_1 = self.seed_generator.next_generator();
1601 let seed_generator_2 = self.seed_generator.next_generator();
1602
1603 // And now put a single-threaded scheduler on top of the timer. When
1604 // there are no futures ready to do something, it'll let the timer or
1605 // the reactor to generate some new stimuli for the futures to continue
1606 // in their life.
1607 let (scheduler, handle) = CurrentThread::new(
1608 driver,
1609 driver_handle,
1610 blocking_spawner,
1611 seed_generator_2,
1612 Config {
1613 before_park: self.before_park.clone(),
1614 after_unpark: self.after_unpark.clone(),
1615 before_spawn: self.before_spawn.clone(),
1616 #[cfg(tokio_unstable)]
1617 before_poll: self.before_poll.clone(),
1618 #[cfg(tokio_unstable)]
1619 after_poll: self.after_poll.clone(),
1620 after_termination: self.after_termination.clone(),
1621 global_queue_interval: self.global_queue_interval,
1622 event_interval: self.event_interval,
1623 #[cfg(tokio_unstable)]
1624 unhandled_panic: self.unhandled_panic.clone(),
1625 disable_lifo_slot: self.disable_lifo_slot,
1626 seed_generator: seed_generator_1,
1627 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1628 },
1629 local_tid,
1630 );
1631
1632 let handle = Handle {
1633 inner: scheduler::Handle::CurrentThread(handle),
1634 };
1635
1636 Ok((scheduler, handle, blocking_pool))
1637 }
1638
1639 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1640 if self.metrics_poll_count_histogram_enable {
1641 Some(self.metrics_poll_count_histogram.clone())
1642 } else {
1643 None
1644 }
1645 }
1646}
1647
1648cfg_io_driver! {
1649 impl Builder {
1650 /// Enables the I/O driver.
1651 ///
1652 /// Doing this enables using net, process, signal, and some I/O types on
1653 /// the runtime.
1654 ///
1655 /// # Examples
1656 ///
1657 /// ```
1658 /// use tokio::runtime;
1659 ///
1660 /// let rt = runtime::Builder::new_multi_thread()
1661 /// .enable_io()
1662 /// .build()
1663 /// .unwrap();
1664 /// ```
1665 pub fn enable_io(&mut self) -> &mut Self {
1666 self.enable_io = true;
1667 self
1668 }
1669
1670 /// Enables the I/O driver and configures the max number of events to be
1671 /// processed per tick.
1672 ///
1673 /// # Examples
1674 ///
1675 /// ```
1676 /// use tokio::runtime;
1677 ///
1678 /// let rt = runtime::Builder::new_current_thread()
1679 /// .enable_io()
1680 /// .max_io_events_per_tick(1024)
1681 /// .build()
1682 /// .unwrap();
1683 /// ```
1684 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1685 self.nevents = capacity;
1686 self
1687 }
1688 }
1689}
1690
1691cfg_time! {
1692 impl Builder {
1693 /// Enables the time driver.
1694 ///
1695 /// Doing this enables using `tokio::time` on the runtime.
1696 ///
1697 /// # Examples
1698 ///
1699 /// ```
1700 /// # #[cfg(not(target_family = "wasm"))]
1701 /// # {
1702 /// use tokio::runtime;
1703 ///
1704 /// let rt = runtime::Builder::new_multi_thread()
1705 /// .enable_time()
1706 /// .build()
1707 /// .unwrap();
1708 /// # }
1709 /// ```
1710 pub fn enable_time(&mut self) -> &mut Self {
1711 self.enable_time = true;
1712 self
1713 }
1714 }
1715}
1716
1717cfg_io_uring! {
1718 impl Builder {
1719 /// Enables the tokio's io_uring driver.
1720 ///
1721 /// Doing this enables using io_uring operations on the runtime.
1722 ///
1723 /// # Examples
1724 ///
1725 /// ```
1726 /// use tokio::runtime;
1727 ///
1728 /// let rt = runtime::Builder::new_multi_thread()
1729 /// .enable_io_uring()
1730 /// .build()
1731 /// .unwrap();
1732 /// ```
1733 #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1734 pub fn enable_io_uring(&mut self) -> &mut Self {
1735 // Currently, the uring flag is equivalent to `enable_io`.
1736 self.enable_io = true;
1737 self
1738 }
1739 }
1740}
1741
1742cfg_test_util! {
1743 impl Builder {
1744 /// Controls if the runtime's clock starts paused or advancing.
1745 ///
1746 /// Pausing time requires the current-thread runtime; construction of
1747 /// the runtime will panic otherwise.
1748 ///
1749 /// # Examples
1750 ///
1751 /// ```
1752 /// use tokio::runtime;
1753 ///
1754 /// let rt = runtime::Builder::new_current_thread()
1755 /// .enable_time()
1756 /// .start_paused(true)
1757 /// .build()
1758 /// .unwrap();
1759 /// ```
1760 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1761 self.start_paused = start_paused;
1762 self
1763 }
1764 }
1765}
1766
1767cfg_rt_multi_thread! {
1768 impl Builder {
1769 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1770 use crate::loom::sys::num_cpus;
1771 use crate::runtime::{Config, runtime::Scheduler};
1772 use crate::runtime::scheduler::{self, MultiThread};
1773
1774 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1775
1776 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1777
1778 // Create the blocking pool
1779 let blocking_pool =
1780 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1781 let blocking_spawner = blocking_pool.spawner().clone();
1782
1783 // Generate a rng seed for this runtime.
1784 let seed_generator_1 = self.seed_generator.next_generator();
1785 let seed_generator_2 = self.seed_generator.next_generator();
1786
1787 let (scheduler, handle, launch) = MultiThread::new(
1788 worker_threads,
1789 driver,
1790 driver_handle,
1791 blocking_spawner,
1792 seed_generator_2,
1793 Config {
1794 before_park: self.before_park.clone(),
1795 after_unpark: self.after_unpark.clone(),
1796 before_spawn: self.before_spawn.clone(),
1797 #[cfg(tokio_unstable)]
1798 before_poll: self.before_poll.clone(),
1799 #[cfg(tokio_unstable)]
1800 after_poll: self.after_poll.clone(),
1801 after_termination: self.after_termination.clone(),
1802 global_queue_interval: self.global_queue_interval,
1803 event_interval: self.event_interval,
1804 #[cfg(tokio_unstable)]
1805 unhandled_panic: self.unhandled_panic.clone(),
1806 disable_lifo_slot: self.disable_lifo_slot,
1807 seed_generator: seed_generator_1,
1808 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1809 },
1810 self.timer_flavor,
1811 );
1812
1813 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1814
1815 // Spawn the thread pool workers
1816 let _enter = handle.enter();
1817 launch.launch();
1818
1819 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1820 }
1821 }
1822}
1823
1824impl fmt::Debug for Builder {
1825 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1826 fmt.debug_struct("Builder")
1827 .field("worker_threads", &self.worker_threads)
1828 .field("max_blocking_threads", &self.max_blocking_threads)
1829 .field(
1830 "thread_name",
1831 &"<dyn Fn() -> String + Send + Sync + 'static>",
1832 )
1833 .field("thread_stack_size", &self.thread_stack_size)
1834 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1835 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1836 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1837 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1838 .finish()
1839 }
1840}