tokio/sync/
watch.rs

1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A multi-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value.
14//!
15//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16//!
17//! To access the **current** value stored in the channel and mark it as *seen*
18//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19//!
20//! To access the current value **without** marking it as *seen*, use
21//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23//!
24//! For more information on when to use these methods, see
25//! [here](#borrow_and_update-versus-borrow).
26//!
27//! ## Change notifications
28//!
29//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31//!
32//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33//!   `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
34//! * If the current value is *unseen* when calling [`changed`], then
35//!   [`changed`] will return immediately. If the current value is *seen*, then
36//!   it will sleep until either a new message is sent via the [`Sender`] half,
37//!   or the [`Sender`] is dropped.
38//! * On completion, the [`changed`] method marks the new value as *seen*.
39//! * At creation, the initial value is considered *seen*. In other words,
40//!   [`Receiver::changed()`] will not return until a subsequent value is sent.
41//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
42//!   The current value at the time the [`Receiver`] is created is considered
43//!   *seen*.
44//!
45//! ## `borrow_and_update` versus `borrow`
46//!
47//! If the receiver intends to await notifications from [`changed`] in a loop,
48//! [`Receiver::borrow_and_update()`] should be preferred over
49//! [`Receiver::borrow()`].  This avoids a potential race where a new value is
50//! sent between [`changed`] being ready and the value being read. (If
51//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
52//!
53//! If the receiver is only interested in the current value, and does not intend
54//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
55//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
56//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
57//! self`.
58//!
59//! # Examples
60//!
61//! The following example prints `hello! world! `.
62//!
63//! ```
64//! use tokio::sync::watch;
65//! use tokio::time::{Duration, sleep};
66//!
67//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
68//! let (tx, mut rx) = watch::channel("hello");
69//!
70//! tokio::spawn(async move {
71//!     // Use the equivalent of a "do-while" loop so the initial value is
72//!     // processed before awaiting the `changed()` future.
73//!     loop {
74//!         println!("{}! ", *rx.borrow_and_update());
75//!         if rx.changed().await.is_err() {
76//!             break;
77//!         }
78//!     }
79//! });
80//!
81//! sleep(Duration::from_millis(100)).await;
82//! tx.send("world")?;
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! # Closing
88//!
89//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
90//! when all [`Receiver`] handles have been dropped. This indicates that there
91//! is no further interest in the values being produced and work can be stopped.
92//!
93//! The value in the channel will not be dropped until the sender and all
94//! receivers have been dropped.
95//!
96//! # Thread safety
97//!
98//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
99//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
100//! handles may be moved to separate threads and also used concurrently.
101//!
102//! [`Sender`]: crate::sync::watch::Sender
103//! [`Receiver`]: crate::sync::watch::Receiver
104//! [`changed`]: crate::sync::watch::Receiver::changed
105//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107//! [`Receiver::borrow_and_update()`]:
108//!     crate::sync::watch::Receiver::borrow_and_update
109//! [`channel`]: crate::sync::watch::channel
110//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
111//! [`Sender::closed`]: crate::sync::watch::Sender::closed
112//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
113
114use crate::sync::notify::Notify;
115use crate::task::coop::cooperative;
116
117use crate::loom::sync::atomic::AtomicUsize;
118use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
119use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
120use std::fmt;
121use std::mem;
122use std::ops;
123use std::panic;
124
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// Every receiver stores its own copy of the last version it has observed,
/// so whether a value counts as *seen* is tracked per receiver.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
#[derive(Debug)]
pub struct Receiver<T> {
    /// Pointer to the state shared with the sender half of the channel.
    shared: Arc<Shared<T>>,

    /// Last version observed by this receiver. It is compared against the
    /// version stored in the shared state to decide whether the current
    /// value has changed.
    version: Version,
}
141
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// Cloning a `Sender` yields another handle to the same channel.
#[derive(Debug)]
pub struct Sender<T> {
    /// Pointer to the state shared with the receivers and with any clones of
    /// this sender.
    shared: Arc<Shared<T>>,
}
149
150impl<T> Clone for Sender<T> {
151    fn clone(&self) -> Self {
152        self.shared.ref_count_tx.fetch_add(1, Relaxed);
153
154        Self {
155            shared: self.shared.clone(),
156        }
157    }
158}
159
160impl<T: Default> Default for Sender<T> {
161    fn default() -> Self {
162        Self::new(T::default())
163    }
164}
165
/// A borrowed reference to the value stored in a watch channel.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long-lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short-lived as possible. Additionally, if you are
/// running in an environment that allows `!Send` futures, you must ensure that
/// the returned `Ref` type is never held alive across an `.await` point,
/// otherwise, it can lead to a deadlock.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A)    |  // Task 2 (on thread B)
/// let _ref1 = rx.borrow();   |
///                            |  // will block
///                            |  let _ = tx.send(());
/// // may deadlock            |
/// let _ref2 = rx.borrow();   |
/// ```
/// </details>
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard on the channel's value; the read lock is held for as long
    /// as this guard is alive.
    inner: RwLockReadGuard<'a, T>,

    /// Whether the borrowed value was considered changed when the borrow was
    /// created.
    has_changed: bool,
}
196
197impl<'a, T> Ref<'a, T> {
198    /// Indicates if the borrowed value is considered as _changed_ since the last
199    /// time it has been marked as seen.
200    ///
201    /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
202    ///
203    /// When borrowed from the [`Sender`] this function will always return `false`.
204    ///
205    /// # Examples
206    ///
207    /// ```
208    /// use tokio::sync::watch;
209    ///
210    /// #[tokio::main]
211    /// async fn main() {
212    ///     let (tx, mut rx) = watch::channel("hello");
213    ///
214    ///     tx.send("goodbye").unwrap();
215    ///     // The sender does never consider the value as changed.
216    ///     assert!(!tx.borrow().has_changed());
217    ///
218    ///     // Drop the sender immediately, just for testing purposes.
219    ///     drop(tx);
220    ///
221    ///     // Even if the sender has already been dropped...
222    ///     assert!(rx.has_changed().is_err());
223    ///     // ...the modified value is still readable and detected as changed.
224    ///     assert_eq!(*rx.borrow(), "goodbye");
225    ///     assert!(rx.borrow().has_changed());
226    ///
227    ///     // Read the changed value and mark it as seen.
228    ///     {
229    ///         let received = rx.borrow_and_update();
230    ///         assert_eq!(*received, "goodbye");
231    ///         assert!(received.has_changed());
232    ///         // Release the read lock when leaving this scope.
233    ///     }
234    ///
235    ///     // Now the value has already been marked as seen and could
236    ///     // never be modified again (after the sender has been dropped).
237    ///     assert!(!rx.borrow().has_changed());
238    /// }
239    /// ```
240    pub fn has_changed(&self) -> bool {
241        self.has_changed
242    }
243}
244
/// State shared between the `Sender` and `Receiver` handles of one channel.
struct Shared<T> {
    /// The most recent value.
    value: RwLock<T>,

    /// The current version.
    ///
    /// The lowest bit represents a "closed" state. The rest of the bits
    /// represent the current version.
    state: AtomicState,

    /// Tracks the number of `Receiver` instances.
    ref_count_rx: AtomicUsize,

    /// Tracks the number of `Sender` instances.
    ref_count_tx: AtomicUsize,

    /// Notifies waiting receivers that the value changed.
    notify_rx: big_notify::BigNotify,

    /// Notifies any task listening for `Receiver` dropped events.
    notify_tx: Notify,
}
267
268impl<T: fmt::Debug> fmt::Debug for Shared<T> {
269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270        let state = self.state.load();
271        f.debug_struct("Shared")
272            .field("value", &self.value)
273            .field("version", &state.version())
274            .field("is_closed", &state.is_closed())
275            .field("ref_count_rx", &self.ref_count_rx)
276            .finish()
277    }
278}
279
pub mod error {
    //! Watch error types.

    use std::error::Error;
    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The wrapped value is deliberately left out of the `Debug` output, so
    /// `T` does not have to implement `Debug`.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    impl<T> fmt::Debug for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let mut builder = f.debug_struct("SendError");
            builder.finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T> Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    #[derive(Debug, Clone)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl Error for RecvError {}
}
320
321mod big_notify {
322    use super::Notify;
323    use crate::sync::notify::Notified;
324
325    // To avoid contention on the lock inside the `Notify`, we store multiple
326    // copies of it. Then, we use either circular access or randomness to spread
327    // out threads over different `Notify` objects.
328    //
329    // Some simple benchmarks show that randomness performs slightly better than
330    // circular access (probably due to contention on `next`), so we prefer to
331    // use randomness when Tokio is compiled with a random number generator.
332    //
333    // When the random number generator is not available, we fall back to
334    // circular access.
335
336    pub(super) struct BigNotify {
337        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
338        next: std::sync::atomic::AtomicUsize,
339        inner: [Notify; 8],
340    }
341
342    impl BigNotify {
343        pub(super) fn new() -> Self {
344            Self {
345                #[cfg(not(all(
346                    not(loom),
347                    feature = "sync",
348                    any(feature = "rt", feature = "macros")
349                )))]
350                next: std::sync::atomic::AtomicUsize::new(0),
351                inner: Default::default(),
352            }
353        }
354
355        pub(super) fn notify_waiters(&self) {
356            for notify in &self.inner {
357                notify.notify_waiters();
358            }
359        }
360
361        /// This function implements the case where randomness is not available.
362        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
363        pub(super) fn notified(&self) -> Notified<'_> {
364            let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
365            self.inner[i].notified()
366        }
367
368        /// This function implements the case where randomness is available.
369        #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
370        pub(super) fn notified(&self) -> Notified<'_> {
371            let i = crate::runtime::context::thread_rng_n(8) as usize;
372            self.inner[i].notified()
373        }
374    }
375}
376
377use self::state::{AtomicState, Version};
378mod state {
379    use crate::loom::sync::atomic::AtomicUsize;
380    use crate::loom::sync::atomic::Ordering;
381
382    const CLOSED_BIT: usize = 1;
383
384    // Using 2 as the step size preserves the `CLOSED_BIT`.
385    const STEP_SIZE: usize = 2;
386
387    /// The version part of the state. The lowest bit is always zero.
388    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
389    pub(super) struct Version(usize);
390
391    /// Snapshot of the state. The first bit is used as the CLOSED bit.
392    /// The remaining bits are used as the version.
393    ///
394    /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
395    /// receivers does not set it.
396    #[derive(Copy, Clone, Debug)]
397    pub(super) struct StateSnapshot(usize);
398
399    /// The state stored in an atomic integer.
400    ///
401    /// The `Sender` uses `Release` ordering for storing a new state
402    /// and the `Receiver`s use `Acquire` ordering for loading the
403    /// current state. This ensures that written values are seen by
404    /// the `Receiver`s for a proper handover.
405    #[derive(Debug)]
406    pub(super) struct AtomicState(AtomicUsize);
407
408    impl Version {
409        /// Decrements the version.
410        pub(super) fn decrement(&mut self) {
411            // Using a wrapping decrement here is required to ensure that the
412            // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
413            // which wraps on overflow.
414            self.0 = self.0.wrapping_sub(STEP_SIZE);
415        }
416
417        pub(super) const INITIAL: Self = Version(0);
418    }
419
420    impl StateSnapshot {
421        /// Extract the version from the state.
422        pub(super) fn version(self) -> Version {
423            Version(self.0 & !CLOSED_BIT)
424        }
425
426        /// Is the closed bit set?
427        pub(super) fn is_closed(self) -> bool {
428            (self.0 & CLOSED_BIT) == CLOSED_BIT
429        }
430    }
431
432    impl AtomicState {
433        /// Create a new `AtomicState` that is not closed and which has the
434        /// version set to `Version::INITIAL`.
435        pub(super) fn new() -> Self {
436            AtomicState(AtomicUsize::new(Version::INITIAL.0))
437        }
438
439        /// Load the current value of the state.
440        ///
441        /// Only used by the receiver and for debugging purposes.
442        ///
443        /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
444        /// of the shared value with the sender side (single writer). The state is always
445        /// updated after modifying and before releasing the (exclusive) lock on the
446        /// shared value.
447        pub(super) fn load(&self) -> StateSnapshot {
448            StateSnapshot(self.0.load(Ordering::Acquire))
449        }
450
451        /// Increment the version counter.
452        pub(super) fn increment_version_while_locked(&self) {
453            // Use `Release` ordering to ensure that the shared value
454            // has been written before updating the version. The shared
455            // value is still protected by an exclusive lock during this
456            // method.
457            self.0.fetch_add(STEP_SIZE, Ordering::Release);
458        }
459
460        /// Set the closed bit in the state.
461        pub(super) fn set_closed(&self) {
462            self.0.fetch_or(CLOSED_BIT, Ordering::Release);
463        }
464    }
465}
466
467/// Creates a new watch channel, returning the "send" and "receive" handles.
468///
469/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
470/// Only the last value sent is made available to the [`Receiver`] half. All
471/// intermediate values are dropped.
472///
473/// # Examples
474///
475/// The following example prints `hello! world! `.
476///
477/// ```
478/// use tokio::sync::watch;
479/// use tokio::time::{Duration, sleep};
480///
481/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
482/// let (tx, mut rx) = watch::channel("hello");
483///
484/// tokio::spawn(async move {
485///     // Use the equivalent of a "do-while" loop so the initial value is
486///     // processed before awaiting the `changed()` future.
487///     loop {
488///         println!("{}! ", *rx.borrow_and_update());
489///         if rx.changed().await.is_err() {
490///             break;
491///         }
492///     }
493/// });
494///
495/// sleep(Duration::from_millis(100)).await;
496/// tx.send("world")?;
497/// # Ok(())
498/// # }
499/// ```
500///
501/// [`Sender`]: struct@Sender
502/// [`Receiver`]: struct@Receiver
503pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
504    let shared = Arc::new(Shared {
505        value: RwLock::new(init),
506        state: AtomicState::new(),
507        ref_count_rx: AtomicUsize::new(1),
508        ref_count_tx: AtomicUsize::new(1),
509        notify_rx: big_notify::BigNotify::new(),
510        notify_tx: Notify::new(),
511    });
512
513    let tx = Sender {
514        shared: shared.clone(),
515    };
516
517    let rx = Receiver {
518        shared,
519        version: Version::INITIAL,
520    };
521
522    (tx, rx)
523}
524
525impl<T> Receiver<T> {
526    fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
527        // No synchronization necessary as this is only used as a counter and
528        // not memory access.
529        shared.ref_count_rx.fetch_add(1, Relaxed);
530
531        Self { shared, version }
532    }
533
534    /// Returns a reference to the most recently sent value.
535    ///
536    /// This method does not mark the returned value as seen, so future calls to
537    /// [`changed`] may return immediately even if you have already seen the
538    /// value with a call to `borrow`.
539    ///
540    /// Outstanding borrows hold a read lock on the inner value. This means that
541    /// long-lived borrows could cause the producer half to block. It is recommended
542    /// to keep the borrow as short-lived as possible. Additionally, if you are
543    /// running in an environment that allows `!Send` futures, you must ensure that
544    /// the returned `Ref` type is never held alive across an `.await` point,
545    /// otherwise, it can lead to a deadlock.
546    ///
547    /// The priority policy of the lock is dependent on the underlying lock
548    /// implementation, and this type does not guarantee that any particular policy
549    /// will be used. In particular, a producer which is waiting to acquire the lock
550    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
551    ///
552    /// <details><summary>Potential deadlock example</summary>
553    ///
554    /// ```text
555    /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
556    /// let _ref1 = rx.borrow();   |
557    ///                            |  // will block
558    ///                            |  let _ = tx.send(());
559    /// // may deadlock            |
560    /// let _ref2 = rx.borrow();   |
561    /// ```
562    /// </details>
563    ///
564    /// For more information on when to use this method versus
565    /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
566    ///
567    /// [`changed`]: Receiver::changed
568    /// [`borrow_and_update`]: Receiver::borrow_and_update
569    ///
570    /// # Examples
571    ///
572    /// ```
573    /// use tokio::sync::watch;
574    ///
575    /// let (_, rx) = watch::channel("hello");
576    /// assert_eq!(*rx.borrow(), "hello");
577    /// ```
578    pub fn borrow(&self) -> Ref<'_, T> {
579        let inner = self.shared.value.read();
580
581        // After obtaining a read-lock no concurrent writes could occur
582        // and the loaded version matches that of the borrowed reference.
583        let new_version = self.shared.state.load().version();
584        let has_changed = self.version != new_version;
585
586        Ref { inner, has_changed }
587    }
588
589    /// Returns a reference to the most recently sent value and marks that value
590    /// as seen.
591    ///
592    /// This method marks the current value as seen. Subsequent calls to [`changed`]
593    /// will not return immediately until the [`Sender`] has modified the shared
594    /// value again.
595    ///
596    /// Outstanding borrows hold a read lock on the inner value. This means that
597    /// long-lived borrows could cause the producer half to block. It is recommended
598    /// to keep the borrow as short-lived as possible. Additionally, if you are
599    /// running in an environment that allows `!Send` futures, you must ensure that
600    /// the returned `Ref` type is never held alive across an `.await` point,
601    /// otherwise, it can lead to a deadlock.
602    ///
603    /// The priority policy of the lock is dependent on the underlying lock
604    /// implementation, and this type does not guarantee that any particular policy
605    /// will be used. In particular, a producer which is waiting to acquire the lock
606    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
607    ///
608    /// <details><summary>Potential deadlock example</summary>
609    ///
610    /// ```text
611    /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
612    /// let _ref1 = rx1.borrow_and_update();   |
613    ///                                        |  // will block
614    ///                                        |  let _ = tx.send(());
615    /// // may deadlock                        |
616    /// let _ref2 = rx2.borrow_and_update();   |
617    /// ```
618    /// </details>
619    ///
620    /// For more information on when to use this method versus [`borrow`], see
621    /// [here](self#borrow_and_update-versus-borrow).
622    ///
623    /// [`changed`]: Receiver::changed
624    /// [`borrow`]: Receiver::borrow
625    pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
626        let inner = self.shared.value.read();
627
628        // After obtaining a read-lock no concurrent writes could occur
629        // and the loaded version matches that of the borrowed reference.
630        let new_version = self.shared.state.load().version();
631        let has_changed = self.version != new_version;
632
633        // Mark the shared value as seen by updating the version
634        self.version = new_version;
635
636        Ref { inner, has_changed }
637    }
638
639    /// Checks if this channel contains a message that this receiver has not yet
640    /// seen. The new value is not marked as seen.
641    ///
642    /// Although this method is called `has_changed`, it does not check new
643    /// messages for equality, so this call will return true even if the new
644    /// message is equal to the old message.
645    ///
    /// Returns an error if the channel has been closed.
    ///
    /// # Examples
648    ///
649    /// ```
650    /// use tokio::sync::watch;
651    ///
652    /// #[tokio::main]
653    /// async fn main() {
654    ///     let (tx, mut rx) = watch::channel("hello");
655    ///
656    ///     tx.send("goodbye").unwrap();
657    ///
658    ///     assert!(rx.has_changed().unwrap());
659    ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
660    ///
661    ///     // The value has been marked as seen
662    ///     assert!(!rx.has_changed().unwrap());
663    ///
664    ///     drop(tx);
665    ///     // The `tx` handle has been dropped
666    ///     assert!(rx.has_changed().is_err());
667    /// }
668    /// ```
669    pub fn has_changed(&self) -> Result<bool, error::RecvError> {
670        // Load the version from the state
671        let state = self.shared.state.load();
672        if state.is_closed() {
673            // The sender has dropped.
674            return Err(error::RecvError(()));
675        }
676        let new_version = state.version();
677
678        Ok(self.version != new_version)
679    }
680
681    /// Marks the state as changed.
682    ///
683    /// After invoking this method [`has_changed()`](Self::has_changed)
684    /// returns `true` and [`changed()`](Self::changed) returns
685    /// immediately, regardless of whether a new value has been sent.
686    ///
687    /// This is useful for triggering an initial change notification after
688    /// subscribing to synchronize new receivers.
    pub fn mark_changed(&mut self) {
        // Step this receiver's stored version back by one (wrapping) step so
        // that it no longer equals a channel version it matched before.
        self.version.decrement();
    }
692
693    /// Marks the state as unchanged.
694    ///
695    /// The current value will be considered seen by the receiver.
696    ///
697    /// This is useful if you are not interested in the current value
698    /// visible in the receiver.
699    pub fn mark_unchanged(&mut self) {
700        let current_version = self.shared.state.load().version();
701        self.version = current_version;
702    }
703
704    /// Waits for a change notification, then marks the newest value as seen.
705    ///
706    /// If the newest value in the channel has not yet been marked seen when
707    /// this method is called, the method marks that value seen and returns
708    /// immediately. If the newest value has already been marked seen, then the
709    /// method sleeps until a new message is sent by the [`Sender`] connected to
710    /// this `Receiver`, or until the [`Sender`] is dropped.
711    ///
712    /// This method returns an error if and only if the [`Sender`] is dropped.
713    ///
714    /// For more information, see
715    /// [*Change notifications*](self#change-notifications) in the module-level documentation.
716    ///
717    /// # Cancel safety
718    ///
719    /// This method is cancel safe. If you use it as the event in a
720    /// [`tokio::select!`](crate::select) statement and some other branch
721    /// completes first, then it is guaranteed that no values have been marked
722    /// seen by this call to `changed`.
723    ///
724    /// [`Sender`]: struct@Sender
725    ///
726    /// # Examples
727    ///
728    /// ```
729    /// use tokio::sync::watch;
730    ///
731    /// #[tokio::main]
732    /// async fn main() {
733    ///     let (tx, mut rx) = watch::channel("hello");
734    ///
735    ///     tokio::spawn(async move {
736    ///         tx.send("goodbye").unwrap();
737    ///     });
738    ///
739    ///     assert!(rx.changed().await.is_ok());
740    ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
741    ///
742    ///     // The `tx` handle has been dropped
743    ///     assert!(rx.changed().await.is_err());
744    /// }
745    /// ```
746    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
747        cooperative(changed_impl(&self.shared, &mut self.version)).await
748    }
749
750    /// Waits for a value that satisfies the provided condition.
751    ///
752    /// This method will call the provided closure whenever something is sent on
753    /// the channel. Once the closure returns `true`, this method will return a
754    /// reference to the value that was passed to the closure.
755    ///
756    /// Before `wait_for` starts waiting for changes, it will call the closure
757    /// on the current value. If the closure returns `true` when given the
758    /// current value, then `wait_for` will immediately return a reference to
759    /// the current value. This is the case even if the current value is already
760    /// considered seen.
761    ///
762    /// The watch channel only keeps track of the most recent value, so if
763    /// several messages are sent faster than `wait_for` is able to call the
764    /// closure, then it may skip some updates. Whenever the closure is called,
765    /// it will be called with the most recent value.
766    ///
767    /// When this function returns, the value that was passed to the closure
768    /// when it returned `true` will be considered seen.
769    ///
770    /// If the channel is closed, then `wait_for` will return a [`RecvError`].
771    /// Once this happens, no more messages can ever be sent on the channel.
772    /// When an error is returned, it is guaranteed that the closure has been
773    /// called on the last value, and that it returned `false` for that value.
774    /// (If the closure returned `true`, then the last value would have been
775    /// returned instead of the error.)
776    ///
777    /// Like the [`borrow`] method, the returned borrow holds a read lock on the
778    /// inner value. This means that long-lived borrows could cause the producer
779    /// half to block. It is recommended to keep the borrow as short-lived as
780    /// possible. See the documentation of `borrow` for more information on
781    /// this.
782    ///
783    /// [`borrow`]: Receiver::borrow
784    /// [`RecvError`]: error::RecvError
785    ///
786    /// # Cancel safety
787    ///
788    /// This method is cancel safe. If you use it as the event in a
789    /// [`tokio::select!`](crate::select) statement and some other branch
790    /// completes first, then it is guaranteed that the last seen value `val`
791    /// (if any) satisfies `f(val) == false`.
792    ///
793    /// # Panics
794    ///
795    /// If and only if the closure `f` panics. In that case, no resource owned
796    /// or shared by this [`Receiver`] will be poisoned.
797    ///
798    /// # Examples
799    ///
800    /// ```
801    /// use tokio::sync::watch;
802    /// use tokio::time::{sleep, Duration};
803    ///
804    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
805    /// async fn main() {
806    ///     let (tx, mut rx) = watch::channel("hello");
807    ///
808    ///     tokio::spawn(async move {
809    ///         sleep(Duration::from_secs(1)).await;
810    ///         tx.send("goodbye").unwrap();
811    ///     });
812    ///
813    ///     assert!(rx.wait_for(|val| *val == "goodbye").await.is_ok());
814    ///     assert_eq!(*rx.borrow(), "goodbye");
815    /// }
816    /// ```
817    pub async fn wait_for(
818        &mut self,
819        f: impl FnMut(&T) -> bool,
820    ) -> Result<Ref<'_, T>, error::RecvError> {
821        cooperative(self.wait_for_inner(f)).await
822    }
823
824    async fn wait_for_inner(
825        &mut self,
826        mut f: impl FnMut(&T) -> bool,
827    ) -> Result<Ref<'_, T>, error::RecvError> {
828        let mut closed = false;
829        loop {
830            {
831                let inner = self.shared.value.read();
832
833                let new_version = self.shared.state.load().version();
834                let has_changed = self.version != new_version;
835                self.version = new_version;
836
837                if !closed || has_changed {
838                    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
839                    match result {
840                        Ok(true) => {
841                            return Ok(Ref { inner, has_changed });
842                        }
843                        Ok(false) => {
844                            // Skip the value.
845                        }
846                        Err(panicked) => {
847                            // Drop the read-lock to avoid poisoning it.
848                            drop(inner);
849                            // Forward the panic to the caller.
850                            panic::resume_unwind(panicked);
851                            // Unreachable
852                        }
853                    };
854                }
855            }
856
857            if closed {
858                return Err(error::RecvError(()));
859            }
860
861            // Wait for the value to change.
862            closed = changed_impl(&self.shared, &mut self.version).await.is_err();
863        }
864    }
865
866    /// Returns `true` if receivers belong to the same channel.
867    ///
868    /// # Examples
869    ///
870    /// ```
871    /// let (tx, rx) = tokio::sync::watch::channel(true);
872    /// let rx2 = rx.clone();
873    /// assert!(rx.same_channel(&rx2));
874    ///
875    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
876    /// assert!(!rx3.same_channel(&rx2));
877    /// ```
878    pub fn same_channel(&self, other: &Self) -> bool {
879        Arc::ptr_eq(&self.shared, &other.shared)
880    }
881
882    cfg_process_driver! {
883        pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
884            maybe_changed(&self.shared, &mut self.version)
885        }
886    }
887}
888
889fn maybe_changed<T>(
890    shared: &Shared<T>,
891    version: &mut Version,
892) -> Option<Result<(), error::RecvError>> {
893    // Load the version from the state
894    let state = shared.state.load();
895    let new_version = state.version();
896
897    if *version != new_version {
898        // Observe the new version and return
899        *version = new_version;
900        return Some(Ok(()));
901    }
902
903    if state.is_closed() {
904        // The sender has been dropped.
905        return Some(Err(error::RecvError(())));
906    }
907
908    None
909}
910
911async fn changed_impl<T>(
912    shared: &Shared<T>,
913    version: &mut Version,
914) -> Result<(), error::RecvError> {
915    crate::trace::async_trace_leaf().await;
916
917    loop {
918        // In order to avoid a race condition, we first request a notification,
919        // **then** check the current value's version. If a new version exists,
920        // the notification request is dropped.
921        let notified = shared.notify_rx.notified();
922
923        if let Some(ret) = maybe_changed(shared, version) {
924            return ret;
925        }
926
927        notified.await;
928        // loop around again in case the wake-up was spurious
929    }
930}
931
932impl<T> Clone for Receiver<T> {
933    fn clone(&self) -> Self {
934        let version = self.version;
935        let shared = self.shared.clone();
936
937        Self::from_shared(version, shared)
938    }
939}
940
941impl<T> Drop for Receiver<T> {
942    fn drop(&mut self) {
943        // No synchronization necessary as this is only used as a counter and
944        // not memory access.
945        if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
946            // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
947            self.shared.notify_tx.notify_waiters();
948        }
949    }
950}
951
952impl<T> Sender<T> {
953    /// Creates the sending-half of the [`watch`] channel.
954    ///
955    /// See documentation of [`watch::channel`] for errors when calling this function.
956    /// Beware that attempting to send a value when there are no receivers will
957    /// return an error.
958    ///
959    /// [`watch`]: crate::sync::watch
    /// [`watch::channel`]: crate::sync::watch::channel
961    ///
962    /// # Examples
963    /// ```
964    /// let sender = tokio::sync::watch::Sender::new(0u8);
965    /// assert!(sender.send(3).is_err());
966    /// let _rec = sender.subscribe();
967    /// assert!(sender.send(4).is_ok());
968    /// ```
969    pub fn new(init: T) -> Self {
970        let (tx, _) = channel(init);
971        tx
972    }
973
974    /// Sends a new value via the channel, notifying all receivers.
975    ///
976    /// This method fails if the channel is closed, which is the case when
977    /// every receiver has been dropped. It is possible to reopen the channel
978    /// using the [`subscribe`] method. However, when `send` fails, the value
979    /// isn't made available for future receivers (but returned with the
980    /// [`SendError`]).
981    ///
982    /// To always make a new value available for future receivers, even if no
983    /// receiver currently exists, one of the other send methods
984    /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
985    /// used instead.
986    ///
987    /// [`subscribe`]: Sender::subscribe
988    /// [`SendError`]: error::SendError
989    /// [`send_if_modified`]: Sender::send_if_modified
990    /// [`send_modify`]: Sender::send_modify
991    /// [`send_replace`]: Sender::send_replace
992    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
993        // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
994        if 0 == self.receiver_count() {
995            return Err(error::SendError(value));
996        }
997
998        self.send_replace(value);
999        Ok(())
1000    }
1001
1002    /// Modifies the watched value **unconditionally** in-place,
1003    /// notifying all receivers.
1004    ///
1005    /// This can be useful for modifying the watched value, without
1006    /// having to allocate a new instance. Additionally, this
1007    /// method permits sending values even when there are no receivers.
1008    ///
1009    /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1010    /// if the value is only modified conditionally during the mutable borrow
1011    /// to prevent unneeded change notifications for unmodified values.
1012    ///
1013    /// # Panics
1014    ///
1015    /// This function panics when the invocation of the `modify` closure panics.
1016    /// No receivers are notified when panicking. All changes of the watched
1017    /// value applied by the closure before panicking will be visible in
1018    /// subsequent calls to `borrow`.
1019    ///
1020    /// # Examples
1021    ///
1022    /// ```
1023    /// use tokio::sync::watch;
1024    ///
1025    /// struct State {
1026    ///     counter: usize,
1027    /// }
1028    /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1029    /// state_tx.send_modify(|state| state.counter += 1);
1030    /// assert_eq!(state_rx.borrow().counter, 1);
1031    /// ```
1032    pub fn send_modify<F>(&self, modify: F)
1033    where
1034        F: FnOnce(&mut T),
1035    {
1036        self.send_if_modified(|value| {
1037            modify(value);
1038            true
1039        });
1040    }
1041
1042    /// Modifies the watched value **conditionally** in-place,
1043    /// notifying all receivers only if modified.
1044    ///
1045    /// This can be useful for modifying the watched value, without
1046    /// having to allocate a new instance. Additionally, this
1047    /// method permits sending values even when there are no receivers.
1048    ///
1049    /// The `modify` closure must return `true` if the value has actually
1050    /// been modified during the mutable borrow. It should only return `false`
1051    /// if the value is guaranteed to be unmodified despite the mutable
1052    /// borrow.
1053    ///
1054    /// Receivers are only notified if the closure returned `true`. If the
1055    /// closure has modified the value but returned `false` this results
1056    /// in a *silent modification*, i.e. the modified value will be visible
1057    /// in subsequent calls to `borrow`, but receivers will not receive
1058    /// a change notification.
1059    ///
1060    /// Returns the result of the closure, i.e. `true` if the value has
1061    /// been modified and `false` otherwise.
1062    ///
1063    /// # Panics
1064    ///
1065    /// This function panics when the invocation of the `modify` closure panics.
1066    /// No receivers are notified when panicking. All changes of the watched
1067    /// value applied by the closure before panicking will be visible in
1068    /// subsequent calls to `borrow`.
1069    ///
1070    /// # Examples
1071    ///
1072    /// ```
1073    /// use tokio::sync::watch;
1074    ///
1075    /// struct State {
1076    ///     counter: usize,
1077    /// }
1078    /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1079    /// let inc_counter_if_odd = |state: &mut State| {
1080    ///     if state.counter % 2 == 1 {
1081    ///         state.counter += 1;
1082    ///         return true;
1083    ///     }
1084    ///     false
1085    /// };
1086    ///
1087    /// assert_eq!(state_rx.borrow().counter, 1);
1088    ///
1089    /// assert!(!state_rx.has_changed().unwrap());
1090    /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1091    /// assert!(state_rx.has_changed().unwrap());
1092    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1093    ///
1094    /// assert!(!state_rx.has_changed().unwrap());
1095    /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1096    /// assert!(!state_rx.has_changed().unwrap());
1097    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1098    /// ```
1099    pub fn send_if_modified<F>(&self, modify: F) -> bool
1100    where
1101        F: FnOnce(&mut T) -> bool,
1102    {
1103        {
1104            // Acquire the write lock and update the value.
1105            let mut lock = self.shared.value.write();
1106
1107            // Update the value and catch possible panic inside func.
1108            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1109            match result {
1110                Ok(modified) => {
1111                    if !modified {
1112                        // Abort, i.e. don't notify receivers if unmodified
1113                        return false;
1114                    }
1115                    // Continue if modified
1116                }
1117                Err(panicked) => {
1118                    // Drop the lock to avoid poisoning it.
1119                    drop(lock);
1120                    // Forward the panic to the caller.
1121                    panic::resume_unwind(panicked);
1122                    // Unreachable
1123                }
1124            };
1125
1126            self.shared.state.increment_version_while_locked();
1127
1128            // Release the write lock.
1129            //
1130            // Incrementing the version counter while holding the lock ensures
1131            // that receivers are able to figure out the version number of the
1132            // value they are currently looking at.
1133            drop(lock);
1134        }
1135
1136        self.shared.notify_rx.notify_waiters();
1137
1138        true
1139    }
1140
1141    /// Sends a new value via the channel, notifying all receivers and returning
1142    /// the previous value in the channel.
1143    ///
1144    /// This can be useful for reusing the buffers inside a watched value.
1145    /// Additionally, this method permits sending values even when there are no
1146    /// receivers.
1147    ///
1148    /// # Examples
1149    ///
1150    /// ```
1151    /// use tokio::sync::watch;
1152    ///
1153    /// let (tx, _rx) = watch::channel(1);
1154    /// assert_eq!(tx.send_replace(2), 1);
1155    /// assert_eq!(tx.send_replace(3), 2);
1156    /// ```
1157    pub fn send_replace(&self, mut value: T) -> T {
1158        // swap old watched value with the new one
1159        self.send_modify(|old| mem::swap(old, &mut value));
1160
1161        value
1162    }
1163
    /// Returns a reference to the most recently sent value.
1165    ///
1166    /// Outstanding borrows hold a read lock on the inner value. This means that
1167    /// long-lived borrows could cause the producer half to block. It is recommended
1168    /// to keep the borrow as short-lived as possible. Additionally, if you are
1169    /// running in an environment that allows `!Send` futures, you must ensure that
1170    /// the returned `Ref` type is never held alive across an `.await` point,
1171    /// otherwise, it can lead to a deadlock.
1172    ///
1173    /// # Examples
1174    ///
1175    /// ```
1176    /// use tokio::sync::watch;
1177    ///
1178    /// let (tx, _) = watch::channel("hello");
1179    /// assert_eq!(*tx.borrow(), "hello");
1180    /// ```
1181    pub fn borrow(&self) -> Ref<'_, T> {
1182        let inner = self.shared.value.read();
1183
1184        // The sender/producer always sees the current version
1185        let has_changed = false;
1186
1187        Ref { inner, has_changed }
1188    }
1189
1190    /// Checks if the channel has been closed. This happens when all receivers
1191    /// have dropped.
1192    ///
1193    /// # Examples
1194    ///
1195    /// ```
1196    /// let (tx, rx) = tokio::sync::watch::channel(());
1197    /// assert!(!tx.is_closed());
1198    ///
1199    /// drop(rx);
1200    /// assert!(tx.is_closed());
1201    /// ```
1202    pub fn is_closed(&self) -> bool {
1203        self.receiver_count() == 0
1204    }
1205
1206    /// Completes when all receivers have dropped.
1207    ///
1208    /// This allows the producer to get notified when interest in the produced
1209    /// values is canceled and immediately stop doing work. Once a channel is
1210    /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1211    /// get a new receiver.
1212    ///
1213    /// If the channel becomes closed for a brief amount of time (e.g., the last
1214    /// receiver is dropped and then `subscribe` is called), then this call to
1215    /// `closed` might return, but it is also possible that it does not "notice"
1216    /// that the channel was closed for a brief amount of time.
1217    ///
1218    /// # Cancel safety
1219    ///
1220    /// This method is cancel safe.
1221    ///
1222    /// # Examples
1223    ///
1224    /// ```
1225    /// use tokio::sync::watch;
1226    ///
1227    /// #[tokio::main]
1228    /// async fn main() {
1229    ///     let (tx, rx) = watch::channel("hello");
1230    ///
1231    ///     tokio::spawn(async move {
1232    ///         // use `rx`
1233    ///         drop(rx);
1234    ///     });
1235    ///
1236    ///     // Waits for `rx` to drop
1237    ///     tx.closed().await;
1238    ///     println!("the `rx` handles dropped")
1239    /// }
1240    /// ```
1241    pub async fn closed(&self) {
1242        cooperative(async {
1243            crate::trace::async_trace_leaf().await;
1244
1245            while self.receiver_count() > 0 {
1246                let notified = self.shared.notify_tx.notified();
1247
1248                if self.receiver_count() == 0 {
1249                    return;
1250                }
1251
1252                notified.await;
1253                // The channel could have been reopened in the meantime by calling
1254                // `subscribe`, so we loop again.
1255            }
1256        })
1257        .await;
1258    }
1259
1260    /// Creates a new [`Receiver`] connected to this `Sender`.
1261    ///
1262    /// All messages sent before this call to `subscribe` are initially marked
1263    /// as seen by the new `Receiver`.
1264    ///
1265    /// This method can be called even if there are no other receivers. In this
1266    /// case, the channel is reopened.
1267    ///
1268    /// # Examples
1269    ///
    /// The new receiver will receive messages sent on this `Sender`.
1271    ///
1272    /// ```
1273    /// use tokio::sync::watch;
1274    ///
1275    /// #[tokio::main]
1276    /// async fn main() {
1277    ///     let (tx, _rx) = watch::channel(0u64);
1278    ///
1279    ///     tx.send(5).unwrap();
1280    ///
1281    ///     let rx = tx.subscribe();
1282    ///     assert_eq!(5, *rx.borrow());
1283    ///
1284    ///     tx.send(10).unwrap();
1285    ///     assert_eq!(10, *rx.borrow());
1286    /// }
1287    /// ```
1288    ///
    /// The most recent message is considered seen by the new receiver, so
    /// this test is guaranteed to pass.
1291    ///
1292    /// ```
1293    /// use tokio::sync::watch;
1294    /// use tokio::time::Duration;
1295    ///
1296    /// #[tokio::main]
1297    /// async fn main() {
1298    ///     let (tx, _rx) = watch::channel(0u64);
1299    ///     tx.send(5).unwrap();
1300    ///     let mut rx = tx.subscribe();
1301    ///
1302    ///     tokio::spawn(async move {
1303    ///         // by spawning and sleeping, the message is sent after `main`
1304    ///         // hits the call to `changed`.
1305    ///         # if false {
1306    ///         tokio::time::sleep(Duration::from_millis(10)).await;
1307    ///         # }
1308    ///         tx.send(100).unwrap();
1309    ///     });
1310    ///
1311    ///     rx.changed().await.unwrap();
1312    ///     assert_eq!(100, *rx.borrow());
1313    /// }
1314    /// ```
1315    pub fn subscribe(&self) -> Receiver<T> {
1316        let shared = self.shared.clone();
1317        let version = shared.state.load().version();
1318
1319        // The CLOSED bit in the state tracks only whether the sender is
1320        // dropped, so we do not need to unset it if this reopens the channel.
1321        Receiver::from_shared(version, shared)
1322    }
1323
1324    /// Returns the number of receivers that currently exist.
1325    ///
1326    /// # Examples
1327    ///
1328    /// ```
1329    /// use tokio::sync::watch;
1330    ///
1331    /// #[tokio::main]
1332    /// async fn main() {
1333    ///     let (tx, rx1) = watch::channel("hello");
1334    ///
1335    ///     assert_eq!(1, tx.receiver_count());
1336    ///
1337    ///     let mut _rx2 = rx1.clone();
1338    ///
1339    ///     assert_eq!(2, tx.receiver_count());
1340    /// }
1341    /// ```
1342    pub fn receiver_count(&self) -> usize {
1343        self.shared.ref_count_rx.load(Relaxed)
1344    }
1345
1346    /// Returns the number of senders that currently exist.
1347    ///
1348    /// # Examples
1349    ///
1350    /// ```
1351    /// use tokio::sync::watch;
1352    ///
1353    /// #[tokio::main]
1354    /// async fn main() {
1355    ///     let (tx1, rx) = watch::channel("hello");
1356    ///
1357    ///     assert_eq!(1, tx1.sender_count());
1358    ///
1359    ///     let tx2 = tx1.clone();
1360    ///
1361    ///     assert_eq!(2, tx1.sender_count());
1362    ///     assert_eq!(2, tx2.sender_count());
1363    /// }
1364    /// ```
1365    pub fn sender_count(&self) -> usize {
1366        self.shared.ref_count_tx.load(Relaxed)
1367    }
1368
1369    /// Returns `true` if senders belong to the same channel.
1370    ///
1371    /// # Examples
1372    ///
1373    /// ```
1374    /// let (tx, rx) = tokio::sync::watch::channel(true);
1375    /// let tx2 = tx.clone();
1376    /// assert!(tx.same_channel(&tx2));
1377    ///
1378    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1379    /// assert!(!tx3.same_channel(&tx2));
1380    /// ```
1381    pub fn same_channel(&self, other: &Self) -> bool {
1382        Arc::ptr_eq(&self.shared, &other.shared)
1383    }
1384}
1385
1386impl<T> Drop for Sender<T> {
1387    fn drop(&mut self) {
1388        if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1389            self.shared.state.set_closed();
1390            self.shared.notify_rx.notify_waiters();
1391        }
1392    }
1393}
1394
1395// ===== impl Ref =====
1396
1397impl<T> ops::Deref for Ref<'_, T> {
1398    type Target = T;
1399
1400    fn deref(&self) -> &T {
1401        self.inner.deref()
1402    }
1403}
1404
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // test for https://github.com/tokio-rs/tokio/issues/3168
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            tx.send(1).unwrap();

            // Send from another thread while this thread polls `changed`.
            let sender_thread = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            // Swap roles: poll from another thread while this thread sends.
            let tx = sender_thread.join().unwrap();
            let receiver_thread = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            // And once more with the sender on the spawned thread.
            let mut rx = receiver_thread.join().unwrap();
            let sender_thread = thread::spawn(move || {
                tx.send(2).unwrap();
            });

            rx.changed().now_or_never();

            sender_thread.join().unwrap();
        });
    }

    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            // Both halves start out seeing the initial value.
            assert!(*tx.borrow() == 0);
            assert!(*rx.borrow() == 0);

            tx.send(1).unwrap();
            assert!(*tx.borrow() == 1);

            let sender_thread = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender_thread.join().unwrap();
            let receiver_thread = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            // Once the receiver thread is joined, both halves see the value
            // sent last.
            let rx = receiver_thread.join().unwrap();
            assert!(*rx.borrow() == 3);
            assert!(*tx.borrow() == 3);

            tx.send(2).unwrap();

            // This thread is not joined explicitly.
            thread::spawn(move || {
                assert!(*rx.borrow() == 2);
            });
            assert!(*tx.borrow() == 2);
        });
    }
}