tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4use crate::runtime;
5use crate::runtime::task::{
6 self, JoinHandle, LocalOwnedTasks, SpawnLocation, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
9use crate::sync::AtomicWaker;
10use crate::util::trace::SpawnMeta;
11use crate::util::RcCell;
12
13use std::cell::Cell;
14use std::collections::VecDeque;
15use std::fmt;
16use std::future::Future;
17use std::marker::PhantomData;
18use std::mem;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::task::Poll;
22
23use pin_project_lite::pin_project;
24
25cfg_rt! {
26 /// A set of tasks which are executed on the same thread.
27 ///
28 /// In some cases, it is necessary to run one or more futures that do not
29 /// implement [`Send`] and thus are unsafe to send between threads. In these
30 /// cases, a [local task set] may be used to schedule one or more `!Send`
31 /// futures to run together on the same thread.
32 ///
33 /// For example, the following code will not compile:
34 ///
35 /// ```rust,compile_fail
36 /// use std::rc::Rc;
37 ///
38 /// #[tokio::main]
39 /// async fn main() {
40 /// // `Rc` does not implement `Send`, and thus may not be sent between
41 /// // threads safely.
42 /// let nonsend_data = Rc::new("my nonsend data...");
43 ///
44 /// let nonsend_data = nonsend_data.clone();
45 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
46 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
47 /// // will not compile.
48 /// tokio::spawn(async move {
49 /// println!("{}", nonsend_data);
50 /// // ...
51 /// }).await.unwrap();
52 /// }
53 /// ```
54 ///
55 /// # Use with `run_until`
56 ///
57 /// To spawn `!Send` futures, we can use a local task set to schedule them
58 /// on the thread calling [`Runtime::block_on`]. When running inside of the
59 /// local task set, we can use [`task::spawn_local`], which can spawn
60 /// `!Send` futures. For example:
61 ///
62 /// ```rust
63 /// use std::rc::Rc;
64 /// use tokio::task;
65 ///
66 /// # #[tokio::main(flavor = "current_thread")]
67 /// # async fn main() {
68 /// let nonsend_data = Rc::new("my nonsend data...");
69 ///
70 /// // Construct a local task set that can run `!Send` futures.
71 /// let local = task::LocalSet::new();
72 ///
73 /// // Run the local task set.
74 /// local.run_until(async move {
75 /// let nonsend_data = nonsend_data.clone();
76 /// // `spawn_local` ensures that the future is spawned on the local
77 /// // task set.
78 /// task::spawn_local(async move {
79 /// println!("{}", nonsend_data);
80 /// // ...
81 /// }).await.unwrap();
82 /// }).await;
83 /// # }
84 /// ```
85 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
86 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
87 /// cannot be used inside a task spawned with `tokio::spawn`.
88 ///
89 /// ## Awaiting a `LocalSet`
90 ///
91 /// Additionally, a `LocalSet` itself implements `Future`, completing when
92 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
93 /// several futures on a `LocalSet` and drive the whole set until they
94 /// complete. For example,
95 ///
96 /// ```rust
97 /// use tokio::{task, time};
98 /// use std::rc::Rc;
99 ///
100 /// # #[tokio::main(flavor = "current_thread")]
101 /// # async fn main() {
102 /// let nonsend_data = Rc::new("world");
103 /// let local = task::LocalSet::new();
104 ///
105 /// let nonsend_data2 = nonsend_data.clone();
106 /// local.spawn_local(async move {
107 /// // ...
108 /// println!("hello {}", nonsend_data2)
109 /// });
110 ///
111 /// local.spawn_local(async move {
112 /// time::sleep(time::Duration::from_millis(100)).await;
113 /// println!("goodbye {}", nonsend_data)
114 /// });
115 ///
116 /// // ...
117 ///
118 /// local.await;
119 /// # }
120 /// ```
121 /// **Note:** Awaiting a `LocalSet` can only be done inside
122 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
123 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
124 /// `tokio::spawn`.
125 ///
126 /// ## Use inside `tokio::spawn`
127 ///
128 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
129 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
130 /// something else. The solution is to create the `LocalSet` somewhere else,
131 /// and communicate with it using an [`mpsc`] channel.
132 ///
133 /// The following example puts the `LocalSet` inside a new thread.
134 /// ```
135 /// # #[cfg(not(target_family = "wasm"))]
136 /// # {
137 /// use tokio::runtime::Builder;
138 /// use tokio::sync::{mpsc, oneshot};
139 /// use tokio::task::LocalSet;
140 ///
141 /// // This struct describes the task you want to spawn. Here we include
142 /// // some simple examples. The oneshot channel allows sending a response
143 /// // to the spawner.
144 /// #[derive(Debug)]
145 /// enum Task {
146 /// PrintNumber(u32),
147 /// AddOne(u32, oneshot::Sender<u32>),
148 /// }
149 ///
150 /// #[derive(Clone)]
151 /// struct LocalSpawner {
152 /// send: mpsc::UnboundedSender<Task>,
153 /// }
154 ///
155 /// impl LocalSpawner {
156 /// pub fn new() -> Self {
157 /// let (send, mut recv) = mpsc::unbounded_channel();
158 ///
159 /// let rt = Builder::new_current_thread()
160 /// .enable_all()
161 /// .build()
162 /// .unwrap();
163 ///
164 /// std::thread::spawn(move || {
165 /// let local = LocalSet::new();
166 ///
167 /// local.spawn_local(async move {
168 /// while let Some(new_task) = recv.recv().await {
169 /// tokio::task::spawn_local(run_task(new_task));
170 /// }
171 /// // If the while loop returns, then all the LocalSpawner
172 /// // objects have been dropped.
173 /// });
174 ///
175 /// // This will return once all senders are dropped and all
176 /// // spawned tasks have returned.
177 /// rt.block_on(local);
178 /// });
179 ///
180 /// Self {
181 /// send,
182 /// }
183 /// }
184 ///
185 /// pub fn spawn(&self, task: Task) {
186 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
187 /// }
188 /// }
189 ///
190 /// // This task may do !Send stuff. We use printing a number as an example,
191 /// // but it could be anything.
192 /// //
193 /// // The Task struct is an enum to support spawning many different kinds
194 /// // of operations.
195 /// async fn run_task(task: Task) {
196 /// match task {
197 /// Task::PrintNumber(n) => {
198 /// println!("{}", n);
199 /// },
200 /// Task::AddOne(n, response) => {
201 /// // We ignore failures to send the response.
202 /// let _ = response.send(n + 1);
203 /// },
204 /// }
205 /// }
206 ///
207 /// #[tokio::main]
208 /// async fn main() {
209 /// let spawner = LocalSpawner::new();
210 ///
211 /// let (send, response) = oneshot::channel();
212 /// spawner.spawn(Task::AddOne(10, send));
213 /// let eleven = response.await.unwrap();
214 /// assert_eq!(eleven, 11);
215 /// }
216 /// # }
217 /// ```
218 ///
219 /// [`Send`]: trait@std::marker::Send
220 /// [local task set]: struct@LocalSet
221 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
222 /// [`task::spawn_local`]: fn@spawn_local
223 /// [`mpsc`]: mod@crate::sync::mpsc
224 pub struct LocalSet {
225 /// Current scheduler tick.
226 tick: Cell<u8>,
227
228 /// State available from thread-local.
229 context: Rc<Context>,
230
231 /// This type should not be Send.
232 _not_send: PhantomData<*const ()>,
233 }
234}
235
236/// State available from the thread-local.
237struct Context {
238 /// State shared between threads.
239 shared: Arc<Shared>,
240
241 /// True if a task panicked without being handled and the local set is
242 /// configured to shutdown on unhandled panic.
243 unhandled_panic: Cell<bool>,
244}
245
246/// `LocalSet` state shared between threads.
247struct Shared {
248 /// # Safety
249 ///
250 /// This field must *only* be accessed from the thread that owns the
251 /// `LocalSet` (i.e., `Thread::current().id() == owner`).
252 local_state: LocalState,
253
254 /// Remote run queue sender.
255 queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
256
257 /// Wake the `LocalSet` task.
258 waker: AtomicWaker,
259
260 /// How to respond to unhandled task panics.
261 #[cfg(tokio_unstable)]
262 pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
263}
264
265/// Tracks the `LocalSet` state that must only be accessed from the thread that
266/// created the `LocalSet`.
267struct LocalState {
268 /// The `ThreadId` of the thread that owns the `LocalSet`.
269 owner: ThreadId,
270
271 /// Local run queue sender and receiver.
272 local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,
273
274 /// Collection of all active tasks spawned onto this executor.
275 owned: LocalOwnedTasks<Arc<Shared>>,
276}
277
278pin_project! {
279 #[derive(Debug)]
280 struct RunUntil<'a, F> {
281 local_set: &'a LocalSet,
282 #[pin]
283 future: F,
284 }
285}
286
287tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
288 ctx: RcCell::new(),
289 wake_on_schedule: Cell::new(false),
290} });
291
292struct LocalData {
293 ctx: RcCell<Context>,
294 wake_on_schedule: Cell<bool>,
295}
296
297impl LocalData {
298 /// Should be called except when we call `LocalSet::enter`.
299 /// Especially when we poll a `LocalSet`.
300 #[must_use = "dropping this guard will reset the entered state"]
301 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
302 let ctx = self.ctx.replace(Some(ctx));
303 let wake_on_schedule = self.wake_on_schedule.replace(false);
304 LocalDataEnterGuard {
305 local_data_ref: self,
306 ctx,
307 wake_on_schedule,
308 }
309 }
310}
311
312/// A guard for `LocalData::enter()`
313struct LocalDataEnterGuard<'a> {
314 local_data_ref: &'a LocalData,
315 ctx: Option<Rc<Context>>,
316 wake_on_schedule: bool,
317}
318
319impl<'a> Drop for LocalDataEnterGuard<'a> {
320 fn drop(&mut self) {
321 self.local_data_ref.ctx.set(self.ctx.take());
322 self.local_data_ref
323 .wake_on_schedule
324 .set(self.wake_on_schedule)
325 }
326}
327
328cfg_rt! {
329 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
330 ///
331 /// This is possible when either using one of these types
332 /// explicitly, or (with `tokio_unstable`) by opting to use the
333 /// `"local"` runtime flavor in `tokio::main`:
334 ///
335 /// ```ignore
336 /// #[tokio::main(flavor = "local")]
337 /// ```
338 ///
339 /// The spawned future will run on the same thread that called `spawn_local`.
340 ///
341 /// The provided future will start running in the background immediately
342 /// when `spawn_local` is called, even if you don't await the returned
343 /// `JoinHandle`.
344 ///
345 /// # Panics
346 ///
347 /// This function panics if called outside of a [`LocalSet`] or [`LocalRuntime`].
348 ///
349 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
350 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
351 /// `spawn_local` if you want to stay within the `LocalSet`.
352 ///
353 /// # Examples
354 ///
355 /// With `LocalSet`:
356 ///
357 /// ```rust
358 /// use std::rc::Rc;
359 /// use tokio::task;
360 ///
361 /// # #[tokio::main(flavor = "current_thread")]
362 /// # async fn main() {
363 /// let nonsend_data = Rc::new("my nonsend data...");
364 ///
365 /// let local = task::LocalSet::new();
366 ///
367 /// // Run the local task set.
368 /// local.run_until(async move {
369 /// let nonsend_data = nonsend_data.clone();
370 /// task::spawn_local(async move {
371 /// println!("{}", nonsend_data);
372 /// // ...
373 /// }).await.unwrap();
374 /// }).await;
375 /// # }
376 /// ```
377 /// With local runtime flavor ([Unstable API][unstable] only).
378 ///
379 /// ```rust
380 /// # #[cfg(tokio_unstable)]
381 /// #[tokio::main(flavor = "local")]
382 /// async fn main() {
383 /// let join = tokio::task::spawn_local(async {
384 /// println!("my nonsend data...")
385 /// });
386 ///
387 /// join.await.unwrap()
388 /// }
389 /// # #[cfg(not(tokio_unstable))]
390 /// # fn main() {}
391 ///
392 /// ```
393 ///
394 /// [`LocalSet`]: struct@crate::task::LocalSet
395 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
396 /// [`tokio::spawn`]: fn@crate::task::spawn
397 /// [unstable]: ../../tokio/index.html#unstable-features
398 #[track_caller]
399 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
400 where
401 F: Future + 'static,
402 F::Output: 'static,
403 {
404 let fut_size = std::mem::size_of::<F>();
405 if fut_size > BOX_FUTURE_THRESHOLD {
406 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
407 } else {
408 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
409 }
410 }
411
412
413 #[track_caller]
414 pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
415 where F: Future + 'static,
416 F::Output: 'static
417 {
418 use crate::runtime::{context, task};
419
420 let mut future = Some(future);
421
422 let res = context::with_current(|handle| {
423 Some(if handle.is_local() {
424 if !handle.can_spawn_local_on_local_runtime() {
425 return None;
426 }
427
428 let future = future.take().unwrap();
429
430 #[cfg(all(
431 tokio_unstable,
432 feature = "taskdump",
433 feature = "rt",
434 target_os = "linux",
435 any(
436 target_arch = "aarch64",
437 target_arch = "x86",
438 target_arch = "x86_64"
439 )
440 ))]
441 let future = task::trace::Trace::root(future);
442 let id = task::Id::next();
443 let task = crate::util::trace::task(future, "task", meta, id.as_u64());
444
445 // safety: we have verified that this is a `LocalRuntime` owned by the current thread
446 unsafe { handle.spawn_local(task, id, meta.spawned_at) }
447 } else {
448 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
449 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
450 Some(cx) => cx.spawn(future.take().unwrap(), meta)
451 }
452 })
453 });
454
455 match res {
456 Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
457 Ok(Some(join_handle)) => join_handle,
458 Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
459 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
460 Some(cx) => cx.spawn(future.unwrap(), meta)
461 }
462 }
463 }
464}
465
466/// Initial queue capacity.
467const INITIAL_CAPACITY: usize = 64;
468
469/// Max number of tasks to poll per tick.
470const MAX_TASKS_PER_TICK: usize = 61;
471
472/// How often it check the remote queue first.
473const REMOTE_FIRST_INTERVAL: u8 = 31;
474
475/// Context guard for `LocalSet`
476pub struct LocalEnterGuard {
477 ctx: Option<Rc<Context>>,
478
479 /// Distinguishes whether the context was entered or being polled.
480 /// When we enter it, the value `wake_on_schedule` is set. In this case
481 /// `spawn_local` refers the context, whereas it is not being polled now.
482 wake_on_schedule: bool,
483}
484
485impl Drop for LocalEnterGuard {
486 fn drop(&mut self) {
487 CURRENT.with(
488 |LocalData {
489 ctx,
490 wake_on_schedule,
491 }| {
492 ctx.set(self.ctx.take());
493 wake_on_schedule.set(self.wake_on_schedule);
494 },
495 );
496 }
497}
498
499impl fmt::Debug for LocalEnterGuard {
500 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
501 f.debug_struct("LocalEnterGuard").finish()
502 }
503}
504
505impl LocalSet {
506 /// Returns a new local task set.
507 pub fn new() -> LocalSet {
508 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
509
510 LocalSet {
511 tick: Cell::new(0),
512 context: Rc::new(Context {
513 shared: Arc::new(Shared {
514 local_state: LocalState {
515 owner,
516 owned: LocalOwnedTasks::new(),
517 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
518 },
519 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
520 waker: AtomicWaker::new(),
521 #[cfg(tokio_unstable)]
522 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
523 }),
524 unhandled_panic: Cell::new(false),
525 }),
526 _not_send: PhantomData,
527 }
528 }
529
530 /// Enters the context of this `LocalSet`.
531 ///
532 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
533 /// context you are inside.
534 ///
535 /// [`spawn_local`]: fn@crate::task::spawn_local
536 pub fn enter(&self) -> LocalEnterGuard {
537 CURRENT.with(
538 |LocalData {
539 ctx,
540 wake_on_schedule,
541 ..
542 }| {
543 let ctx = ctx.replace(Some(self.context.clone()));
544 let wake_on_schedule = wake_on_schedule.replace(true);
545 LocalEnterGuard {
546 ctx,
547 wake_on_schedule,
548 }
549 },
550 )
551 }
552
553 /// Spawns a `!Send` task onto the local task set.
554 ///
555 /// This task is guaranteed to be run on the current thread.
556 ///
557 /// Unlike the free function [`spawn_local`], this method may be used to
558 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
559 /// future will start running once the `LocalSet` is next started, even if
560 /// you don't await the returned `JoinHandle`.
561 ///
562 /// # Examples
563 ///
564 /// ```rust
565 /// use tokio::task;
566 ///
567 /// # #[tokio::main(flavor = "current_thread")]
568 /// # async fn main() {
569 /// let local = task::LocalSet::new();
570 ///
571 /// // Spawn a future on the local set. This future will be run when
572 /// // we call `run_until` to drive the task set.
573 /// local.spawn_local(async {
574 /// // ...
575 /// });
576 ///
577 /// // Run the local task set.
578 /// local.run_until(async move {
579 /// // ...
580 /// }).await;
581 ///
582 /// // When `run` finishes, we can spawn _more_ futures, which will
583 /// // run in subsequent calls to `run_until`.
584 /// local.spawn_local(async {
585 /// // ...
586 /// });
587 ///
588 /// local.run_until(async move {
589 /// // ...
590 /// }).await;
591 /// # }
592 /// ```
593 /// [`spawn_local`]: fn@spawn_local
594 #[track_caller]
595 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
596 where
597 F: Future + 'static,
598 F::Output: 'static,
599 {
600 let fut_size = mem::size_of::<F>();
601 if fut_size > BOX_FUTURE_THRESHOLD {
602 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
603 } else {
604 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
605 }
606 }
607
608 /// Runs a future to completion on the provided runtime, driving any local
609 /// futures spawned on this task set on the current thread.
610 ///
611 /// This runs the given future on the runtime, blocking until it is
612 /// complete, and yielding its resolved result. Any tasks or timers which
613 /// the future spawns internally will be executed on the runtime. The future
614 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
615 /// current thread.
616 ///
617 /// This method should not be called from an asynchronous context.
618 ///
619 /// # Panics
620 ///
621 /// This function panics if the executor is at capacity, if the provided
622 /// future panics, or if called within an asynchronous execution context.
623 ///
624 /// # Notes
625 ///
626 /// Since this function internally calls [`Runtime::block_on`], and drives
627 /// futures in the local task set inside that call to `block_on`, the local
628 /// futures may not use [in-place blocking]. If a blocking call needs to be
629 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
630 ///
631 /// For example, this will panic:
632 /// ```should_panic,ignore-wasm
633 /// use tokio::runtime::Runtime;
634 /// use tokio::task;
635 ///
636 /// let rt = Runtime::new().unwrap();
637 /// let local = task::LocalSet::new();
638 /// local.block_on(&rt, async {
639 /// let join = task::spawn_local(async {
640 /// let blocking_result = task::block_in_place(|| {
641 /// // ...
642 /// });
643 /// // ...
644 /// });
645 /// join.await.unwrap();
646 /// })
647 /// ```
648 /// This, however, will not panic:
649 /// ```
650 /// # #[cfg(not(target_family = "wasm"))]
651 /// # {
652 /// use tokio::runtime::Runtime;
653 /// use tokio::task;
654 ///
655 /// let rt = Runtime::new().unwrap();
656 /// let local = task::LocalSet::new();
657 /// local.block_on(&rt, async {
658 /// let join = task::spawn_local(async {
659 /// let blocking_result = task::spawn_blocking(|| {
660 /// // ...
661 /// }).await;
662 /// // ...
663 /// });
664 /// join.await.unwrap();
665 /// })
666 /// # }
667 /// ```
668 ///
669 /// [`spawn_local`]: fn@spawn_local
670 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
671 /// [in-place blocking]: fn@crate::task::block_in_place
672 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
673 #[track_caller]
674 #[cfg(feature = "rt")]
675 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
676 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
677 where
678 F: Future,
679 {
680 rt.block_on(self.run_until(future))
681 }
682
683 /// Runs a future to completion on the local set, returning its output.
684 ///
685 /// This returns a future that runs the given future with a local set,
686 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
687 /// Any local futures spawned on the local set will be driven in the
688 /// background until the future passed to `run_until` completes. When the future
689 /// passed to `run_until` finishes, any local futures which have not completed
690 /// will remain on the local set, and will be driven on subsequent calls to
691 /// `run_until` or when [awaiting the local set] itself.
692 ///
693 /// # Cancel safety
694 ///
695 /// This method is cancel safe when `future` is cancel safe.
696 ///
697 /// # Examples
698 ///
699 /// ```rust
700 /// use tokio::task;
701 ///
702 /// # #[tokio::main(flavor = "current_thread")]
703 /// # async fn main() {
704 /// task::LocalSet::new().run_until(async {
705 /// task::spawn_local(async move {
706 /// // ...
707 /// }).await.unwrap();
708 /// // ...
709 /// }).await;
710 /// # }
711 /// ```
712 ///
713 /// [`spawn_local`]: fn@spawn_local
714 /// [awaiting the local set]: #awaiting-a-localset
715 pub async fn run_until<F>(&self, future: F) -> F::Output
716 where
717 F: Future,
718 {
719 let run_until = RunUntil {
720 future,
721 local_set: self,
722 };
723 run_until.await
724 }
725
726 #[track_caller]
727 pub(in crate::task) fn spawn_named<F>(
728 &self,
729 future: F,
730 meta: SpawnMeta<'_>,
731 ) -> JoinHandle<F::Output>
732 where
733 F: Future + 'static,
734 F::Output: 'static,
735 {
736 self.spawn_named_inner(future, meta)
737 }
738
739 #[track_caller]
740 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
741 where
742 F: Future + 'static,
743 F::Output: 'static,
744 {
745 let handle = self.context.spawn(future, meta);
746
747 // Because a task was spawned from *outside* the `LocalSet`, wake the
748 // `LocalSet` future to execute the new task, if it hasn't been woken.
749 //
750 // Spawning via the free fn `spawn` does not require this, as it can
751 // only be called from *within* a future executing on the `LocalSet` —
752 // in that case, the `LocalSet` must already be awake.
753 self.context.shared.waker.wake();
754 handle
755 }
756
757 /// Ticks the scheduler, returning whether the local future needs to be
758 /// notified again.
759 fn tick(&self) -> bool {
760 for _ in 0..MAX_TASKS_PER_TICK {
761 // Make sure we didn't hit an unhandled panic
762 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
763
764 match self.next_task() {
765 // Run the task
766 //
767 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
768 // used. We are responsible for maintaining the invariant that
769 // `run_unchecked` is only called on threads that spawned the
770 // task initially. Because `LocalSet` itself is `!Send`, and
771 // `spawn_local` spawns into the `LocalSet` on the current
772 // thread, the invariant is maintained.
773 Some(task) => crate::task::coop::budget(|| task.run()),
774 // We have fully drained the queue of notified tasks, so the
775 // local future doesn't need to be notified again — it can wait
776 // until something else wakes a task in the local set.
777 None => return false,
778 }
779 }
780
781 true
782 }
783
784 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
785 let tick = self.tick.get();
786 self.tick.set(tick.wrapping_add(1));
787
788 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
789 self.context
790 .shared
791 .queue
792 .lock()
793 .as_mut()
794 .and_then(|queue| queue.pop_front())
795 .or_else(|| self.pop_local())
796 } else {
797 self.pop_local().or_else(|| {
798 self.context
799 .shared
800 .queue
801 .lock()
802 .as_mut()
803 .and_then(VecDeque::pop_front)
804 })
805 };
806
807 task.map(|task| unsafe {
808 // Safety: because the `LocalSet` itself is `!Send`, we know we are
809 // on the same thread if we have access to the `LocalSet`, and can
810 // therefore access the local run queue.
811 self.context.shared.local_state.assert_owner(task)
812 })
813 }
814
815 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
816 unsafe {
817 // Safety: because the `LocalSet` itself is `!Send`, we know we are
818 // on the same thread if we have access to the `LocalSet`, and can
819 // therefore access the local run queue.
820 self.context.shared.local_state.task_pop_front()
821 }
822 }
823
824 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
825 CURRENT.with(|local_data| {
826 let _guard = local_data.enter(self.context.clone());
827 f()
828 })
829 }
830
831 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
832 /// fails.
833 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
834 let mut f = Some(f);
835
836 let res = CURRENT.try_with(|local_data| {
837 let _guard = local_data.enter(self.context.clone());
838 (f.take().unwrap())()
839 });
840
841 match res {
842 Ok(res) => res,
843 Err(_access_error) => (f.take().unwrap())(),
844 }
845 }
846
847 /// Returns the [`Id`] of the current [`LocalSet`] runtime.
848 ///
849 /// # Examples
850 ///
851 /// ```rust
852 /// use tokio::task;
853 ///
854 /// # #[tokio::main(flavor = "current_thread")]
855 /// # async fn main() {
856 /// let local_set = task::LocalSet::new();
857 /// println!("Local set id: {}", local_set.id());
858 /// # }
859 /// ```
860 ///
861 /// [`Id`]: struct@crate::runtime::Id
862 pub fn id(&self) -> runtime::Id {
863 runtime::Id::new(self.context.shared.local_state.owned.id)
864 }
865}
866
867cfg_unstable! {
868 impl LocalSet {
869 /// Configure how the `LocalSet` responds to an unhandled panic on a
870 /// spawned task.
871 ///
872 /// By default, an unhandled panic (i.e. a panic not caught by
873 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
874 /// execution. The panic is error value is forwarded to the task's
875 /// [`JoinHandle`] and all other spawned tasks continue running.
876 ///
877 /// The `unhandled_panic` option enables configuring this behavior.
878 ///
879 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
880 /// spawned tasks have no impact on the `LocalSet`'s execution.
881 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
882 /// shutdown immediately when a spawned task panics even if that
883 /// task's `JoinHandle` has not been dropped. All other spawned tasks
884 /// will immediately terminate and further calls to
885 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
886 ///
887 /// # Panics
888 ///
889 /// This method panics if called after the `LocalSet` has started
890 /// running.
891 ///
892 /// # Unstable
893 ///
894 /// This option is currently unstable and its implementation is
895 /// incomplete. The API may change or be removed in the future. See
896 /// tokio-rs/tokio#4516 for more details.
897 ///
898 /// # Examples
899 ///
900 /// The following demonstrates a `LocalSet` configured to shutdown on
901 /// panic. The first spawned task panics and results in the `LocalSet`
902 /// shutting down. The second spawned task never has a chance to
903 /// execute. The call to `run_until` will panic due to the runtime being
904 /// forcibly shutdown.
905 ///
906 /// ```should_panic
907 /// use tokio::runtime::UnhandledPanic;
908 ///
909 /// # #[tokio::main(flavor = "current_thread")]
910 /// # async fn main() {
911 /// tokio::task::LocalSet::new()
912 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
913 /// .run_until(async {
914 /// tokio::task::spawn_local(async { panic!("boom"); });
915 /// tokio::task::spawn_local(async {
916 /// // This task never completes
917 /// });
918 ///
919 /// // Do some work, but `run_until` will panic before it completes
920 /// # loop { tokio::task::yield_now().await; }
921 /// })
922 /// .await;
923 /// # }
924 /// ```
925 ///
926 /// [`JoinHandle`]: struct@crate::task::JoinHandle
927 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
928 // TODO: This should be set as a builder
929 Rc::get_mut(&mut self.context)
930 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
931 .expect("Unhandled Panic behavior modified after starting LocalSet")
932 .unhandled_panic = behavior;
933 self
934 }
935 }
936}
937
938impl fmt::Debug for LocalSet {
939 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
940 fmt.debug_struct("LocalSet").finish()
941 }
942}
943
944impl Future for LocalSet {
945 type Output = ();
946
947 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
948 let _no_blocking = crate::runtime::context::disallow_block_in_place();
949
950 // Register the waker before starting to work
951 self.context.shared.waker.register_by_ref(cx.waker());
952
953 if self.with(|| self.tick()) {
954 // If `tick` returns true, we need to notify the local future again:
955 // there are still tasks remaining in the run queue.
956 cx.waker().wake_by_ref();
957 Poll::Pending
958
959 // Safety: called from the thread that owns `LocalSet`. Because
960 // `LocalSet` is `!Send`, this is safe.
961 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
962 // If the scheduler has no remaining futures, we're done!
963 Poll::Ready(())
964 } else {
965 // There are still futures in the local set, but we've polled all the
966 // futures in the run queue. Therefore, we can just return Pending
967 // since the remaining futures will be woken from somewhere else.
968 Poll::Pending
969 }
970 }
971}
972
973impl Default for LocalSet {
974 fn default() -> LocalSet {
975 LocalSet::new()
976 }
977}
978
979impl Drop for LocalSet {
980 fn drop(&mut self) {
981 self.with_if_possible(|| {
982 let _no_blocking = crate::runtime::context::disallow_block_in_place();
983
984 // Shut down all tasks in the LocalOwnedTasks and close it to
985 // prevent new tasks from ever being added.
986 unsafe {
987 // Safety: called from the thread that owns `LocalSet`
988 self.context.shared.local_state.close_and_shutdown_all();
989 }
990
991 // We already called shutdown on all tasks above, so there is no
992 // need to call shutdown.
993
994 // Safety: note that this *intentionally* bypasses the unsafe
995 // `Shared::local_queue()` method. This is in order to avoid the
996 // debug assertion that we are on the thread that owns the
997 // `LocalSet`, because on some systems (e.g. at least some macOS
998 // versions), attempting to get the current thread ID can panic due
999 // to the thread's local data that stores the thread ID being
1000 // dropped *before* the `LocalSet`.
1001 //
1002 // Despite avoiding the assertion here, it is safe for us to access
1003 // the local queue in `Drop`, because the `LocalSet` itself is
1004 // `!Send`, so we can reasonably guarantee that it will not be
1005 // `Drop`ped from another thread.
1006 let local_queue = unsafe {
1007 // Safety: called from the thread that owns `LocalSet`
1008 self.context.shared.local_state.take_local_queue()
1009 };
1010 for task in local_queue {
1011 drop(task);
1012 }
1013
1014 // Take the queue from the Shared object to prevent pushing
1015 // notifications to it in the future.
1016 let queue = self.context.shared.queue.lock().take().unwrap();
1017 for task in queue {
1018 drop(task);
1019 }
1020
1021 // Safety: called from the thread that owns `LocalSet`
1022 assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
1023 });
1024 }
1025}
1026
1027// === impl Context ===
1028
1029impl Context {
1030 #[track_caller]
1031 fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
1032 where
1033 F: Future + 'static,
1034 F::Output: 'static,
1035 {
1036 let id = crate::runtime::task::Id::next();
1037 let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1038
1039 // Safety: called from the thread that owns the `LocalSet`
1040 let (handle, notified) = {
1041 self.shared.local_state.assert_called_from_owner_thread();
1042 self.shared.local_state.owned.bind(
1043 future,
1044 self.shared.clone(),
1045 id,
1046 SpawnLocation::capture(),
1047 )
1048 };
1049
1050 if let Some(notified) = notified {
1051 self.shared.schedule(notified);
1052 }
1053
1054 handle
1055 }
1056}
1057
1058// === impl LocalFuture ===
1059
1060impl<T: Future> Future for RunUntil<'_, T> {
1061 type Output = T::Output;
1062
1063 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1064 let me = self.project();
1065
1066 me.local_set.with(|| {
1067 me.local_set
1068 .context
1069 .shared
1070 .waker
1071 .register_by_ref(cx.waker());
1072
1073 let _no_blocking = crate::runtime::context::disallow_block_in_place();
1074 let f = me.future;
1075
1076 if let Poll::Ready(output) = f.poll(cx) {
1077 return Poll::Ready(output);
1078 }
1079
1080 if me.local_set.tick() {
1081 // If `tick` returns `true`, we need to notify the local future again:
1082 // there are still tasks remaining in the run queue.
1083 cx.waker().wake_by_ref();
1084 }
1085
1086 Poll::Pending
1087 })
1088 }
1089}
1090
1091impl Shared {
1092 /// Schedule the provided task on the scheduler.
1093 fn schedule(&self, task: task::Notified<Arc<Self>>) {
1094 CURRENT.with(|localdata| {
1095 match localdata.ctx.get() {
1096 // If the current `LocalSet` is being polled, we don't need to wake it.
1097 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1098 // In this case it is not being polled, so we need to wake it.
1099 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1100 // Safety: if the current `LocalSet` context points to this
1101 // `LocalSet`, then we are on the thread that owns it.
1102 cx.shared.local_state.task_push_back(task);
1103 },
1104
1105 // We are on the thread that owns the `LocalSet`, so we can
1106 // wake to the local queue.
1107 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1108 unsafe {
1109 // Safety: we just checked that the thread ID matches
1110 // the localset's owner, so this is safe.
1111 self.local_state.task_push_back(task);
1112 }
1113 // We still have to wake the `LocalSet`, because it isn't
1114 // currently being polled.
1115 self.waker.wake();
1116 }
1117
1118 // We are *not* on the thread that owns the `LocalSet`, so we
1119 // have to wake to the remote queue.
1120 _ => {
1121 // First, check whether the queue is still there (if not, the
1122 // LocalSet is dropped). Then push to it if so, and if not,
1123 // do nothing.
1124 let mut lock = self.queue.lock();
1125
1126 if let Some(queue) = lock.as_mut() {
1127 queue.push_back(task);
1128 drop(lock);
1129 self.waker.wake();
1130 }
1131 }
1132 }
1133 });
1134 }
1135
1136 fn ptr_eq(&self, other: &Shared) -> bool {
1137 std::ptr::eq(self, other)
1138 }
1139}
1140
1141// This is safe because (and only because) we *pinky pwomise* to never touch the
1142// local run queue except from the thread that owns the `LocalSet`.
1143unsafe impl Sync for Shared {}
1144
1145impl task::Schedule for Arc<Shared> {
1146 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1147 // Safety, this is always called from the thread that owns `LocalSet`
1148 unsafe { self.local_state.task_remove(task) }
1149 }
1150
1151 fn schedule(&self, task: task::Notified<Self>) {
1152 Shared::schedule(self, task);
1153 }
1154
1155 // localset does not currently support task hooks
1156 fn hooks(&self) -> TaskHarnessScheduleHooks {
1157 TaskHarnessScheduleHooks {
1158 task_terminate_callback: None,
1159 }
1160 }
1161
1162 cfg_unstable! {
1163 fn unhandled_panic(&self) {
1164 use crate::runtime::UnhandledPanic;
1165
1166 match self.unhandled_panic {
1167 UnhandledPanic::Ignore => {
1168 // Do nothing
1169 }
1170 UnhandledPanic::ShutdownRuntime => {
1171 // This hook is only called from within the runtime, so
1172 // `CURRENT` should match with `&self`, i.e. there is no
1173 // opportunity for a nested scheduler to be called.
1174 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1175 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1176 cx.unhandled_panic.set(true);
1177 // Safety: this is always called from the thread that owns `LocalSet`
1178 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1179 }
1180 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1181 })
1182 }
1183 }
1184 }
1185 }
1186}
1187
1188impl LocalState {
1189 /// # Safety
1190 ///
1191 /// This method must only be called from the thread who
1192 /// has the same [`ThreadId`] as [`Self::owner`].
1193 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1194 // The caller ensures it is called from the same thread that owns
1195 // the LocalSet.
1196 self.assert_called_from_owner_thread();
1197
1198 self.local_queue
1199 .with_mut(|ptr| unsafe { (*ptr).pop_front() })
1200 }
1201
1202 /// # Safety
1203 ///
1204 /// This method must only be called from the thread who
1205 /// has the same [`ThreadId`] as [`Self::owner`].
1206 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1207 // The caller ensures it is called from the same thread that owns
1208 // the LocalSet.
1209 self.assert_called_from_owner_thread();
1210
1211 self.local_queue
1212 .with_mut(|ptr| unsafe { (*ptr).push_back(task) });
1213 }
1214
1215 /// # Safety
1216 ///
1217 /// This method must only be called from the thread who
1218 /// has the same [`ThreadId`] as [`Self::owner`].
1219 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1220 // The caller ensures it is called from the same thread that owns
1221 // the LocalSet.
1222 self.assert_called_from_owner_thread();
1223
1224 self.local_queue
1225 .with_mut(|ptr| std::mem::take(unsafe { &mut (*ptr) }))
1226 }
1227
1228 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1229 // The caller ensures it is called from the same thread that owns
1230 // the LocalSet.
1231 self.assert_called_from_owner_thread();
1232
1233 self.owned.remove(task)
1234 }
1235
1236 /// Returns true if the `LocalSet` does not have any spawned tasks
1237 unsafe fn owned_is_empty(&self) -> bool {
1238 // The caller ensures it is called from the same thread that owns
1239 // the LocalSet.
1240 self.assert_called_from_owner_thread();
1241
1242 self.owned.is_empty()
1243 }
1244
1245 unsafe fn assert_owner(
1246 &self,
1247 task: task::Notified<Arc<Shared>>,
1248 ) -> task::LocalNotified<Arc<Shared>> {
1249 // The caller ensures it is called from the same thread that owns
1250 // the LocalSet.
1251 self.assert_called_from_owner_thread();
1252
1253 self.owned.assert_owner(task)
1254 }
1255
1256 unsafe fn close_and_shutdown_all(&self) {
1257 // The caller ensures it is called from the same thread that owns
1258 // the LocalSet.
1259 self.assert_called_from_owner_thread();
1260
1261 self.owned.close_and_shutdown_all();
1262 }
1263
1264 #[track_caller]
1265 fn assert_called_from_owner_thread(&self) {
1266 // FreeBSD has some weirdness around thread-local destruction.
1267 // TODO: remove this hack when thread id is cleaned up
1268 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1269 debug_assert!(
1270 // if we couldn't get the thread ID because we're dropping the local
1271 // data, skip the assertion --- the `Drop` impl is not going to be
1272 // called from another thread, because `LocalSet` is `!Send`
1273 context::thread_id()
1274 .map(|id| id == self.owner)
1275 .unwrap_or(true),
1276 "`LocalSet`'s local run queue must not be accessed by another thread!"
1277 );
1278 }
1279}
1280
1281// This is `Send` because it is stored in `Shared`. It is up to the caller to
1282// ensure they are on the same thread that owns the `LocalSet`.
1283unsafe impl Send for LocalState {}
1284
1285#[cfg(all(test, not(loom)))]
1286mod tests {
1287 use super::*;
1288
1289 // Does a `LocalSet` running on a current-thread runtime...basically work?
1290 //
1291 // This duplicates a test in `tests/task_local_set.rs`, but because this is
1292 // a lib test, it will run under Miri, so this is necessary to catch stacked
1293 // borrows violations in the `LocalSet` implementation.
1294 #[test]
1295 fn local_current_thread_scheduler() {
1296 let f = async {
1297 LocalSet::new()
1298 .run_until(async {
1299 spawn_local(async {}).await.unwrap();
1300 })
1301 .await;
1302 };
1303 crate::runtime::Builder::new_current_thread()
1304 .build()
1305 .expect("rt")
1306 .block_on(f)
1307 }
1308
1309 // Tests that when a task on a `LocalSet` is woken by an io driver on the
1310 // same thread, the task is woken to the localset's local queue rather than
1311 // its remote queue.
1312 //
1313 // This test has to be defined in the `local.rs` file as a lib test, rather
1314 // than in `tests/`, because it makes assertions about the local set's
1315 // internal state.
1316 #[test]
1317 fn wakes_to_local_queue() {
1318 use super::*;
1319 use crate::sync::Notify;
1320 let rt = crate::runtime::Builder::new_current_thread()
1321 .build()
1322 .expect("rt");
1323 rt.block_on(async {
1324 let local = LocalSet::new();
1325 let notify = Arc::new(Notify::new());
1326 let task = local.spawn_local({
1327 let notify = notify.clone();
1328 async move {
1329 notify.notified().await;
1330 }
1331 });
1332 let mut run_until = Box::pin(local.run_until(async move {
1333 task.await.unwrap();
1334 }));
1335
1336 // poll the run until future once
1337 std::future::poll_fn(|cx| {
1338 let _ = run_until.as_mut().poll(cx);
1339 Poll::Ready(())
1340 })
1341 .await;
1342
1343 notify.notify_one();
1344 let task = unsafe { local.context.shared.local_state.task_pop_front() };
1345 // TODO(eliza): it would be nice to be able to assert that this is
1346 // the local task.
1347 assert!(
1348 task.is_some(),
1349 "task should have been notified to the LocalSet's local queue"
1350 );
1351 })
1352 }
1353}