tokio/sync/mod.rs
1#![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))]
2
3//! Synchronization primitives for use in asynchronous contexts.
4//!
5//! Tokio programs tend to be organized as a set of [tasks] where each task
6//! operates independently and may be executed on separate physical threads. The
7//! synchronization primitives provided in this module permit these independent
8//! tasks to communicate together.
9//!
10//! [tasks]: crate::task
11//!
12//! # Message passing
13//!
14//! The most common form of synchronization in a Tokio program is message
15//! passing. Two tasks operate independently and send messages to each other to
16//! synchronize. Doing so has the advantage of avoiding shared state.
17//!
18//! Message passing is implemented using channels. A channel supports sending a
19//! message from one producer task to one or more consumer tasks. There are a
20//! few flavors of channels provided by Tokio. Each channel flavor supports
21//! different message passing patterns. When a channel supports multiple
22//! producers, many separate tasks may **send** messages. When a channel
23//! supports multiple consumers, many different separate tasks may **receive**
24//! messages.
25//!
26//! Tokio provides many different channel flavors as different message passing
27//! patterns are best handled with different implementations.
28//!
29//! ## `oneshot` channel
30//!
31//! The [`oneshot` channel][oneshot] supports sending a **single** value from a
32//! single producer to a single consumer. This channel is usually used to send
33//! the result of a computation to a waiter.
34//!
35//! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a
36//! computation.
37//!
38//! ```
39//! use tokio::sync::oneshot;
40//!
41//! async fn some_computation() -> String {
42//!     "represents the result of the computation".to_string()
43//! }
44//!
45//! #[tokio::main]
46//! async fn main() {
47//!     let (tx, rx) = oneshot::channel();
48//!
49//!     tokio::spawn(async move {
50//!         let res = some_computation().await;
51//!         tx.send(res).unwrap();
52//!     });
53//!
54//!     // Do other work while the computation is happening in the background
55//!
56//!     // Wait for the computation result
57//!     let res = rx.await.unwrap();
58//! }
59//! ```
60//!
61//! Note, if the task produces a computation result as its final
62//! action before terminating, the [`JoinHandle`] can be used to
63//! receive that value instead of allocating resources for the
64//! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If
65//! the task panics, the `Joinhandle` yields `Err` with the panic
66//! cause.
67//!
68//! **Example:**
69//!
70//! ```
71//! async fn some_computation() -> String {
72//!     "the result of the computation".to_string()
73//! }
74//!
75//! #[tokio::main]
76//! async fn main() {
77//!     let join_handle = tokio::spawn(async move {
78//!         some_computation().await
79//!     });
80//!
81//!     // Do other work while the computation is happening in the background
82//!
83//!     // Wait for the computation result
84//!     let res = join_handle.await.unwrap();
85//! }
86//! ```
87//!
88//! [`JoinHandle`]: crate::task::JoinHandle
89//!
90//! ## `mpsc` channel
91//!
92//! The [`mpsc` channel][mpsc] supports sending **many** values from **many**
93//! producers to a single consumer. This channel is often used to send work to a
94//! task or to receive the result of many computations.
95//!
96//! This is also the channel you should use if you want to send many messages
97//! from a single producer to a single consumer. There is no dedicated spsc
98//! channel.
99//!
100//! **Example:** using an mpsc to incrementally stream the results of a series
101//! of computations.
102//!
103//! ```
104//! use tokio::sync::mpsc;
105//!
106//! async fn some_computation(input: u32) -> String {
107//!     format!("the result of computation {}", input)
108//! }
109//!
110//! #[tokio::main]
111//! async fn main() {
112//!     let (tx, mut rx) = mpsc::channel(100);
113//!
114//!     tokio::spawn(async move {
115//!         for i in 0..10 {
116//!             let res = some_computation(i).await;
117//!             tx.send(res).await.unwrap();
118//!         }
119//!     });
120//!
121//!     while let Some(res) = rx.recv().await {
122//!         println!("got = {}", res);
123//!     }
124//! }
125//! ```
126//!
127//! The argument to `mpsc::channel` is the channel capacity. This is the maximum
128//! number of values that can be stored in the channel pending receipt at any
129//! given time. Properly setting this value is key in implementing robust
130//! programs as the channel capacity plays a critical part in handling back
131//! pressure.
132//!
133//! A common concurrency pattern for resource management is to spawn a task
134//! dedicated to managing that resource and using message passing between other
135//! tasks to interact with the resource. The resource may be anything that may
136//! not be concurrently used. Some examples include a socket and program state.
137//! For example, if multiple tasks need to send data over a single socket, spawn
138//! a task to manage the socket and use a channel to synchronize.
139//!
140//! **Example:** sending data from many tasks over a single socket using message
141//! passing.
142//!
143//! ```no_run
144//! use tokio::io::{self, AsyncWriteExt};
145//! use tokio::net::TcpStream;
146//! use tokio::sync::mpsc;
147//!
148//! #[tokio::main]
149//! async fn main() -> io::Result<()> {
150//!     let mut socket = TcpStream::connect("www.example.com:1234").await?;
151//!     let (tx, mut rx) = mpsc::channel(100);
152//!
153//!     for _ in 0..10 {
154//!         // Each task needs its own `tx` handle. This is done by cloning the
155//!         // original handle.
156//!         let tx = tx.clone();
157//!
158//!         tokio::spawn(async move {
159//!             tx.send(&b"data to write"[..]).await.unwrap();
160//!         });
161//!     }
162//!
163//!     // The `rx` half of the channel returns `None` once **all** `tx` clones
164//!     // drop. To ensure `None` is returned, drop the handle owned by the
165//!     // current task. If this `tx` handle is not dropped, there will always
166//!     // be a single outstanding `tx` handle.
167//!     drop(tx);
168//!
169//!     while let Some(res) = rx.recv().await {
170//!         socket.write_all(res).await?;
171//!     }
172//!
173//!     Ok(())
174//! }
175//! ```
176//!
177//! The [`mpsc`] and [`oneshot`] channels can be combined to provide a request /
178//! response type synchronization pattern with a shared resource. A task is
179//! spawned to synchronize a resource and waits on commands received on a
180//! [`mpsc`] channel. Each command includes a [`oneshot`] `Sender` on which the
181//! result of the command is sent.
182//!
183//! **Example:** use a task to synchronize a `u64` counter. Each task sends an
184//! "fetch and increment" command. The counter value **before** the increment is
185//! sent over the provided `oneshot` channel.
186//!
187//! ```
188//! use tokio::sync::{oneshot, mpsc};
189//! use Command::Increment;
190//!
191//! enum Command {
192//!     Increment,
193//!     // Other commands can be added here
194//! }
195//!
196//! #[tokio::main]
197//! async fn main() {
198//!     let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
199//!
200//!     // Spawn a task to manage the counter
201//!     tokio::spawn(async move {
202//!         let mut counter: u64 = 0;
203//!
204//!         while let Some((cmd, response)) = cmd_rx.recv().await {
205//!             match cmd {
206//!                 Increment => {
207//!                     let prev = counter;
208//!                     counter += 1;
209//!                     response.send(prev).unwrap();
210//!                 }
211//!             }
212//!         }
213//!     });
214//!
215//!     let mut join_handles = vec![];
216//!
217//!     // Spawn tasks that will send the increment command.
218//!     for _ in 0..10 {
219//!         let cmd_tx = cmd_tx.clone();
220//!
221//!         join_handles.push(tokio::spawn(async move {
222//!             let (resp_tx, resp_rx) = oneshot::channel();
223//!
224//!             cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
225//!             let res = resp_rx.await.unwrap();
226//!
227//!             println!("previous value = {}", res);
228//!         }));
229//!     }
230//!
231//!     // Wait for all tasks to complete
232//!     for join_handle in join_handles.drain(..) {
233//!         join_handle.await.unwrap();
234//!     }
235//! }
236//! ```
237//!
238//! ## `broadcast` channel
239//!
240//! The [`broadcast` channel] supports sending **many** values from
241//! **many** producers to **many** consumers. Each consumer will receive
242//! **each** value. This channel can be used to implement "fan out" style
243//! patterns common with pub / sub or "chat" systems.
244//!
245//! This channel tends to be used less often than `oneshot` and `mpsc` but still
246//! has its use cases.
247//!
248//! This is also the channel you should use if you want to broadcast values from
249//! a single producer to many consumers. There is no dedicated spmc broadcast
250//! channel.
251//!
252//! Basic usage
253//!
254//! ```
255//! use tokio::sync::broadcast;
256//!
257//! #[tokio::main]
258//! async fn main() {
259//!     let (tx, mut rx1) = broadcast::channel(16);
260//!     let mut rx2 = tx.subscribe();
261//!
262//!     tokio::spawn(async move {
263//!         assert_eq!(rx1.recv().await.unwrap(), 10);
264//!         assert_eq!(rx1.recv().await.unwrap(), 20);
265//!     });
266//!
267//!     tokio::spawn(async move {
268//!         assert_eq!(rx2.recv().await.unwrap(), 10);
269//!         assert_eq!(rx2.recv().await.unwrap(), 20);
270//!     });
271//!
272//!     tx.send(10).unwrap();
273//!     tx.send(20).unwrap();
274//! }
275//! ```
276//!
277//! [`broadcast` channel]: crate::sync::broadcast
278//!
279//! ## `watch` channel
280//!
281//! The [`watch` channel] supports sending **many** values from a **many**
282//! producer to **many** consumers. However, only the **most recent** value is
283//! stored in the channel. Consumers are notified when a new value is sent, but
284//! there is no guarantee that consumers will see **all** values.
285//!
286//! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1.
287//!
288//! Use cases for the [`watch` channel] include broadcasting configuration
289//! changes or signalling program state changes, such as transitioning to
290//! shutdown.
291//!
292//! **Example:** use a [`watch` channel] to notify tasks of configuration
293//! changes. In this example, a configuration file is checked periodically. When
294//! the file changes, the configuration changes are signalled to consumers.
295//!
296//! ```
297//! use tokio::sync::watch;
298//! use tokio::time::{self, Duration, Instant};
299//!
300//! use std::io;
301//!
302//! #[derive(Debug, Clone, Eq, PartialEq)]
303//! struct Config {
304//!     timeout: Duration,
305//! }
306//!
307//! impl Config {
308//!     async fn load_from_file() -> io::Result<Config> {
309//!         // file loading and deserialization logic here
310//! # Ok(Config { timeout: Duration::from_secs(1) })
311//!     }
312//! }
313//!
314//! async fn my_async_operation() {
315//!     // Do something here
316//! }
317//!
318//! #[tokio::main]
319//! async fn main() {
320//!     // Load initial configuration value
321//!     let mut config = Config::load_from_file().await.unwrap();
322//!
323//!     // Create the watch channel, initialized with the loaded configuration
324//!     let (tx, rx) = watch::channel(config.clone());
325//!
326//!     // Spawn a task to monitor the file.
327//!     tokio::spawn(async move {
328//!         loop {
329//!             // Wait 10 seconds between checks
330//!             time::sleep(Duration::from_secs(10)).await;
331//!
332//!             // Load the configuration file
333//!             let new_config = Config::load_from_file().await.unwrap();
334//!
335//!             // If the configuration changed, send the new config value
336//!             // on the watch channel.
337//!             if new_config != config {
338//!                 tx.send(new_config.clone()).unwrap();
339//!                 config = new_config;
340//!             }
341//!         }
342//!     });
343//!
344//!     let mut handles = vec![];
345//!
346//!     // Spawn tasks that runs the async operation for at most `timeout`. If
347//!     // the timeout elapses, restart the operation.
348//!     //
349//!     // The task simultaneously watches the `Config` for changes. When the
350//!     // timeout duration changes, the timeout is updated without restarting
351//!     // the in-flight operation.
352//!     for _ in 0..5 {
353//!         // Clone a config watch handle for use in this task
354//!         let mut rx = rx.clone();
355//!
356//!         let handle = tokio::spawn(async move {
357//!             // Start the initial operation and pin the future to the stack.
358//!             // Pinning to the stack is required to resume the operation
359//!             // across multiple calls to `select!`
360//!             let op = my_async_operation();
361//!             tokio::pin!(op);
362//!
363//!             // Get the initial config value
364//!             let mut conf = rx.borrow().clone();
365//!
366//!             let mut op_start = Instant::now();
367//!             let sleep = time::sleep_until(op_start + conf.timeout);
368//!             tokio::pin!(sleep);
369//!
370//!             loop {
371//!                 tokio::select! {
372//!                     _ = &mut sleep => {
373//!                         // The operation elapsed. Restart it
374//!                         op.set(my_async_operation());
375//!
376//!                         // Track the new start time
377//!                         op_start = Instant::now();
378//!
379//!                         // Restart the timeout
380//!                         sleep.set(time::sleep_until(op_start + conf.timeout));
381//!                     }
382//!                     _ = rx.changed() => {
383//!                         conf = rx.borrow_and_update().clone();
384//!
385//!                         // The configuration has been updated. Update the
386//!                         // `sleep` using the new `timeout` value.
387//!                         sleep.as_mut().reset(op_start + conf.timeout);
388//!                     }
389//!                     _ = &mut op => {
390//!                         // The operation completed!
391//!                         return
392//!                     }
393//!                 }
394//!             }
395//!         });
396//!
397//!         handles.push(handle);
398//!     }
399//!
400//!     for handle in handles.drain(..) {
401//!         handle.await.unwrap();
402//!     }
403//! }
404//! ```
405//!
406//! [`watch` channel]: mod@crate::sync::watch
407//! [`broadcast` channel]: mod@crate::sync::broadcast
408//!
409//! # State synchronization
410//!
411//! The remaining synchronization primitives focus on synchronizing state.
412//! These are asynchronous equivalents to versions provided by `std`. They
413//! operate in a similar way as their `std` counterparts but will wait
414//! asynchronously instead of blocking the thread.
415//!
416//! * [`Barrier`] Ensures multiple tasks will wait for each other to reach a
417//!   point in the program, before continuing execution all together.
418//!
419//! * [`Mutex`] Mutual Exclusion mechanism, which ensures that at most one
420//!   thread at a time is able to access some data.
421//!
422//! * [`Notify`] Basic task notification. `Notify` supports notifying a
423//!   receiving task without sending data. In this case, the task wakes up and
424//!   resumes processing.
425//!
426//! * [`RwLock`] Provides a mutual exclusion mechanism which allows multiple
427//!   readers at the same time, while allowing only one writer at a time. In
428//!   some cases, this can be more efficient than a mutex.
429//!
430//! * [`Semaphore`] Limits the amount of concurrency. A semaphore holds a
431//!   number of permits, which tasks may request in order to enter a critical
432//!   section. Semaphores are useful for implementing limiting or bounding of
433//!   any kind.
434//!
435//! # Runtime compatibility
436//!
437//! All synchronization primitives provided in this module are runtime agnostic.
438//! You can freely move them between different instances of the Tokio runtime
439//! or even use them from non-Tokio runtimes.
440//!
441//! When used in a Tokio runtime, the synchronization primitives participate in
442//! [cooperative scheduling](crate::task::coop#cooperative-scheduling) to avoid
443//! starvation. This feature does not apply when used from non-Tokio runtimes.
444//!
445//! As an exception, methods ending in `_timeout` are not runtime agnostic
446//! because they require access to the Tokio timer. See the documentation of
447//! each `*_timeout` method for more information on its use.
448
449cfg_sync! {
450    /// Named future types.
451    pub mod futures {
452        pub use super::notify::{Notified, OwnedNotified};
453    }
454
455    mod barrier;
456    pub use barrier::{Barrier, BarrierWaitResult};
457
458    pub mod broadcast;
459
460    pub mod mpsc;
461
462    mod mutex;
463    pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard};
464
465    pub(crate) mod notify;
466    pub use notify::Notify;
467
468    pub mod oneshot;
469
470    pub(crate) mod batch_semaphore;
471    pub use batch_semaphore::{AcquireError, TryAcquireError};
472
473    mod semaphore;
474    pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};
475
476    mod rwlock;
477    pub use rwlock::RwLock;
478    pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
479    pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
480    pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
481    pub use rwlock::read_guard::RwLockReadGuard;
482    pub use rwlock::write_guard::RwLockWriteGuard;
483    pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;
484
485    mod task;
486    pub(crate) use task::AtomicWaker;
487
488    mod once_cell;
489    pub use self::once_cell::{OnceCell, SetError};
490
491    mod set_once;
492    pub use self::set_once::{SetOnce, SetOnceError};
493
494    pub mod watch;
495}
496
497cfg_not_sync! {
498    cfg_fs! {
499        pub(crate) mod batch_semaphore;
500        mod mutex;
501        pub(crate) use mutex::Mutex;
502    }
503
504    #[cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))]
505    pub(crate) mod notify;
506
507    #[cfg(any(feature = "rt", all(windows, feature = "process")))]
508    pub(crate) mod oneshot;
509
510    cfg_atomic_waker_impl! {
511        mod task;
512        pub(crate) use task::AtomicWaker;
513    }
514
515    #[cfg(any(feature = "signal", all(unix, feature = "process")))]
516    pub(crate) mod watch;
517}
518
519/// Unit tests
520#[cfg(test)]
521mod tests;