tokio/sync/
once_cell.rs

1use super::{Semaphore, SemaphorePermit, TryAcquireError};
2use crate::loom::cell::UnsafeCell;
3use std::error::Error;
4use std::fmt;
5use std::future::Future;
6use std::mem::MaybeUninit;
7use std::ops::Drop;
8use std::ptr;
9use std::sync::atomic::{AtomicBool, Ordering};
10
// This file contains an implementation of a `OnceCell`. The principle
// behind the safety of the cell is that any thread with an `&OnceCell` may
// access the `value` field according to the following rules:
//
//  1. When `value_set` is false, the `value` field may be modified by the
//     thread holding the permit on the semaphore.
//  2. When `value_set` is true, the `value` field may be accessed immutably by
//     any thread.
//
// It is an invariant that if the semaphore is closed, then `value_set` is true.
// The reverse does not necessarily hold — but if not, the semaphore may not
// have any available permits.
//
// A thread with a `&mut OnceCell` may modify the value in any way it wants as
// long as the invariants are upheld.
26
27/// A thread-safe cell that can be written to only once.
28///
29/// A `OnceCell` is typically used for global variables that need to be
30/// initialized once on first use, but need no further changes. The `OnceCell`
31/// in Tokio allows the initialization procedure to be asynchronous.
32///
33/// # Examples
34///
35/// ```
36/// use tokio::sync::OnceCell;
37///
38/// async fn some_computation() -> u32 {
39///     1 + 1
40/// }
41///
42/// static ONCE: OnceCell<u32> = OnceCell::const_new();
43///
44/// # #[tokio::main(flavor = "current_thread")]
45/// # async fn main() {
46/// let result = ONCE.get_or_init(some_computation).await;
47/// assert_eq!(*result, 2);
48/// # }
49/// ```
50///
51/// It is often useful to write a wrapper method for accessing the value.
52///
53/// ```
54/// use tokio::sync::OnceCell;
55///
56/// static ONCE: OnceCell<u32> = OnceCell::const_new();
57///
58/// async fn get_global_integer() -> &'static u32 {
59///     ONCE.get_or_init(|| async {
60///         1 + 1
61///     }).await
62/// }
63///
64/// # #[tokio::main(flavor = "current_thread")]
65/// # async fn main() {
66/// let result = get_global_integer().await;
67/// assert_eq!(*result, 2);
68/// # }
69/// ```
70pub struct OnceCell<T> {
71    value_set: AtomicBool,
72    value: UnsafeCell<MaybeUninit<T>>,
73    semaphore: Semaphore,
74}
75
76impl<T> Default for OnceCell<T> {
77    fn default() -> OnceCell<T> {
78        OnceCell::new()
79    }
80}
81
82impl<T: fmt::Debug> fmt::Debug for OnceCell<T> {
83    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
84        fmt.debug_struct("OnceCell")
85            .field("value", &self.get())
86            .finish()
87    }
88}
89
90impl<T: Clone> Clone for OnceCell<T> {
91    fn clone(&self) -> OnceCell<T> {
92        OnceCell::new_with(self.get().cloned())
93    }
94}
95
96impl<T: PartialEq> PartialEq for OnceCell<T> {
97    fn eq(&self, other: &OnceCell<T>) -> bool {
98        self.get() == other.get()
99    }
100}
101
102impl<T: Eq> Eq for OnceCell<T> {}
103
104impl<T> Drop for OnceCell<T> {
105    fn drop(&mut self) {
106        if self.initialized_mut() {
107            unsafe {
108                self.value
109                    .with_mut(|ptr| ptr::drop_in_place((*ptr).as_mut_ptr()));
110            };
111        }
112    }
113}
114
115impl<T> From<T> for OnceCell<T> {
116    fn from(value: T) -> Self {
117        OnceCell {
118            value_set: AtomicBool::new(true),
119            value: UnsafeCell::new(MaybeUninit::new(value)),
120            semaphore: Semaphore::new_closed(),
121        }
122    }
123}
124
125impl<T> OnceCell<T> {
126    /// Creates a new empty `OnceCell` instance.
127    pub fn new() -> Self {
128        OnceCell {
129            value_set: AtomicBool::new(false),
130            value: UnsafeCell::new(MaybeUninit::uninit()),
131            semaphore: Semaphore::new(1),
132        }
133    }
134
135    /// Creates a new empty `OnceCell` instance.
136    ///
137    /// Equivalent to `OnceCell::new`, except that it can be used in static
138    /// variables.
139    ///
140    /// When using the `tracing` [unstable feature], a `OnceCell` created with
141    /// `const_new` will not be instrumented. As such, it will not be visible
142    /// in [`tokio-console`]. Instead, [`OnceCell::new`] should be used to
143    /// create an instrumented object if that is needed.
144    ///
145    /// # Example
146    ///
147    /// ```
148    /// use tokio::sync::OnceCell;
149    ///
150    /// static ONCE: OnceCell<u32> = OnceCell::const_new();
151    ///
152    /// async fn get_global_integer() -> &'static u32 {
153    ///     ONCE.get_or_init(|| async {
154    ///         1 + 1
155    ///     }).await
156    /// }
157    ///
158    /// # #[tokio::main(flavor = "current_thread")]
159    /// # async fn main() {
160    /// let result = get_global_integer().await;
161    /// assert_eq!(*result, 2);
162    /// # }
163    /// ```
164    ///
165    /// [`tokio-console`]: https://github.com/tokio-rs/console
166    /// [unstable feature]: crate#unstable-features
167    #[cfg(not(all(loom, test)))]
168    pub const fn const_new() -> Self {
169        OnceCell {
170            value_set: AtomicBool::new(false),
171            value: UnsafeCell::new(MaybeUninit::uninit()),
172            semaphore: Semaphore::const_new(1),
173        }
174    }
175
176    /// Creates a new `OnceCell` that contains the provided value, if any.
177    ///
178    /// If the `Option` is `None`, this is equivalent to `OnceCell::new`.
179    ///
180    /// [`OnceCell::new`]: crate::sync::OnceCell::new
181    // Once https://github.com/rust-lang/rust/issues/73255 lands
182    // and tokio MSRV is bumped to the rustc version with it stabilised,
183    // we can make this function available in const context,
184    // by creating `Semaphore::const_new_closed`.
185    pub fn new_with(value: Option<T>) -> Self {
186        if let Some(v) = value {
187            OnceCell::from(v)
188        } else {
189            OnceCell::new()
190        }
191    }
192
193    /// Creates a new `OnceCell` that contains the provided value.
194    ///
195    /// # Example
196    ///
197    /// When using the `tracing` [unstable feature], a `OnceCell` created with
198    /// `const_new_with` will not be instrumented. As such, it will not be
199    /// visible in [`tokio-console`]. Instead, [`OnceCell::new_with`] should be
200    /// used to create an instrumented object if that is needed.
201    ///
202    /// ```
203    /// use tokio::sync::OnceCell;
204    ///
205    /// static ONCE: OnceCell<u32> = OnceCell::const_new_with(1);
206    ///
207    /// async fn get_global_integer() -> &'static u32 {
208    ///     ONCE.get_or_init(|| async {
209    ///         1 + 1
210    ///     }).await
211    /// }
212    ///
213    /// # #[tokio::main(flavor = "current_thread")]
214    /// # async fn main() {
215    /// let result = get_global_integer().await;
216    /// assert_eq!(*result, 1);
217    /// # }
218    /// ```
219    ///
220    /// [`tokio-console`]: https://github.com/tokio-rs/console
221    /// [unstable feature]: crate#unstable-features
222    #[cfg(not(all(loom, test)))]
223    pub const fn const_new_with(value: T) -> Self {
224        OnceCell {
225            value_set: AtomicBool::new(true),
226            value: UnsafeCell::new(MaybeUninit::new(value)),
227            semaphore: Semaphore::const_new_closed(),
228        }
229    }
230
231    /// Returns `true` if the `OnceCell` currently contains a value, and `false`
232    /// otherwise.
233    pub fn initialized(&self) -> bool {
234        // Using acquire ordering so any threads that read a true from this
235        // atomic is able to read the value.
236        self.value_set.load(Ordering::Acquire)
237    }
238
239    /// Returns `true` if the `OnceCell` currently contains a value, and `false`
240    /// otherwise.
241    fn initialized_mut(&mut self) -> bool {
242        *self.value_set.get_mut()
243    }
244
245    // SAFETY: The OnceCell must not be empty.
246    unsafe fn get_unchecked(&self) -> &T {
247        unsafe { &*self.value.with(|ptr| (*ptr).as_ptr()) }
248    }
249
250    // SAFETY: The OnceCell must not be empty.
251    unsafe fn get_unchecked_mut(&mut self) -> &mut T {
252        // SAFETY:
253        //
254        // 1. The caller guarantees that the OnceCell is initialized.
255        // 2. The `&mut self` guarantees that there are no other references to the value.
256        unsafe { &mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr()) }
257    }
258
259    fn set_value(&self, value: T, permit: SemaphorePermit<'_>) -> &T {
260        // SAFETY: We are holding the only permit on the semaphore.
261        unsafe {
262            self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value));
263        }
264
265        // Using release ordering so any threads that read a true from this
266        // atomic is able to read the value we just stored.
267        self.value_set.store(true, Ordering::Release);
268        self.semaphore.close();
269        permit.forget();
270
271        // SAFETY: We just initialized the cell.
272        unsafe { self.get_unchecked() }
273    }
274
275    /// Returns a reference to the value currently stored in the `OnceCell`, or
276    /// `None` if the `OnceCell` is empty.
277    pub fn get(&self) -> Option<&T> {
278        if self.initialized() {
279            Some(unsafe { self.get_unchecked() })
280        } else {
281            None
282        }
283    }
284
285    /// Returns a mutable reference to the value currently stored in the
286    /// `OnceCell`, or `None` if the `OnceCell` is empty.
287    ///
288    /// Since this call borrows the `OnceCell` mutably, it is safe to mutate the
289    /// value inside the `OnceCell` — the mutable borrow statically guarantees
290    /// no other references exist.
291    pub fn get_mut(&mut self) -> Option<&mut T> {
292        if self.initialized_mut() {
293            Some(unsafe { self.get_unchecked_mut() })
294        } else {
295            None
296        }
297    }
298
299    /// Sets the value of the `OnceCell` to the given value if the `OnceCell` is
300    /// empty.
301    ///
302    /// If the `OnceCell` already has a value, this call will fail with an
303    /// [`SetError::AlreadyInitializedError`].
304    ///
305    /// If the `OnceCell` is empty, but some other task is currently trying to
306    /// set the value, this call will fail with [`SetError::InitializingError`].
307    ///
308    /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError
309    /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError
310    pub fn set(&self, value: T) -> Result<(), SetError<T>> {
311        if self.initialized() {
312            return Err(SetError::AlreadyInitializedError(value));
313        }
314
315        // Another task might be initializing the cell, in which case
316        // `try_acquire` will return an error. If we succeed to acquire the
317        // permit, then we can set the value.
318        match self.semaphore.try_acquire() {
319            Ok(permit) => {
320                debug_assert!(!self.initialized());
321                self.set_value(value, permit);
322                Ok(())
323            }
324            Err(TryAcquireError::NoPermits) => {
325                // Some other task is holding the permit. That task is
326                // currently trying to initialize the value.
327                Err(SetError::InitializingError(value))
328            }
329            Err(TryAcquireError::Closed) => {
330                // The semaphore was closed. Some other task has initialized
331                // the value.
332                Err(SetError::AlreadyInitializedError(value))
333            }
334        }
335    }
336
337    /// Gets the value currently in the `OnceCell`, or initialize it with the
338    /// given asynchronous operation.
339    ///
340    /// If some other task is currently working on initializing the `OnceCell`,
341    /// this call will wait for that other task to finish, then return the value
342    /// that the other task produced.
343    ///
344    /// If the provided operation is cancelled or panics, the initialization
345    /// attempt is cancelled. If there are other tasks waiting for the value to
346    /// be initialized, one of them will start another attempt at initializing
347    /// the value.
348    ///
349    /// This will deadlock if `f` tries to initialize the cell recursively.
350    pub async fn get_or_init<F, Fut>(&self, f: F) -> &T
351    where
352        F: FnOnce() -> Fut,
353        Fut: Future<Output = T>,
354    {
355        crate::trace::async_trace_leaf().await;
356
357        if self.initialized() {
358            // SAFETY: The OnceCell has been fully initialized.
359            unsafe { self.get_unchecked() }
360        } else {
361            // Here we try to acquire the semaphore permit. Holding the permit
362            // will allow us to set the value of the OnceCell, and prevents
363            // other tasks from initializing the OnceCell while we are holding
364            // it.
365            match self.semaphore.acquire().await {
366                Ok(permit) => {
367                    debug_assert!(!self.initialized());
368
369                    // If `f()` panics or `select!` is called, this
370                    // `get_or_init` call is aborted and the semaphore permit is
371                    // dropped.
372                    let value = f().await;
373
374                    self.set_value(value, permit)
375                }
376                Err(_) => {
377                    debug_assert!(self.initialized());
378
379                    // SAFETY: The semaphore has been closed. This only happens
380                    // when the OnceCell is fully initialized.
381                    unsafe { self.get_unchecked() }
382                }
383            }
384        }
385    }
386
387    /// Gets the value currently in the `OnceCell`, or initialize it with the
388    /// given asynchronous operation.
389    ///
390    /// If some other task is currently working on initializing the `OnceCell`,
391    /// this call will wait for that other task to finish, then return the value
392    /// that the other task produced.
393    ///
394    /// If the provided operation returns an error, is cancelled or panics, the
395    /// initialization attempt is cancelled. If there are other tasks waiting
396    /// for the value to be initialized, one of them will start another attempt
397    /// at initializing the value.
398    ///
399    /// This will deadlock if `f` tries to initialize the cell recursively.
400    pub async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<&T, E>
401    where
402        F: FnOnce() -> Fut,
403        Fut: Future<Output = Result<T, E>>,
404    {
405        crate::trace::async_trace_leaf().await;
406
407        if self.initialized() {
408            // SAFETY: The OnceCell has been fully initialized.
409            unsafe { Ok(self.get_unchecked()) }
410        } else {
411            // Here we try to acquire the semaphore permit. Holding the permit
412            // will allow us to set the value of the OnceCell, and prevents
413            // other tasks from initializing the OnceCell while we are holding
414            // it.
415            match self.semaphore.acquire().await {
416                Ok(permit) => {
417                    debug_assert!(!self.initialized());
418
419                    // If `f()` panics or `select!` is called, this
420                    // `get_or_try_init` call is aborted and the semaphore
421                    // permit is dropped.
422                    let value = f().await;
423
424                    match value {
425                        Ok(value) => Ok(self.set_value(value, permit)),
426                        Err(e) => Err(e),
427                    }
428                }
429                Err(_) => {
430                    debug_assert!(self.initialized());
431
432                    // SAFETY: The semaphore has been closed. This only happens
433                    // when the OnceCell is fully initialized.
434                    unsafe { Ok(self.get_unchecked()) }
435                }
436            }
437        }
438    }
439
440    /// Takes the value from the cell, destroying the cell in the process.
441    /// Returns `None` if the cell is empty.
442    pub fn into_inner(mut self) -> Option<T> {
443        if self.initialized_mut() {
444            // Set to uninitialized for the destructor of `OnceCell` to work properly
445            *self.value_set.get_mut() = false;
446            Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) })
447        } else {
448            None
449        }
450    }
451
452    /// Takes ownership of the current value, leaving the cell empty.  Returns
453    /// `None` if the cell is empty.
454    pub fn take(&mut self) -> Option<T> {
455        std::mem::take(self).into_inner()
456    }
457}
458
459// Since `get` gives us access to immutable references of the OnceCell, OnceCell
460// can only be Sync if T is Sync, otherwise OnceCell would allow sharing
461// references of !Sync values across threads. We need T to be Send in order for
462// OnceCell to by Sync because we can use `set` on `&OnceCell<T>` to send values
463// (of type T) across threads.
464unsafe impl<T: Sync + Send> Sync for OnceCell<T> {}
465
466// Access to OnceCell's value is guarded by the semaphore permit
467// and atomic operations on `value_set`, so as long as T itself is Send
468// it's safe to send it to another thread
469unsafe impl<T: Send> Send for OnceCell<T> {}
470
/// The ways in which [`OnceCell::set`] can fail.
///
/// Each variant carries the value that could not be stored.
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
#[derive(Debug, PartialEq, Eq)]
pub enum SetError<T> {
    /// [`OnceCell::set`] was called on a cell that already held a value.
    ///
    /// [`OnceCell::set`]: crate::sync::OnceCell::set
    AlreadyInitializedError(T),

    /// The cell was empty, but it was in the process of being initialized.
    InitializingError(T),
}
484
485impl<T> fmt::Display for SetError<T> {
486    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487        match self {
488            SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"),
489            SetError::InitializingError(_) => write!(f, "InitializingError"),
490        }
491    }
492}
493
494impl<T: fmt::Debug> Error for SetError<T> {}
495
496impl<T> SetError<T> {
497    /// Whether `SetError` is `SetError::AlreadyInitializedError`.
498    pub fn is_already_init_err(&self) -> bool {
499        match self {
500            SetError::AlreadyInitializedError(_) => true,
501            SetError::InitializingError(_) => false,
502        }
503    }
504
505    /// Whether `SetError` is `SetError::InitializingError`
506    pub fn is_initializing_err(&self) -> bool {
507        match self {
508            SetError::AlreadyInitializedError(_) => false,
509            SetError::InitializingError(_) => true,
510        }
511    }
512}