tokio/sync/
broadcast.rs

1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//!     let (tx, mut rx1) = broadcast::channel(16);
79//!     let mut rx2 = tx.subscribe();
80//!
81//!     tokio::spawn(async move {
82//!         assert_eq!(rx1.recv().await.unwrap(), 10);
83//!         assert_eq!(rx1.recv().await.unwrap(), 20);
84//!     });
85//!
86//!     tokio::spawn(async move {
87//!         assert_eq!(rx2.recv().await.unwrap(), 10);
88//!         assert_eq!(rx2.recv().await.unwrap(), 20);
89//!     });
90//!
91//!     tx.send(10).unwrap();
92//!     tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//!     let (tx, mut rx) = broadcast::channel(2);
104//!
105//!     tx.send(10).unwrap();
106//!     tx.send(20).unwrap();
107//!     tx.send(30).unwrap();
108//!
109//!     // The receiver lagged behind
110//!     assert!(rx.recv().await.is_err());
111//!
112//!     // At this point, we can abort or continue with lost messages
113//!
114//!     assert_eq!(20, rx.recv().await.unwrap());
115//!     assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::task::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146///     let (tx, mut rx1) = broadcast::channel(16);
147///     let mut rx2 = tx.subscribe();
148///
149///     tokio::spawn(async move {
150///         assert_eq!(rx1.recv().await.unwrap(), 10);
151///         assert_eq!(rx1.recv().await.unwrap(), 20);
152///     });
153///
154///     tokio::spawn(async move {
155///         assert_eq!(rx2.recv().await.unwrap(), 10);
156///         assert_eq!(rx2.recv().await.unwrap(), 20);
157///     });
158///
159///     tx.send(10).unwrap();
160///     tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// State shared with all receivers and senders of this channel.
    shared: Arc<Shared<T>>,
}
168
169/// A sender that does not prevent the channel from being closed.
170///
171/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
172/// instances remain, the channel is closed.
173///
174/// In order to send messages, the `WeakSender` needs to be upgraded using
175/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
176/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
177///
178/// [`Sender`]: Sender
179/// [`WeakSender::upgrade`]: WeakSender::upgrade
180///
181/// # Examples
182///
183/// ```
184/// use tokio::sync::broadcast::channel;
185///
186/// #[tokio::main]
187/// async fn main() {
188///     let (tx, _rx) = channel::<i32>(15);
189///     let tx_weak = tx.downgrade();
190///
191///     // Upgrading will succeed because `tx` still exists.
192///     assert!(tx_weak.upgrade().is_some());
193///
194///     // If we drop `tx`, then it will fail.
195///     drop(tx);
196///     assert!(tx_weak.clone().upgrade().is_none());
197/// }
198/// ```
pub struct WeakSender<T> {
    /// State shared with all receivers and senders of this channel.
    ///
    /// This is a strong `Arc`; weak handles are counted separately in
    /// `Shared::num_weak_tx` (incremented by `Sender::downgrade`).
    shared: Arc<Shared<T>>,
}
202
203/// Receiving-half of the [`broadcast`] channel.
204///
205/// Must not be used concurrently. Messages may be retrieved using
206/// [`recv`][Receiver::recv].
207///
208/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
209/// wrapper.
210///
211/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
212///
213/// # Examples
214///
215/// ```
216/// use tokio::sync::broadcast;
217///
218/// #[tokio::main]
219/// async fn main() {
220///     let (tx, mut rx1) = broadcast::channel(16);
221///     let mut rx2 = tx.subscribe();
222///
223///     tokio::spawn(async move {
224///         assert_eq!(rx1.recv().await.unwrap(), 10);
225///         assert_eq!(rx1.recv().await.unwrap(), 20);
226///     });
227///
228///     tokio::spawn(async move {
229///         assert_eq!(rx2.recv().await.unwrap(), 10);
230///         assert_eq!(rx2.recv().await.unwrap(), 20);
231///     });
232///
233///     tx.send(10).unwrap();
234///     tx.send(20).unwrap();
235/// }
236/// ```
237///
238/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Next position to read from.
    ///
    /// `channel` starts the first receiver at `0`; `new_receiver` starts a
    /// receiver at the tail position observed when it was created.
    next: u64,
}
246
pub mod error {
    //! Broadcast error types

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// A **send** operation can only fail if there are no active receivers,
    /// implying that the message could never be received. The error contains the
    /// message being sent as a payload so it can be recovered.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        /// Writes the fixed message `channel closed`; the payload is not printed.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// There are no more active senders implying no further messages will ever
        /// be sent.
        Closed,

        /// The receiver lagged too far behind. Attempting to receive again will
        /// return the oldest message still retained by the channel.
        ///
        /// Includes the number of skipped messages.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        /// Writes `channel closed`, or `channel lagged by N` where `N` is the
        /// number of skipped messages.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                RecvError::Closed => f.write_str("channel closed"),
                RecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// The channel is currently empty. There are still active
        /// [`Sender`] handles, so data may yet become available.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// There are no more active senders implying no further messages will ever
        /// be sent.
        Closed,

        /// The receiver lagged too far behind. Attempting to receive again will
        /// return the oldest message still retained by the channel.
        ///
        /// Includes the number of skipped messages.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        /// Writes `channel empty`, `channel closed`, or `channel lagged by N`
        /// where `N` is the number of skipped messages.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                TryRecvError::Empty => f.write_str("channel empty"),
                TryRecvError::Closed => f.write_str("channel closed"),
                TryRecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
335
336use self::error::{RecvError, SendError, TryRecvError};
337
338use super::Notify;
339
340/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel. `Sender::new_with_receiver_count` allocates a
    /// power-of-two number of them.
    buffer: Box<[RwLock<Slot<T>>]>,

    /// Mask a position -> index. Set to the number of slots minus one.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles. Starts at `1` for the sender
    /// returned by the constructor.
    num_tx: AtomicUsize,

    /// Number of outstanding weak Sender handles. Incremented by
    /// `Sender::downgrade`.
    num_weak_tx: AtomicUsize,

    /// Notify when the last subscribed [`Receiver`] drops. Awaited by
    /// `Sender::closed`.
    notify_last_rx_drop: Notify,
}
360
/// Tail of the queue: the next write position together with the receiver
/// bookkeeping. Stored behind the `Shared::tail` mutex.
struct Tail {
    /// Next position to write to. `Sender::send` advances it by one
    /// (wrapping) for every value sent.
    pos: u64,

    /// Number of active receivers.
    rx_cnt: usize,

    /// True if the channel is closed. Set by `Sender::close_channel` and
    /// cleared by `new_receiver` when the first receiver is (re)added.
    closed: bool,

    /// Receivers waiting for a value.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
375
376/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired. `Sender::send` resets it to the receiver count observed at
    /// send time.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot: `Sender::send`
    /// records the tail position it wrote at.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: UnsafeCell<Option<T>>,
}
395
396/// An entry in the wait queue.
struct Waiter {
    /// True if queued.
    // NOTE(review): presumably tracks membership in `Tail::waiters`; the code
    // that sets it is outside this view — confirm.
    queued: AtomicBool,

    /// Task waiting on the broadcast channel. `None` until a waker is stored.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
410
411impl Waiter {
412    fn new() -> Self {
413        Self {
414            queued: AtomicBool::new(false),
415            waker: None,
416            pointers: linked_list::Pointers::new(),
417            _p: PhantomPinned,
418        }
419    }
420}
421
// Declares `Waiter::addr_of_pointers` through a macro defined outside this
// file. Per the signature below, it maps a `NonNull<Waiter>` to a `NonNull`
// pointing at that waiter's `pointers` field.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
429
/// Wraps the read guard of a single buffer slot.
// NOTE(review): its constructor and `Drop` behaviour are outside this view.
struct RecvGuard<'a, T> {
    /// Read lock held on the slot.
    slot: RwLockReadGuard<'a, Slot<T>>,
}
433
434/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

// `Recv` stores an `UnsafeCell<Waiter>`, so `Send`/`Sync` are declared
// manually, bounded on `T: Send`.
// NOTE(review): the soundness argument (presumably that the waiter is only
// touched while the `Shared::tail` lock is held) is not visible in this part
// of the file — confirm.
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
445
/// Max number of receivers. Reserve space to lock.
///
/// `new_receiver` panics with "max receivers" when the receiver count already
/// equals this value.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
448
449/// Create a bounded, multi-producer, multi-consumer channel where each sent
450/// value is broadcasted to all active receivers.
451///
452/// **Note:** The actual capacity may be greater than the provided `capacity`.
453///
454/// All data sent on [`Sender`] will become available on every active
455/// [`Receiver`] in the same order as it was sent.
456///
457/// The `Sender` can be cloned to `send` to the same channel from multiple
458/// points in the process or it can be used concurrently from an `Arc`. New
459/// `Receiver` handles are created by calling [`Sender::subscribe`].
460///
461/// If all [`Receiver`] handles are dropped, the `send` method will return a
462/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
463/// method will return a [`RecvError`].
464///
465/// [`Sender`]: crate::sync::broadcast::Sender
466/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
467/// [`Receiver`]: crate::sync::broadcast::Receiver
468/// [`recv`]: crate::sync::broadcast::Receiver::recv
469/// [`SendError`]: crate::sync::broadcast::error::SendError
470/// [`RecvError`]: crate::sync::broadcast::error::RecvError
471///
472/// # Examples
473///
474/// ```
475/// use tokio::sync::broadcast;
476///
477/// #[tokio::main]
478/// async fn main() {
479///     let (tx, mut rx1) = broadcast::channel(16);
480///     let mut rx2 = tx.subscribe();
481///
482///     tokio::spawn(async move {
483///         assert_eq!(rx1.recv().await.unwrap(), 10);
484///         assert_eq!(rx1.recv().await.unwrap(), 20);
485///     });
486///
487///     tokio::spawn(async move {
488///         assert_eq!(rx2.recv().await.unwrap(), 10);
489///         assert_eq!(rx2.recv().await.unwrap(), 20);
490///     });
491///
492///     tx.send(10).unwrap();
493///     tx.send(20).unwrap();
494/// }
495/// ```
496///
497/// # Panics
498///
499/// This will panic if `capacity` is equal to `0` or larger
500/// than `usize::MAX / 2`.
501#[track_caller]
502pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
503    // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
504    let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
505    let rx = Receiver {
506        shared: tx.shared.clone(),
507        next: 0,
508    };
509    (tx, rx)
510}
511
// `Send`/`Sync` are declared manually for all three handle types, each bounded
// on `T: Send`. The handles only hold an `Arc<Shared<T>>` (plus a `u64` cursor
// for `Receiver`).
// NOTE(review): presumably needed because `Slot::val` is an `UnsafeCell`,
// with soundness resting on the slot `RwLock` protocol — confirm.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for WeakSender<T> {}
unsafe impl<T: Send> Sync for WeakSender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
520
521impl<T> Sender<T> {
522    /// Creates the sending-half of the [`broadcast`] channel.
523    ///
524    /// See the documentation of [`broadcast::channel`] for more information on this method.
525    ///
526    /// [`broadcast`]: crate::sync::broadcast
527    /// [`broadcast::channel`]: crate::sync::broadcast::channel
528    #[track_caller]
529    pub fn new(capacity: usize) -> Self {
530        // SAFETY: We don't create extra receivers, so there are 0.
531        unsafe { Self::new_with_receiver_count(0, capacity) }
532    }
533
534    /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
535    /// count.
536    ///
537    /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
538    /// calling this function.
539    ///
540    /// # Safety:
541    ///
542    /// The caller must ensure that the amount of receivers for this Sender is correct before
543    /// the channel functionalities are used, the count is zero by default, as this function
544    /// does not create any receivers by itself.
545    #[track_caller]
546    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
547        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
548        assert!(
549            capacity <= usize::MAX >> 1,
550            "broadcast channel capacity exceeded `usize::MAX / 2`"
551        );
552
553        // Round to a power of two
554        capacity = capacity.next_power_of_two();
555
556        let mut buffer = Vec::with_capacity(capacity);
557
558        for i in 0..capacity {
559            buffer.push(RwLock::new(Slot {
560                rem: AtomicUsize::new(0),
561                pos: (i as u64).wrapping_sub(capacity as u64),
562                val: UnsafeCell::new(None),
563            }));
564        }
565
566        let shared = Arc::new(Shared {
567            buffer: buffer.into_boxed_slice(),
568            mask: capacity - 1,
569            tail: Mutex::new(Tail {
570                pos: 0,
571                rx_cnt: receiver_count,
572                closed: false,
573                waiters: LinkedList::new(),
574            }),
575            num_tx: AtomicUsize::new(1),
576            num_weak_tx: AtomicUsize::new(0),
577            notify_last_rx_drop: Notify::new(),
578        });
579
580        Sender { shared }
581    }
582
583    /// Attempts to send a value to all active [`Receiver`] handles, returning
584    /// it back if it could not be sent.
585    ///
586    /// A successful send occurs when there is at least one active [`Receiver`]
587    /// handle. An unsuccessful send would be one where all associated
588    /// [`Receiver`] handles have already been dropped.
589    ///
590    /// # Return
591    ///
592    /// On success, the number of subscribed [`Receiver`] handles is returned.
593    /// This does not mean that this number of receivers will see the message as
594    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
595    /// the message.
596    ///
597    /// # Note
598    ///
599    /// A return value of `Ok` **does not** mean that the sent value will be
600    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
601    /// handles may be dropped before receiving the sent message.
602    ///
603    /// A return value of `Err` **does not** mean that future calls to `send`
604    /// will fail. New [`Receiver`] handles may be created by calling
605    /// [`subscribe`].
606    ///
607    /// [`Receiver`]: crate::sync::broadcast::Receiver
608    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
609    ///
610    /// # Examples
611    ///
612    /// ```
613    /// use tokio::sync::broadcast;
614    ///
615    /// #[tokio::main]
616    /// async fn main() {
617    ///     let (tx, mut rx1) = broadcast::channel(16);
618    ///     let mut rx2 = tx.subscribe();
619    ///
620    ///     tokio::spawn(async move {
621    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
622    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
623    ///     });
624    ///
625    ///     tokio::spawn(async move {
626    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
627    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
628    ///     });
629    ///
630    ///     tx.send(10).unwrap();
631    ///     tx.send(20).unwrap();
632    /// }
633    /// ```
634    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
635        let mut tail = self.shared.tail.lock();
636
637        if tail.rx_cnt == 0 {
638            return Err(SendError(value));
639        }
640
641        // Position to write into
642        let pos = tail.pos;
643        let rem = tail.rx_cnt;
644        let idx = (pos & self.shared.mask as u64) as usize;
645
646        // Update the tail position
647        tail.pos = tail.pos.wrapping_add(1);
648
649        // Get the slot
650        let mut slot = self.shared.buffer[idx].write();
651
652        // Track the position
653        slot.pos = pos;
654
655        // Set remaining receivers
656        slot.rem.with_mut(|v| *v = rem);
657
658        // Write the value
659        slot.val = UnsafeCell::new(Some(value));
660
661        // Release the slot lock before notifying the receivers.
662        drop(slot);
663
664        // Notify and release the mutex. This must happen after the slot lock is
665        // released, otherwise the writer lock bit could be cleared while another
666        // thread is in the critical section.
667        self.shared.notify_rx(tail);
668
669        Ok(rem)
670    }
671
672    /// Creates a new [`Receiver`] handle that will receive values sent **after**
673    /// this call to `subscribe`.
674    ///
675    /// # Examples
676    ///
677    /// ```
678    /// use tokio::sync::broadcast;
679    ///
680    /// #[tokio::main]
681    /// async fn main() {
682    ///     let (tx, _rx) = broadcast::channel(16);
683    ///
684    ///     // Will not be seen
685    ///     tx.send(10).unwrap();
686    ///
687    ///     let mut rx = tx.subscribe();
688    ///
689    ///     tx.send(20).unwrap();
690    ///
691    ///     let value = rx.recv().await.unwrap();
692    ///     assert_eq!(20, value);
693    /// }
694    /// ```
695    pub fn subscribe(&self) -> Receiver<T> {
696        let shared = self.shared.clone();
697        new_receiver(shared)
698    }
699
700    /// Converts the `Sender` to a [`WeakSender`] that does not count
701    /// towards RAII semantics, i.e. if all `Sender` instances of the
702    /// channel were dropped and only `WeakSender` instances remain,
703    /// the channel is closed.
704    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
705    pub fn downgrade(&self) -> WeakSender<T> {
706        self.shared.num_weak_tx.fetch_add(1, Relaxed);
707        WeakSender {
708            shared: self.shared.clone(),
709        }
710    }
711
712    /// Returns the number of queued values.
713    ///
714    /// A value is queued until it has either been seen by all receivers that were alive at the time
715    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
716    /// queue's capacity.
717    ///
718    /// # Note
719    ///
720    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
721    /// have been evicted from the queue before being seen by all receivers.
722    ///
723    /// # Examples
724    ///
725    /// ```
726    /// use tokio::sync::broadcast;
727    ///
728    /// #[tokio::main]
729    /// async fn main() {
730    ///     let (tx, mut rx1) = broadcast::channel(16);
731    ///     let mut rx2 = tx.subscribe();
732    ///
733    ///     tx.send(10).unwrap();
734    ///     tx.send(20).unwrap();
735    ///     tx.send(30).unwrap();
736    ///
737    ///     assert_eq!(tx.len(), 3);
738    ///
739    ///     rx1.recv().await.unwrap();
740    ///
741    ///     // The len is still 3 since rx2 hasn't seen the first value yet.
742    ///     assert_eq!(tx.len(), 3);
743    ///
744    ///     rx2.recv().await.unwrap();
745    ///
746    ///     assert_eq!(tx.len(), 2);
747    /// }
748    /// ```
749    pub fn len(&self) -> usize {
750        let tail = self.shared.tail.lock();
751
752        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
753        let mut low = 0;
754        let mut high = self.shared.buffer.len();
755        while low < high {
756            let mid = low + (high - low) / 2;
757            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
758            if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
759                low = mid + 1;
760            } else {
761                high = mid;
762            }
763        }
764
765        self.shared.buffer.len() - low
766    }
767
768    /// Returns true if there are no queued values.
769    ///
770    /// # Examples
771    ///
772    /// ```
773    /// use tokio::sync::broadcast;
774    ///
775    /// #[tokio::main]
776    /// async fn main() {
777    ///     let (tx, mut rx1) = broadcast::channel(16);
778    ///     let mut rx2 = tx.subscribe();
779    ///
780    ///     assert!(tx.is_empty());
781    ///
782    ///     tx.send(10).unwrap();
783    ///
784    ///     assert!(!tx.is_empty());
785    ///
786    ///     rx1.recv().await.unwrap();
787    ///
788    ///     // The queue is still not empty since rx2 hasn't seen the value.
789    ///     assert!(!tx.is_empty());
790    ///
791    ///     rx2.recv().await.unwrap();
792    ///
793    ///     assert!(tx.is_empty());
794    /// }
795    /// ```
796    pub fn is_empty(&self) -> bool {
797        let tail = self.shared.tail.lock();
798
799        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
800        self.shared.buffer[idx].read().rem.load(SeqCst) == 0
801    }
802
803    /// Returns the number of active receivers.
804    ///
805    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
806    /// [`subscribe`]. These are the handles that will receive values sent on
807    /// this [`Sender`].
808    ///
809    /// # Note
810    ///
811    /// It is not guaranteed that a sent message will reach this number of
812    /// receivers. Active receivers may never call [`recv`] again before
813    /// dropping.
814    ///
815    /// [`recv`]: crate::sync::broadcast::Receiver::recv
816    /// [`Receiver`]: crate::sync::broadcast::Receiver
817    /// [`Sender`]: crate::sync::broadcast::Sender
818    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
819    /// [`channel`]: crate::sync::broadcast::channel
820    ///
821    /// # Examples
822    ///
823    /// ```
824    /// use tokio::sync::broadcast;
825    ///
826    /// #[tokio::main]
827    /// async fn main() {
828    ///     let (tx, _rx1) = broadcast::channel(16);
829    ///
830    ///     assert_eq!(1, tx.receiver_count());
831    ///
832    ///     let mut _rx2 = tx.subscribe();
833    ///
834    ///     assert_eq!(2, tx.receiver_count());
835    ///
836    ///     tx.send(10).unwrap();
837    /// }
838    /// ```
839    pub fn receiver_count(&self) -> usize {
840        let tail = self.shared.tail.lock();
841        tail.rx_cnt
842    }
843
844    /// Returns `true` if senders belong to the same channel.
845    ///
846    /// # Examples
847    ///
848    /// ```
849    /// use tokio::sync::broadcast;
850    ///
851    /// #[tokio::main]
852    /// async fn main() {
853    ///     let (tx, _rx) = broadcast::channel::<()>(16);
854    ///     let tx2 = tx.clone();
855    ///
856    ///     assert!(tx.same_channel(&tx2));
857    ///
858    ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
859    ///
860    ///     assert!(!tx3.same_channel(&tx2));
861    /// }
862    /// ```
863    pub fn same_channel(&self, other: &Self) -> bool {
864        Arc::ptr_eq(&self.shared, &other.shared)
865    }
866
867    /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
868    /// zero.
869    ///
870    /// # Examples
871    ///
872    /// ```
873    /// use futures::FutureExt;
874    /// use tokio::sync::broadcast;
875    ///
876    /// #[tokio::main]
877    /// async fn main() {
878    ///     let (tx, mut rx1) = broadcast::channel::<u32>(16);
879    ///     let mut rx2 = tx.subscribe();
880    ///
881    ///     let _ = tx.send(10);
882    ///
883    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
884    ///     drop(rx1);
885    ///     assert!(tx.closed().now_or_never().is_none());
886    ///
887    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
888    ///     drop(rx2);
889    ///     assert!(tx.closed().now_or_never().is_some());
890    /// }
891    /// ```
892    pub async fn closed(&self) {
893        loop {
894            let notified = self.shared.notify_last_rx_drop.notified();
895
896            {
897                // Ensure the lock drops if the channel isn't closed
898                let tail = self.shared.tail.lock();
899                if tail.closed {
900                    return;
901                }
902            }
903
904            notified.await;
905        }
906    }
907
908    fn close_channel(&self) {
909        let mut tail = self.shared.tail.lock();
910        tail.closed = true;
911
912        self.shared.notify_rx(tail);
913    }
914
915    /// Returns the number of [`Sender`] handles.
916    pub fn strong_count(&self) -> usize {
917        self.shared.num_tx.load(Acquire)
918    }
919
920    /// Returns the number of [`WeakSender`] handles.
921    pub fn weak_count(&self) -> usize {
922        self.shared.num_weak_tx.load(Acquire)
923    }
924}
925
926/// Create a new `Receiver` which reads starting from the tail.
927fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
928    let mut tail = shared.tail.lock();
929
930    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
931
932    if tail.rx_cnt == 0 {
933        // Potentially need to re-open the channel, if a new receiver has been added between calls
934        // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
935        // applies if the sender has been dropped
936        tail.closed = false;
937    }
938
939    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
940    let next = tail.pos;
941
942    drop(tail);
943
944    Receiver { shared, next }
945}
946
947/// List used in `Shared::notify_rx`. It wraps a guarded linked list
948/// and gates the access to it on the `Shared.tail` mutex. It also empties
949/// the list on drop.
950struct WaitersList<'a, T> {
951    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
952    is_empty: bool,
953    shared: &'a Shared<T>,
954}
955
956impl<'a, T> Drop for WaitersList<'a, T> {
957    fn drop(&mut self) {
958        // If the list is not empty, we unlink all waiters from it.
959        // We do not wake the waiters to avoid double panics.
960        if !self.is_empty {
961            let _lock_guard = self.shared.tail.lock();
962            while self.list.pop_back().is_some() {}
963        }
964    }
965}
966
967impl<'a, T> WaitersList<'a, T> {
968    fn new(
969        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
970        guard: Pin<&'a Waiter>,
971        shared: &'a Shared<T>,
972    ) -> Self {
973        let guard_ptr = NonNull::from(guard.get_ref());
974        let list = unguarded_list.into_guarded(guard_ptr);
975        WaitersList {
976            list,
977            is_empty: false,
978            shared,
979        }
980    }
981
982    /// Removes the last element from the guarded list. Modifying this list
983    /// requires an exclusive access to the main list in `Notify`.
984    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
985        let result = self.list.pop_back();
986        if result.is_none() {
987            // Save information about emptiness to avoid waiting for lock
988            // in the destructor.
989            self.is_empty = true;
990        }
991        result
992    }
993}
994
impl<T> Shared<T> {
    /// Wakes every receiver task currently queued in `tail.waiters`.
    ///
    /// Takes ownership of the tail lock guard. The queued waiters are moved
    /// into a local guarded list and processed in batches: whenever the
    /// `WakeList` cannot take more wakers, the lock is released, the batch is
    /// woken, and the lock is re-acquired. The lock is never held while
    /// wakers are invoked and is released by the time this function returns.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `tail` lock.
        //   The `WaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Collect wakers until the batch is full or the list is drained.
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(waiter) => {
                        unsafe {
                            // Safety: accessing `waker` is safe because
                            // the tail lock is held.
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
                                wakers.push(waker);
                            }

                            // Safety: `queued` is atomic.
                            let queued = &(*waiter.as_ptr()).queued;
                            // `Relaxed` suffices because the tail lock is held.
                            assert!(queued.load(Relaxed));
                            // `Release` is needed to synchronize with `Recv::drop`.
                            // It is critical to set this variable **after** waker
                            // is extracted, otherwise we may data race with `Recv::drop`.
                            queued.store(false, Release);
                        }
                    }
                    None => {
                        // The list is drained; the final batch is woken
                        // after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        wakers.wake_all();
    }
}
1060
1061impl<T> Clone for Sender<T> {
1062    fn clone(&self) -> Sender<T> {
1063        let shared = self.shared.clone();
1064        shared.num_tx.fetch_add(1, Relaxed);
1065
1066        Sender { shared }
1067    }
1068}
1069
1070impl<T> Drop for Sender<T> {
1071    fn drop(&mut self) {
1072        if 1 == self.shared.num_tx.fetch_sub(1, AcqRel) {
1073            self.close_channel();
1074        }
1075    }
1076}
1077
1078impl<T> WeakSender<T> {
1079    /// Tries to convert a `WeakSender` into a [`Sender`].
1080    ///
1081    /// This will return `Some` if there are other `Sender` instances alive and
1082    /// the channel wasn't previously dropped, otherwise `None` is returned.
1083    #[must_use]
1084    pub fn upgrade(&self) -> Option<Sender<T>> {
1085        let mut tx_count = self.shared.num_tx.load(Acquire);
1086
1087        loop {
1088            if tx_count == 0 {
1089                // channel is closed so this WeakSender can not be upgraded
1090                return None;
1091            }
1092
1093            match self
1094                .shared
1095                .num_tx
1096                .compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)
1097            {
1098                Ok(_) => {
1099                    return Some(Sender {
1100                        shared: self.shared.clone(),
1101                    })
1102                }
1103                Err(prev_count) => tx_count = prev_count,
1104            }
1105        }
1106    }
1107
1108    /// Returns the number of [`Sender`] handles.
1109    pub fn strong_count(&self) -> usize {
1110        self.shared.num_tx.load(Acquire)
1111    }
1112
1113    /// Returns the number of [`WeakSender`] handles.
1114    pub fn weak_count(&self) -> usize {
1115        self.shared.num_weak_tx.load(Acquire)
1116    }
1117}
1118
1119impl<T> Clone for WeakSender<T> {
1120    fn clone(&self) -> WeakSender<T> {
1121        let shared = self.shared.clone();
1122        shared.num_weak_tx.fetch_add(1, Relaxed);
1123
1124        Self { shared }
1125    }
1126}
1127
1128impl<T> Drop for WeakSender<T> {
1129    fn drop(&mut self) {
1130        self.shared.num_weak_tx.fetch_sub(1, AcqRel);
1131    }
1132}
1133
1134impl<T> Receiver<T> {
1135    /// Returns the number of messages that were sent into the channel and that
1136    /// this [`Receiver`] has yet to receive.
1137    ///
1138    /// If the returned value from `len` is larger than the next largest power of 2
1139    /// of the capacity of the channel any call to [`recv`] will return an
1140    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1141    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1142    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1143    /// values larger than 16.
1144    ///
1145    /// [`Receiver`]: crate::sync::broadcast::Receiver
1146    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1147    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1148    ///
1149    /// # Examples
1150    ///
1151    /// ```
1152    /// use tokio::sync::broadcast;
1153    ///
1154    /// #[tokio::main]
1155    /// async fn main() {
1156    ///     let (tx, mut rx1) = broadcast::channel(16);
1157    ///
1158    ///     tx.send(10).unwrap();
1159    ///     tx.send(20).unwrap();
1160    ///
1161    ///     assert_eq!(rx1.len(), 2);
1162    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1163    ///     assert_eq!(rx1.len(), 1);
1164    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1165    ///     assert_eq!(rx1.len(), 0);
1166    /// }
1167    /// ```
1168    pub fn len(&self) -> usize {
1169        let next_send_pos = self.shared.tail.lock().pos;
1170        (next_send_pos - self.next) as usize
1171    }
1172
1173    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1174    /// has yet to receive.
1175    ///
1176    /// [`Receiver]: create::sync::broadcast::Receiver
1177    ///
1178    /// # Examples
1179    ///
1180    /// ```
1181    /// use tokio::sync::broadcast;
1182    ///
1183    /// #[tokio::main]
1184    /// async fn main() {
1185    ///     let (tx, mut rx1) = broadcast::channel(16);
1186    ///
1187    ///     assert!(rx1.is_empty());
1188    ///
1189    ///     tx.send(10).unwrap();
1190    ///     tx.send(20).unwrap();
1191    ///
1192    ///     assert!(!rx1.is_empty());
1193    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1194    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1195    ///     assert!(rx1.is_empty());
1196    /// }
1197    /// ```
1198    pub fn is_empty(&self) -> bool {
1199        self.len() == 0
1200    }
1201
1202    /// Returns `true` if receivers belong to the same channel.
1203    ///
1204    /// # Examples
1205    ///
1206    /// ```
1207    /// use tokio::sync::broadcast;
1208    ///
1209    /// #[tokio::main]
1210    /// async fn main() {
1211    ///     let (tx, rx) = broadcast::channel::<()>(16);
1212    ///     let rx2 = tx.subscribe();
1213    ///
1214    ///     assert!(rx.same_channel(&rx2));
1215    ///
1216    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1217    ///
1218    ///     assert!(!rx3.same_channel(&rx2));
1219    /// }
1220    /// ```
1221    pub fn same_channel(&self, other: &Self) -> bool {
1222        Arc::ptr_eq(&self.shared, &other.shared)
1223    }
1224
    /// Locks the next value if there is one.
    ///
    /// `waiter`, when `Some`, is the wait node and task waker to register if
    /// the channel turns out to be empty for this receiver; passing `None`
    /// performs no registration.
    ///
    /// Returns:
    /// * `Ok(guard)` holding the read lock on the slot for position
    ///   `self.next`; the cursor is advanced by one.
    /// * `Err(TryRecvError::Closed)` if this receiver has caught up and the
    ///   tail is marked closed.
    /// * `Err(TryRecvError::Empty)` if this receiver has caught up and the
    ///   channel is not closed (the waiter, if any, has been registered).
    /// * `Err(TryRecvError::Lagged(missed))` if values were overwritten before
    ///   being read; the cursor is moved to `tail.pos - buffer.len()`.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        // Buffer index of the slot for the cursor position.
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].read();

        if slot.pos != self.next {
            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            //
            // NOTE(review): `send2` is the sender-side path and is not visible
            // from here; confirm the name is still current.
            drop(slot);

            // Holds a waker replaced below so that it can be dropped only
            // after the locks have been released.
            let mut old_waker = None;

            let mut tail = self.shared.tail.lock();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].read();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // read lock and acquiring the tail lock.
            if slot.pos != self.next {
                // Position this slot will hold after its next write.
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // At this point the channel is empty for *this* receiver. If
                    // it's been closed, then that's what we return, otherwise we
                    // set a waker and return empty.
                    if tail.closed {
                        return Err(TryRecvError::Closed);
                    }

                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        old_waker = std::mem::replace(
                                            &mut (*ptr).waker,
                                            Some(waker.clone()),
                                        );
                                    }
                                }

                                // If the waiter is not already queued, enqueue it.
                                // `Relaxed` order suffices: we have synchronized with
                                // all writers through the tail lock that we hold.
                                if !(*ptr).queued.load(Relaxed) {
                                    // `Relaxed` order suffices: all the readers will
                                    // synchronize with this write through the tail lock.
                                    (*ptr).queued.store(true, Relaxed);
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    // Drop the old waker after releasing the locks.
                    drop(slot);
                    drop(tail);
                    drop(old_waker);

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);

                // Number of positions between the cursor and the oldest
                // retained message.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds exactly the value for the cursor position: consume
        // it by advancing the cursor and handing out the slot read guard.
        self.next = self.next.wrapping_add(1);

        Ok(RecvGuard { slot })
    }
1335
1336    /// Returns the number of [`Sender`] handles.
1337    pub fn sender_strong_count(&self) -> usize {
1338        self.shared.num_tx.load(Acquire)
1339    }
1340
1341    /// Returns the number of [`WeakSender`] handles.
1342    pub fn sender_weak_count(&self) -> usize {
1343        self.shared.num_weak_tx.load(Acquire)
1344    }
1345
1346    /// Checks if a channel is closed.
1347    ///
1348    /// This method returns `true` if the channel has been closed. The channel is closed
1349    /// when all [`Sender`] have been dropped.
1350    ///
1351    /// [`Sender`]: crate::sync::broadcast::Sender
1352    ///
1353    /// # Examples
1354    /// ```
1355    /// use tokio::sync::broadcast;
1356    ///
1357    /// #[tokio::main]
1358    /// async fn main() {
1359    ///     let (tx, rx) = broadcast::channel::<()>(10);
1360    ///     assert!(!rx.is_closed());
1361    ///
1362    ///     drop(tx);
1363    ///
1364    ///     assert!(rx.is_closed());
1365    /// }
1366    /// ```
1367    pub fn is_closed(&self) -> bool {
1368        // Channel is closed when there are no strong senders left active
1369        self.shared.num_tx.load(Acquire) == 0
1370    }
1371}
1372
1373impl<T: Clone> Receiver<T> {
1374    /// Re-subscribes to the channel starting from the current tail element.
1375    ///
1376    /// This [`Receiver`] handle will receive a clone of all values sent
1377    /// **after** it has resubscribed. This will not include elements that are
1378    /// in the queue of the current receiver. Consider the following example.
1379    ///
1380    /// # Examples
1381    ///
1382    /// ```
1383    /// use tokio::sync::broadcast;
1384    ///
1385    /// #[tokio::main]
1386    /// async fn main() {
1387    ///   let (tx, mut rx) = broadcast::channel(2);
1388    ///
1389    ///   tx.send(1).unwrap();
1390    ///   let mut rx2 = rx.resubscribe();
1391    ///   tx.send(2).unwrap();
1392    ///
1393    ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1394    ///   assert_eq!(rx.recv().await.unwrap(), 1);
1395    /// }
1396    /// ```
1397    pub fn resubscribe(&self) -> Self {
1398        let shared = self.shared.clone();
1399        new_receiver(shared)
1400    }
1401    /// Receives the next value for this receiver.
1402    ///
1403    /// Each [`Receiver`] handle will receive a clone of all values sent
1404    /// **after** it has subscribed.
1405    ///
1406    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1407    /// dropped, indicating that no further values can be sent on the channel.
1408    ///
1409    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1410    /// sent values will overwrite old values. At this point, a call to [`recv`]
1411    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1412    /// internal cursor is updated to point to the oldest value still held by
1413    /// the channel. A subsequent call to [`recv`] will return this value
1414    /// **unless** it has been since overwritten.
1415    ///
1416    /// # Cancel safety
1417    ///
1418    /// This method is cancel safe. If `recv` is used as the event in a
1419    /// [`tokio::select!`](crate::select) statement and some other branch
1420    /// completes first, it is guaranteed that no messages were received on this
1421    /// channel.
1422    ///
1423    /// [`Receiver`]: crate::sync::broadcast::Receiver
1424    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1425    ///
1426    /// # Examples
1427    ///
1428    /// ```
1429    /// use tokio::sync::broadcast;
1430    ///
1431    /// #[tokio::main]
1432    /// async fn main() {
1433    ///     let (tx, mut rx1) = broadcast::channel(16);
1434    ///     let mut rx2 = tx.subscribe();
1435    ///
1436    ///     tokio::spawn(async move {
1437    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1438    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1439    ///     });
1440    ///
1441    ///     tokio::spawn(async move {
1442    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1443    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1444    ///     });
1445    ///
1446    ///     tx.send(10).unwrap();
1447    ///     tx.send(20).unwrap();
1448    /// }
1449    /// ```
1450    ///
1451    /// Handling lag
1452    ///
1453    /// ```
1454    /// use tokio::sync::broadcast;
1455    ///
1456    /// #[tokio::main]
1457    /// async fn main() {
1458    ///     let (tx, mut rx) = broadcast::channel(2);
1459    ///
1460    ///     tx.send(10).unwrap();
1461    ///     tx.send(20).unwrap();
1462    ///     tx.send(30).unwrap();
1463    ///
1464    ///     // The receiver lagged behind
1465    ///     assert!(rx.recv().await.is_err());
1466    ///
1467    ///     // At this point, we can abort or continue with lost messages
1468    ///
1469    ///     assert_eq!(20, rx.recv().await.unwrap());
1470    ///     assert_eq!(30, rx.recv().await.unwrap());
1471    /// }
1472    /// ```
1473    pub async fn recv(&mut self) -> Result<T, RecvError> {
1474        cooperative(Recv::new(self)).await
1475    }
1476
1477    /// Attempts to return a pending value on this receiver without awaiting.
1478    ///
1479    /// This is useful for a flavor of "optimistic check" before deciding to
1480    /// await on a receiver.
1481    ///
1482    /// Compared with [`recv`], this function has three failure cases instead of two
1483    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1484    ///
1485    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1486    /// dropped, indicating that no further values can be sent on the channel.
1487    ///
1488    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1489    /// sent values will overwrite old values. At this point, a call to [`recv`]
1490    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1491    /// internal cursor is updated to point to the oldest value still held by
1492    /// the channel. A subsequent call to [`try_recv`] will return this value
1493    /// **unless** it has been since overwritten. If there are no values to
1494    /// receive, `Err(TryRecvError::Empty)` is returned.
1495    ///
1496    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1497    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1498    /// [`Receiver`]: crate::sync::broadcast::Receiver
1499    ///
1500    /// # Examples
1501    ///
1502    /// ```
1503    /// use tokio::sync::broadcast;
1504    ///
1505    /// #[tokio::main]
1506    /// async fn main() {
1507    ///     let (tx, mut rx) = broadcast::channel(16);
1508    ///
1509    ///     assert!(rx.try_recv().is_err());
1510    ///
1511    ///     tx.send(10).unwrap();
1512    ///
1513    ///     let value = rx.try_recv().unwrap();
1514    ///     assert_eq!(10, value);
1515    /// }
1516    /// ```
1517    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1518        let guard = self.recv_ref(None)?;
1519        guard.clone_value().ok_or(TryRecvError::Closed)
1520    }
1521
1522    /// Blocking receive to call outside of asynchronous contexts.
1523    ///
1524    /// # Panics
1525    ///
1526    /// This function panics if called within an asynchronous execution
1527    /// context.
1528    ///
1529    /// # Examples
1530    /// ```
1531    /// use std::thread;
1532    /// use tokio::sync::broadcast;
1533    ///
1534    /// #[tokio::main]
1535    /// async fn main() {
1536    ///     let (tx, mut rx) = broadcast::channel(16);
1537    ///
1538    ///     let sync_code = thread::spawn(move || {
1539    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1540    ///     });
1541    ///
1542    ///     let _ = tx.send(10);
1543    ///     sync_code.join().unwrap();
1544    /// }
1545    /// ```
1546    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1547        crate::future::block_on(self.recv())
1548    }
1549}
1550
1551impl<T> Drop for Receiver<T> {
1552    fn drop(&mut self) {
1553        let mut tail = self.shared.tail.lock();
1554
1555        tail.rx_cnt -= 1;
1556        let until = tail.pos;
1557        let remaining_rx = tail.rx_cnt;
1558
1559        if remaining_rx == 0 {
1560            self.shared.notify_last_rx_drop.notify_waiters();
1561            tail.closed = true;
1562        }
1563
1564        drop(tail);
1565
1566        while self.next < until {
1567            match self.recv_ref(None) {
1568                Ok(_) => {}
1569                // The channel is closed
1570                Err(TryRecvError::Closed) => break,
1571                // Ignore lagging, we will catch up
1572                Err(TryRecvError::Lagged(..)) => {}
1573                // Can't be empty
1574                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1575            }
1576        }
1577    }
1578}
1579
1580impl<'a, T> Recv<'a, T> {
1581    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1582        Recv {
1583            receiver,
1584            waiter: UnsafeCell::new(Waiter {
1585                queued: AtomicBool::new(false),
1586                waker: None,
1587                pointers: linked_list::Pointers::new(),
1588                _p: PhantomPinned,
1589            }),
1590        }
1591    }
1592
1593    /// A custom `project` implementation is used in place of `pin-project-lite`
1594    /// as a custom drop implementation is needed.
1595    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1596        unsafe {
1597            // Safety: Receiver is Unpin
1598            is_unpin::<&mut Receiver<T>>();
1599
1600            let me = self.get_unchecked_mut();
1601            (me.receiver, &me.waiter)
1602        }
1603    }
1604}
1605
1606impl<'a, T> Future for Recv<'a, T>
1607where
1608    T: Clone,
1609{
1610    type Output = Result<T, RecvError>;
1611
1612    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1613        ready!(crate::trace::trace_leaf(cx));
1614
1615        let (receiver, waiter) = self.project();
1616
1617        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1618            Ok(value) => value,
1619            Err(TryRecvError::Empty) => return Poll::Pending,
1620            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1621            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1622        };
1623
1624        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1625    }
1626}
1627
1628impl<'a, T> Drop for Recv<'a, T> {
1629    fn drop(&mut self) {
1630        // Safety: `waiter.queued` is atomic.
1631        // Acquire ordering is required to synchronize with
1632        // `Shared::notify_rx` before we drop the object.
1633        let queued = self
1634            .waiter
1635            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1636
1637        // If the waiter is queued, we need to unlink it from the waiters list.
1638        // If not, no further synchronization is required, since the waiter
1639        // is not in the list and, as such, is not shared with any other threads.
1640        if queued {
1641            // Acquire the tail lock. This is required for safety before accessing
1642            // the waiter node.
1643            let mut tail = self.receiver.shared.tail.lock();
1644
1645            // Safety: tail lock is held.
1646            // `Relaxed` order suffices because we hold the tail lock.
1647            let queued = self
1648                .waiter
1649                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1650
1651            if queued {
1652                // Remove the node
1653                //
1654                // safety: tail lock is held and the wait node is verified to be in
1655                // the list.
1656                unsafe {
1657                    self.waiter.with_mut(|ptr| {
1658                        tail.waiters.remove((&mut *ptr).into());
1659                    });
1660                }
1661            }
1662        }
1663    }
1664}
1665
1666/// # Safety
1667///
1668/// `Waiter` is forced to be !Unpin.
1669unsafe impl linked_list::Link for Waiter {
1670    type Handle = NonNull<Waiter>;
1671    type Target = Waiter;
1672
1673    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1674        *handle
1675    }
1676
1677    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1678        ptr
1679    }
1680
1681    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1682        Waiter::addr_of_pointers(target)
1683    }
1684}
1685
1686impl<T> fmt::Debug for Sender<T> {
1687    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1688        write!(fmt, "broadcast::Sender")
1689    }
1690}
1691
1692impl<T> fmt::Debug for WeakSender<T> {
1693    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1694        write!(fmt, "broadcast::WeakSender")
1695    }
1696}
1697
1698impl<T> fmt::Debug for Receiver<T> {
1699    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1700        write!(fmt, "broadcast::Receiver")
1701    }
1702}
1703
1704impl<'a, T> RecvGuard<'a, T> {
1705    fn clone_value(&self) -> Option<T>
1706    where
1707        T: Clone,
1708    {
1709        self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1710    }
1711}
1712
1713impl<'a, T> Drop for RecvGuard<'a, T> {
1714    fn drop(&mut self) {
1715        // Decrement the remaining counter
1716        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1717            // Safety: Last receiver, drop the value
1718            self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1719        }
1720    }
1721}
1722
/// Compile-time assertion helper: instantiating it only type-checks when `T`
/// is `Unpin`. It does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1724
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
    use super::*;

    /// A channel built through `Sender::new` starts without receivers, and
    /// the count follows every subscribe / resubscribe / drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx = Sender::<i32>::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        // Dropping two of the three receivers leaves exactly one.
        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// A channel built through `channel` starts with its one receiver
    /// already counted.
    #[cfg(not(loom))]
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, initial_rx) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        let _extra_rx = initial_rx.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}