tokio/sync/broadcast.rs
1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::task::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let (tx, mut rx1) = broadcast::channel(16);
147/// let mut rx2 = tx.subscribe();
148///
149/// tokio::spawn(async move {
150/// assert_eq!(rx1.recv().await.unwrap(), 10);
151/// assert_eq!(rx1.recv().await.unwrap(), 20);
152/// });
153///
154/// tokio::spawn(async move {
155/// assert_eq!(rx2.recv().await.unwrap(), 10);
156/// assert_eq!(rx2.recv().await.unwrap(), 20);
157/// });
158///
159/// tx.send(10).unwrap();
160/// tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
165pub struct Sender<T> {
166 shared: Arc<Shared<T>>,
167}
168
169/// A sender that does not prevent the channel from being closed.
170///
171/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
172/// instances remain, the channel is closed.
173///
174/// In order to send messages, the `WeakSender` needs to be upgraded using
175/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
176/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
177///
178/// [`Sender`]: Sender
179/// [`WeakSender::upgrade`]: WeakSender::upgrade
180///
181/// # Examples
182///
183/// ```
184/// use tokio::sync::broadcast::channel;
185///
186/// #[tokio::main]
187/// async fn main() {
188/// let (tx, _rx) = channel::<i32>(15);
189/// let tx_weak = tx.downgrade();
190///
191/// // Upgrading will succeed because `tx` still exists.
192/// assert!(tx_weak.upgrade().is_some());
193///
194/// // If we drop `tx`, then it will fail.
195/// drop(tx);
196/// assert!(tx_weak.clone().upgrade().is_none());
197/// }
198/// ```
199pub struct WeakSender<T> {
200 shared: Arc<Shared<T>>,
201}
202
203/// Receiving-half of the [`broadcast`] channel.
204///
205/// Must not be used concurrently. Messages may be retrieved using
206/// [`recv`][Receiver::recv].
207///
208/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
209/// wrapper.
210///
211/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
212///
213/// # Examples
214///
215/// ```
216/// use tokio::sync::broadcast;
217///
218/// #[tokio::main]
219/// async fn main() {
220/// let (tx, mut rx1) = broadcast::channel(16);
221/// let mut rx2 = tx.subscribe();
222///
223/// tokio::spawn(async move {
224/// assert_eq!(rx1.recv().await.unwrap(), 10);
225/// assert_eq!(rx1.recv().await.unwrap(), 20);
226/// });
227///
228/// tokio::spawn(async move {
229/// assert_eq!(rx2.recv().await.unwrap(), 10);
230/// assert_eq!(rx2.recv().await.unwrap(), 20);
231/// });
232///
233/// tx.send(10).unwrap();
234/// tx.send(20).unwrap();
235/// }
236/// ```
237///
238/// [`broadcast`]: crate::sync::broadcast
239pub struct Receiver<T> {
240 /// State shared with all receivers and senders.
241 shared: Arc<Shared<T>>,
242
243 /// Next position to read from
244 next: u64,
245}
246
pub mod error {
    //! Broadcast error types

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// Sending fails only when there is no active receiver, in which case
    /// nobody could ever receive the message. The rejected message is carried
    /// as the payload of the error so that the caller can recover it.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The payload is deliberately not printed, so `T` needs no bound.
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind. Receiving again yields the
        /// oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now. At least one [`Sender`] handle is
        /// still active, so a value may arrive later.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind. Receiving again yields the
        /// oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
335
336use self::error::{RecvError, SendError, TryRecvError};
337
338use super::Notify;
339
340/// Data shared between senders and receivers.
341struct Shared<T> {
342 /// slots in the channel.
343 buffer: Box<[RwLock<Slot<T>>]>,
344
345 /// Mask a position -> index.
346 mask: usize,
347
348 /// Tail of the queue. Includes the rx wait list.
349 tail: Mutex<Tail>,
350
351 /// Number of outstanding Sender handles.
352 num_tx: AtomicUsize,
353
354 /// Number of outstanding weak Sender handles.
355 num_weak_tx: AtomicUsize,
356
357 /// Notify when the last subscribed [`Receiver`] drops.
358 notify_last_rx_drop: Notify,
359}
360
361/// Next position to write a value.
362struct Tail {
363 /// Next position to write to.
364 pos: u64,
365
366 /// Number of active receivers.
367 rx_cnt: usize,
368
369 /// True if the channel is closed.
370 closed: bool,
371
372 /// Receivers waiting for a value.
373 waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
374}
375
/// One slot of the channel buffer.
struct Slot<T> {
    /// How many receivers are still expected to see the stored value.
    ///
    /// The value is released once this reaches zero.
    ///
    /// This is an atomic because it is modified while only the slot's read
    /// lock is held, i.e. possibly by several threads at once.
    rem: AtomicUsize,

    /// Position of the `send` whose value is stored here; unique per send.
    pos: u64,

    /// The broadcast value.
    ///
    /// `send` stores it while holding the slot's write lock. Each reader
    /// decrements `rem` when it is done, and the value is dropped when `rem`
    /// hits zero.
    val: UnsafeCell<Option<T>>,
}
395
396/// An entry in the wait queue.
397struct Waiter {
398 /// True if queued.
399 queued: AtomicBool,
400
401 /// Task waiting on the broadcast channel.
402 waker: Option<Waker>,
403
404 /// Intrusive linked-list pointers.
405 pointers: linked_list::Pointers<Waiter>,
406
407 /// Should not be `Unpin`.
408 _p: PhantomPinned,
409}
410
411impl Waiter {
412 fn new() -> Self {
413 Self {
414 queued: AtomicBool::new(false),
415 waker: None,
416 pointers: linked_list::Pointers::new(),
417 _p: PhantomPinned,
418 }
419 }
420}
421
422generate_addr_of_methods! {
423 impl<> Waiter {
424 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
425 &self.pointers
426 }
427 }
428}
429
430struct RecvGuard<'a, T> {
431 slot: RwLockReadGuard<'a, Slot<T>>,
432}
433
434/// Receive a value future.
435struct Recv<'a, T> {
436 /// Receiver being waited on.
437 receiver: &'a mut Receiver<T>,
438
439 /// Entry in the waiter `LinkedList`.
440 waiter: UnsafeCell<Waiter>,
441}
442
443unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
444unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
445
/// Largest number of receivers a channel accepts. The two top bits of a
/// `usize` are left unused, reserving space to lock.
const MAX_RECEIVERS: usize = usize::MAX / 4;
448
449/// Create a bounded, multi-producer, multi-consumer channel where each sent
450/// value is broadcasted to all active receivers.
451///
452/// **Note:** The actual capacity may be greater than the provided `capacity`.
453///
454/// All data sent on [`Sender`] will become available on every active
455/// [`Receiver`] in the same order as it was sent.
456///
457/// The `Sender` can be cloned to `send` to the same channel from multiple
458/// points in the process or it can be used concurrently from an `Arc`. New
459/// `Receiver` handles are created by calling [`Sender::subscribe`].
460///
461/// If all [`Receiver`] handles are dropped, the `send` method will return a
462/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
463/// method will return a [`RecvError`].
464///
465/// [`Sender`]: crate::sync::broadcast::Sender
466/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
467/// [`Receiver`]: crate::sync::broadcast::Receiver
468/// [`recv`]: crate::sync::broadcast::Receiver::recv
469/// [`SendError`]: crate::sync::broadcast::error::SendError
470/// [`RecvError`]: crate::sync::broadcast::error::RecvError
471///
472/// # Examples
473///
474/// ```
475/// use tokio::sync::broadcast;
476///
477/// #[tokio::main]
478/// async fn main() {
479/// let (tx, mut rx1) = broadcast::channel(16);
480/// let mut rx2 = tx.subscribe();
481///
482/// tokio::spawn(async move {
483/// assert_eq!(rx1.recv().await.unwrap(), 10);
484/// assert_eq!(rx1.recv().await.unwrap(), 20);
485/// });
486///
487/// tokio::spawn(async move {
488/// assert_eq!(rx2.recv().await.unwrap(), 10);
489/// assert_eq!(rx2.recv().await.unwrap(), 20);
490/// });
491///
492/// tx.send(10).unwrap();
493/// tx.send(20).unwrap();
494/// }
495/// ```
496///
497/// # Panics
498///
499/// This will panic if `capacity` is equal to `0` or larger
500/// than `usize::MAX / 2`.
501#[track_caller]
502pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
503 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
504 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
505 let rx = Receiver {
506 shared: tx.shared.clone(),
507 next: 0,
508 };
509 (tx, rx)
510}
511
512unsafe impl<T: Send> Send for Sender<T> {}
513unsafe impl<T: Send> Sync for Sender<T> {}
514
515unsafe impl<T: Send> Send for WeakSender<T> {}
516unsafe impl<T: Send> Sync for WeakSender<T> {}
517
518unsafe impl<T: Send> Send for Receiver<T> {}
519unsafe impl<T: Send> Sync for Receiver<T> {}
520
521impl<T> Sender<T> {
522 /// Creates the sending-half of the [`broadcast`] channel.
523 ///
524 /// See the documentation of [`broadcast::channel`] for more information on this method.
525 ///
526 /// [`broadcast`]: crate::sync::broadcast
527 /// [`broadcast::channel`]: crate::sync::broadcast::channel
528 #[track_caller]
529 pub fn new(capacity: usize) -> Self {
530 // SAFETY: We don't create extra receivers, so there are 0.
531 unsafe { Self::new_with_receiver_count(0, capacity) }
532 }
533
534 /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
535 /// count.
536 ///
537 /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
538 /// calling this function.
539 ///
540 /// # Safety:
541 ///
542 /// The caller must ensure that the amount of receivers for this Sender is correct before
543 /// the channel functionalities are used, the count is zero by default, as this function
544 /// does not create any receivers by itself.
545 #[track_caller]
546 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
547 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
548 assert!(
549 capacity <= usize::MAX >> 1,
550 "broadcast channel capacity exceeded `usize::MAX / 2`"
551 );
552
553 // Round to a power of two
554 capacity = capacity.next_power_of_two();
555
556 let mut buffer = Vec::with_capacity(capacity);
557
558 for i in 0..capacity {
559 buffer.push(RwLock::new(Slot {
560 rem: AtomicUsize::new(0),
561 pos: (i as u64).wrapping_sub(capacity as u64),
562 val: UnsafeCell::new(None),
563 }));
564 }
565
566 let shared = Arc::new(Shared {
567 buffer: buffer.into_boxed_slice(),
568 mask: capacity - 1,
569 tail: Mutex::new(Tail {
570 pos: 0,
571 rx_cnt: receiver_count,
572 closed: false,
573 waiters: LinkedList::new(),
574 }),
575 num_tx: AtomicUsize::new(1),
576 num_weak_tx: AtomicUsize::new(0),
577 notify_last_rx_drop: Notify::new(),
578 });
579
580 Sender { shared }
581 }
582
583 /// Attempts to send a value to all active [`Receiver`] handles, returning
584 /// it back if it could not be sent.
585 ///
586 /// A successful send occurs when there is at least one active [`Receiver`]
587 /// handle. An unsuccessful send would be one where all associated
588 /// [`Receiver`] handles have already been dropped.
589 ///
590 /// # Return
591 ///
592 /// On success, the number of subscribed [`Receiver`] handles is returned.
593 /// This does not mean that this number of receivers will see the message as
594 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
595 /// the message.
596 ///
597 /// # Note
598 ///
599 /// A return value of `Ok` **does not** mean that the sent value will be
600 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
601 /// handles may be dropped before receiving the sent message.
602 ///
603 /// A return value of `Err` **does not** mean that future calls to `send`
604 /// will fail. New [`Receiver`] handles may be created by calling
605 /// [`subscribe`].
606 ///
607 /// [`Receiver`]: crate::sync::broadcast::Receiver
608 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
609 ///
610 /// # Examples
611 ///
612 /// ```
613 /// use tokio::sync::broadcast;
614 ///
615 /// #[tokio::main]
616 /// async fn main() {
617 /// let (tx, mut rx1) = broadcast::channel(16);
618 /// let mut rx2 = tx.subscribe();
619 ///
620 /// tokio::spawn(async move {
621 /// assert_eq!(rx1.recv().await.unwrap(), 10);
622 /// assert_eq!(rx1.recv().await.unwrap(), 20);
623 /// });
624 ///
625 /// tokio::spawn(async move {
626 /// assert_eq!(rx2.recv().await.unwrap(), 10);
627 /// assert_eq!(rx2.recv().await.unwrap(), 20);
628 /// });
629 ///
630 /// tx.send(10).unwrap();
631 /// tx.send(20).unwrap();
632 /// }
633 /// ```
634 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
635 let mut tail = self.shared.tail.lock();
636
637 if tail.rx_cnt == 0 {
638 return Err(SendError(value));
639 }
640
641 // Position to write into
642 let pos = tail.pos;
643 let rem = tail.rx_cnt;
644 let idx = (pos & self.shared.mask as u64) as usize;
645
646 // Update the tail position
647 tail.pos = tail.pos.wrapping_add(1);
648
649 // Get the slot
650 let mut slot = self.shared.buffer[idx].write();
651
652 // Track the position
653 slot.pos = pos;
654
655 // Set remaining receivers
656 slot.rem.with_mut(|v| *v = rem);
657
658 // Write the value
659 slot.val = UnsafeCell::new(Some(value));
660
661 // Release the slot lock before notifying the receivers.
662 drop(slot);
663
664 // Notify and release the mutex. This must happen after the slot lock is
665 // released, otherwise the writer lock bit could be cleared while another
666 // thread is in the critical section.
667 self.shared.notify_rx(tail);
668
669 Ok(rem)
670 }
671
672 /// Creates a new [`Receiver`] handle that will receive values sent **after**
673 /// this call to `subscribe`.
674 ///
675 /// # Examples
676 ///
677 /// ```
678 /// use tokio::sync::broadcast;
679 ///
680 /// #[tokio::main]
681 /// async fn main() {
682 /// let (tx, _rx) = broadcast::channel(16);
683 ///
684 /// // Will not be seen
685 /// tx.send(10).unwrap();
686 ///
687 /// let mut rx = tx.subscribe();
688 ///
689 /// tx.send(20).unwrap();
690 ///
691 /// let value = rx.recv().await.unwrap();
692 /// assert_eq!(20, value);
693 /// }
694 /// ```
695 pub fn subscribe(&self) -> Receiver<T> {
696 let shared = self.shared.clone();
697 new_receiver(shared)
698 }
699
700 /// Converts the `Sender` to a [`WeakSender`] that does not count
701 /// towards RAII semantics, i.e. if all `Sender` instances of the
702 /// channel were dropped and only `WeakSender` instances remain,
703 /// the channel is closed.
704 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
705 pub fn downgrade(&self) -> WeakSender<T> {
706 self.shared.num_weak_tx.fetch_add(1, Relaxed);
707 WeakSender {
708 shared: self.shared.clone(),
709 }
710 }
711
712 /// Returns the number of queued values.
713 ///
714 /// A value is queued until it has either been seen by all receivers that were alive at the time
715 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
716 /// queue's capacity.
717 ///
718 /// # Note
719 ///
720 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
721 /// have been evicted from the queue before being seen by all receivers.
722 ///
723 /// # Examples
724 ///
725 /// ```
726 /// use tokio::sync::broadcast;
727 ///
728 /// #[tokio::main]
729 /// async fn main() {
730 /// let (tx, mut rx1) = broadcast::channel(16);
731 /// let mut rx2 = tx.subscribe();
732 ///
733 /// tx.send(10).unwrap();
734 /// tx.send(20).unwrap();
735 /// tx.send(30).unwrap();
736 ///
737 /// assert_eq!(tx.len(), 3);
738 ///
739 /// rx1.recv().await.unwrap();
740 ///
741 /// // The len is still 3 since rx2 hasn't seen the first value yet.
742 /// assert_eq!(tx.len(), 3);
743 ///
744 /// rx2.recv().await.unwrap();
745 ///
746 /// assert_eq!(tx.len(), 2);
747 /// }
748 /// ```
749 pub fn len(&self) -> usize {
750 let tail = self.shared.tail.lock();
751
752 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
753 let mut low = 0;
754 let mut high = self.shared.buffer.len();
755 while low < high {
756 let mid = low + (high - low) / 2;
757 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
758 if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
759 low = mid + 1;
760 } else {
761 high = mid;
762 }
763 }
764
765 self.shared.buffer.len() - low
766 }
767
768 /// Returns true if there are no queued values.
769 ///
770 /// # Examples
771 ///
772 /// ```
773 /// use tokio::sync::broadcast;
774 ///
775 /// #[tokio::main]
776 /// async fn main() {
777 /// let (tx, mut rx1) = broadcast::channel(16);
778 /// let mut rx2 = tx.subscribe();
779 ///
780 /// assert!(tx.is_empty());
781 ///
782 /// tx.send(10).unwrap();
783 ///
784 /// assert!(!tx.is_empty());
785 ///
786 /// rx1.recv().await.unwrap();
787 ///
788 /// // The queue is still not empty since rx2 hasn't seen the value.
789 /// assert!(!tx.is_empty());
790 ///
791 /// rx2.recv().await.unwrap();
792 ///
793 /// assert!(tx.is_empty());
794 /// }
795 /// ```
796 pub fn is_empty(&self) -> bool {
797 let tail = self.shared.tail.lock();
798
799 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
800 self.shared.buffer[idx].read().rem.load(SeqCst) == 0
801 }
802
803 /// Returns the number of active receivers.
804 ///
805 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
806 /// [`subscribe`]. These are the handles that will receive values sent on
807 /// this [`Sender`].
808 ///
809 /// # Note
810 ///
811 /// It is not guaranteed that a sent message will reach this number of
812 /// receivers. Active receivers may never call [`recv`] again before
813 /// dropping.
814 ///
815 /// [`recv`]: crate::sync::broadcast::Receiver::recv
816 /// [`Receiver`]: crate::sync::broadcast::Receiver
817 /// [`Sender`]: crate::sync::broadcast::Sender
818 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
819 /// [`channel`]: crate::sync::broadcast::channel
820 ///
821 /// # Examples
822 ///
823 /// ```
824 /// use tokio::sync::broadcast;
825 ///
826 /// #[tokio::main]
827 /// async fn main() {
828 /// let (tx, _rx1) = broadcast::channel(16);
829 ///
830 /// assert_eq!(1, tx.receiver_count());
831 ///
832 /// let mut _rx2 = tx.subscribe();
833 ///
834 /// assert_eq!(2, tx.receiver_count());
835 ///
836 /// tx.send(10).unwrap();
837 /// }
838 /// ```
839 pub fn receiver_count(&self) -> usize {
840 let tail = self.shared.tail.lock();
841 tail.rx_cnt
842 }
843
844 /// Returns `true` if senders belong to the same channel.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// use tokio::sync::broadcast;
850 ///
851 /// #[tokio::main]
852 /// async fn main() {
853 /// let (tx, _rx) = broadcast::channel::<()>(16);
854 /// let tx2 = tx.clone();
855 ///
856 /// assert!(tx.same_channel(&tx2));
857 ///
858 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
859 ///
860 /// assert!(!tx3.same_channel(&tx2));
861 /// }
862 /// ```
863 pub fn same_channel(&self, other: &Self) -> bool {
864 Arc::ptr_eq(&self.shared, &other.shared)
865 }
866
867 /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
868 /// zero.
869 ///
870 /// # Examples
871 ///
872 /// ```
873 /// use futures::FutureExt;
874 /// use tokio::sync::broadcast;
875 ///
876 /// #[tokio::main]
877 /// async fn main() {
878 /// let (tx, mut rx1) = broadcast::channel::<u32>(16);
879 /// let mut rx2 = tx.subscribe();
880 ///
881 /// let _ = tx.send(10);
882 ///
883 /// assert_eq!(rx1.recv().await.unwrap(), 10);
884 /// drop(rx1);
885 /// assert!(tx.closed().now_or_never().is_none());
886 ///
887 /// assert_eq!(rx2.recv().await.unwrap(), 10);
888 /// drop(rx2);
889 /// assert!(tx.closed().now_or_never().is_some());
890 /// }
891 /// ```
892 pub async fn closed(&self) {
893 loop {
894 let notified = self.shared.notify_last_rx_drop.notified();
895
896 {
897 // Ensure the lock drops if the channel isn't closed
898 let tail = self.shared.tail.lock();
899 if tail.closed {
900 return;
901 }
902 }
903
904 notified.await;
905 }
906 }
907
908 fn close_channel(&self) {
909 let mut tail = self.shared.tail.lock();
910 tail.closed = true;
911
912 self.shared.notify_rx(tail);
913 }
914
915 /// Returns the number of [`Sender`] handles.
916 pub fn strong_count(&self) -> usize {
917 self.shared.num_tx.load(Acquire)
918 }
919
920 /// Returns the number of [`WeakSender`] handles.
921 pub fn weak_count(&self) -> usize {
922 self.shared.num_weak_tx.load(Acquire)
923 }
924}
925
926/// Create a new `Receiver` which reads starting from the tail.
927fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
928 let mut tail = shared.tail.lock();
929
930 assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
931
932 if tail.rx_cnt == 0 {
933 // Potentially need to re-open the channel, if a new receiver has been added between calls
934 // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
935 // applies if the sender has been dropped
936 tail.closed = false;
937 }
938
939 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
940 let next = tail.pos;
941
942 drop(tail);
943
944 Receiver { shared, next }
945}
946
947/// List used in `Shared::notify_rx`. It wraps a guarded linked list
948/// and gates the access to it on the `Shared.tail` mutex. It also empties
949/// the list on drop.
950struct WaitersList<'a, T> {
951 list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
952 is_empty: bool,
953 shared: &'a Shared<T>,
954}
955
956impl<'a, T> Drop for WaitersList<'a, T> {
957 fn drop(&mut self) {
958 // If the list is not empty, we unlink all waiters from it.
959 // We do not wake the waiters to avoid double panics.
960 if !self.is_empty {
961 let _lock_guard = self.shared.tail.lock();
962 while self.list.pop_back().is_some() {}
963 }
964 }
965}
966
967impl<'a, T> WaitersList<'a, T> {
968 fn new(
969 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
970 guard: Pin<&'a Waiter>,
971 shared: &'a Shared<T>,
972 ) -> Self {
973 let guard_ptr = NonNull::from(guard.get_ref());
974 let list = unguarded_list.into_guarded(guard_ptr);
975 WaitersList {
976 list,
977 is_empty: false,
978 shared,
979 }
980 }
981
982 /// Removes the last element from the guarded list. Modifying this list
983 /// requires an exclusive access to the main list in `Notify`.
984 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
985 let result = self.list.pop_back();
986 if result.is_none() {
987 // Save information about emptiness to avoid waiting for lock
988 // in the destructor.
989 self.is_empty = true;
990 }
991 result
992 }
993}
994
impl<T> Shared<T> {
    /// Wakes all receiver tasks that are currently queued on `tail.waiters`.
    ///
    /// Consumes the tail lock guard passed in by the caller. Wakers are
    /// collected while the lock is held and are only invoked after it has been
    /// released: whenever the `WakeList` cannot accept more wakers, the lock is
    /// dropped, the collected batch is woken, and the lock is re-acquired to
    /// continue with the remaining waiters.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the tail lock.
        //   The `WaitersList` wrapper makes sure we hold the lock to modify it
        //   (`pop_back_locked` takes `&mut Tail`).
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Fill the current batch until either the wake list is full or
            // there are no waiters left.
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(waiter) => {
                        unsafe {
                            // Safety: accessing `waker` is safe because
                            // the tail lock is held.
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
                                wakers.push(waker);
                            }

                            // Safety: `queued` is atomic.
                            let queued = &(*waiter.as_ptr()).queued;
                            // `Relaxed` suffices because the tail lock is held.
                            assert!(queued.load(Relaxed));
                            // `Release` is needed to synchronize with `Recv::drop`.
                            // It is critical to set this variable **after** waker
                            // is extracted, otherwise we may data race with `Recv::drop`.
                            queued.store(false, Release);
                        }
                    }
                    None => {
                        // Every waiter has been processed; the final batch is
                        // woken after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        wakers.wake_all();
    }
}
1060
1061impl<T> Clone for Sender<T> {
1062 fn clone(&self) -> Sender<T> {
1063 let shared = self.shared.clone();
1064 shared.num_tx.fetch_add(1, Relaxed);
1065
1066 Sender { shared }
1067 }
1068}
1069
1070impl<T> Drop for Sender<T> {
1071 fn drop(&mut self) {
1072 if 1 == self.shared.num_tx.fetch_sub(1, AcqRel) {
1073 self.close_channel();
1074 }
1075 }
1076}
1077
1078impl<T> WeakSender<T> {
1079 /// Tries to convert a `WeakSender` into a [`Sender`].
1080 ///
1081 /// This will return `Some` if there are other `Sender` instances alive and
1082 /// the channel wasn't previously dropped, otherwise `None` is returned.
1083 #[must_use]
1084 pub fn upgrade(&self) -> Option<Sender<T>> {
1085 let mut tx_count = self.shared.num_tx.load(Acquire);
1086
1087 loop {
1088 if tx_count == 0 {
1089 // channel is closed so this WeakSender can not be upgraded
1090 return None;
1091 }
1092
1093 match self
1094 .shared
1095 .num_tx
1096 .compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)
1097 {
1098 Ok(_) => {
1099 return Some(Sender {
1100 shared: self.shared.clone(),
1101 })
1102 }
1103 Err(prev_count) => tx_count = prev_count,
1104 }
1105 }
1106 }
1107
1108 /// Returns the number of [`Sender`] handles.
1109 pub fn strong_count(&self) -> usize {
1110 self.shared.num_tx.load(Acquire)
1111 }
1112
1113 /// Returns the number of [`WeakSender`] handles.
1114 pub fn weak_count(&self) -> usize {
1115 self.shared.num_weak_tx.load(Acquire)
1116 }
1117}
1118
1119impl<T> Clone for WeakSender<T> {
1120 fn clone(&self) -> WeakSender<T> {
1121 let shared = self.shared.clone();
1122 shared.num_weak_tx.fetch_add(1, Relaxed);
1123
1124 Self { shared }
1125 }
1126}
1127
1128impl<T> Drop for WeakSender<T> {
1129 fn drop(&mut self) {
1130 self.shared.num_weak_tx.fetch_sub(1, AcqRel);
1131 }
1132}
1133
impl<T> Receiver<T> {
    /// Returns the number of messages that were sent into the channel and that
    /// this [`Receiver`] has yet to receive.
    ///
    /// If the returned value from `len` is larger than the next largest power of 2
    /// of the capacity of the channel any call to [`recv`] will return an
    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
    /// values larger than 16.
    ///
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx1) = broadcast::channel(16);
    ///
    ///     tx.send(10).unwrap();
    ///     tx.send(20).unwrap();
    ///
    ///     assert_eq!(rx1.len(), 2);
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
    ///     assert_eq!(rx1.len(), 1);
    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
    ///     assert_eq!(rx1.len(), 0);
    /// }
    /// ```
    pub fn len(&self) -> usize {
        // The tail position is read under the tail lock; the lock guard is a
        // temporary and is released at the end of this statement.
        let next_send_pos = self.shared.tail.lock().pos;
        // Distance between the next send position and this receiver's cursor.
        (next_send_pos - self.next) as usize
    }

    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
    /// has yet to receive.
    ///
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx1) = broadcast::channel(16);
    ///
    ///     assert!(rx1.is_empty());
    ///
    ///     tx.send(10).unwrap();
    ///     tx.send(20).unwrap();
    ///
    ///     assert!(!rx1.is_empty());
    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
    ///     assert!(rx1.is_empty());
    /// }
    /// ```
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if receivers belong to the same channel.
    ///
    /// Two receivers belong to the same channel when their `shared` handles
    /// point to the same allocation.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = broadcast::channel::<()>(16);
    ///     let rx2 = tx.subscribe();
    ///
    ///     assert!(rx.same_channel(&rx2));
    ///
    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
    ///
    ///     assert!(!rx3.same_channel(&rx2));
    /// }
    /// ```
    pub fn same_channel(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.shared, &other.shared)
    }

    /// Locks the next value if there is one.
    ///
    /// `waiter`, when provided, is the waiter node and task waker to register
    /// if no value is available yet.
    ///
    /// Returns:
    /// * `Ok(RecvGuard)` holding the read lock on the slot with the next
    ///   value; the receiver's cursor has been advanced by one.
    /// * `Err(TryRecvError::Empty)` if this receiver has caught up and the
    ///   channel is not closed. If `waiter` was provided, its waker has been
    ///   stored and the node queued on `tail.waiters`.
    /// * `Err(TryRecvError::Closed)` if this receiver has caught up and
    ///   `tail.closed` is set.
    /// * `Err(TryRecvError::Lagged(missed))` if values were overwritten before
    ///   being read; the cursor has been moved forward by `missed`.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        // Map the cursor onto a buffer index using the shared mask.
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].read();

        if slot.pos != self.next {
            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            drop(slot);

            // Holds a replaced waker so it can be dropped only after the
            // locks have been released.
            let mut old_waker = None;

            let mut tail = self.shared.tail.lock();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].read();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // read lock and acquiring the tail lock.
            if slot.pos != self.next {
                // Position this slot will hold after its next overwrite.
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // At this point the channel is empty for *this* receiver. If
                    // it's been closed, then that's what we return, otherwise we
                    // set a waker and return empty.
                    if tail.closed {
                        return Err(TryRecvError::Closed);
                    }

                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        old_waker = std::mem::replace(
                                            &mut (*ptr).waker,
                                            Some(waker.clone()),
                                        );
                                    }
                                }

                                // If the waiter is not already queued, enqueue it.
                                // `Relaxed` order suffices: we have synchronized with
                                // all writers through the tail lock that we hold.
                                if !(*ptr).queued.load(Relaxed) {
                                    // `Relaxed` order suffices: all the readers will
                                    // synchronize with this write through the tail lock.
                                    (*ptr).queued.store(true, Relaxed);
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    // Drop the old waker after releasing the locks.
                    drop(slot);
                    drop(tail);
                    drop(old_waker);

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);

                // Number of positions between the cursor and the oldest
                // retained message.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds exactly the value this receiver expects next.
        self.next = self.next.wrapping_add(1);

        Ok(RecvGuard { slot })
    }

    /// Returns the number of [`Sender`] handles.
    pub fn sender_strong_count(&self) -> usize {
        self.shared.num_tx.load(Acquire)
    }

    /// Returns the number of [`WeakSender`] handles.
    pub fn sender_weak_count(&self) -> usize {
        self.shared.num_weak_tx.load(Acquire)
    }

    /// Checks if a channel is closed.
    ///
    /// This method returns `true` if the channel has been closed. The channel is closed
    /// when all [`Sender`] have been dropped.
    ///
    /// [`Sender`]: crate::sync::broadcast::Sender
    ///
    /// # Examples
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = broadcast::channel::<()>(10);
    ///     assert!(!rx.is_closed());
    ///
    ///     drop(tx);
    ///
    ///     assert!(rx.is_closed());
    /// }
    /// ```
    pub fn is_closed(&self) -> bool {
        // Channel is closed when there are no strong senders left active
        self.shared.num_tx.load(Acquire) == 0
    }
}
1372
1373impl<T: Clone> Receiver<T> {
1374 /// Re-subscribes to the channel starting from the current tail element.
1375 ///
1376 /// This [`Receiver`] handle will receive a clone of all values sent
1377 /// **after** it has resubscribed. This will not include elements that are
1378 /// in the queue of the current receiver. Consider the following example.
1379 ///
1380 /// # Examples
1381 ///
1382 /// ```
1383 /// use tokio::sync::broadcast;
1384 ///
1385 /// #[tokio::main]
1386 /// async fn main() {
1387 /// let (tx, mut rx) = broadcast::channel(2);
1388 ///
1389 /// tx.send(1).unwrap();
1390 /// let mut rx2 = rx.resubscribe();
1391 /// tx.send(2).unwrap();
1392 ///
1393 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1394 /// assert_eq!(rx.recv().await.unwrap(), 1);
1395 /// }
1396 /// ```
1397 pub fn resubscribe(&self) -> Self {
1398 let shared = self.shared.clone();
1399 new_receiver(shared)
1400 }
1401 /// Receives the next value for this receiver.
1402 ///
1403 /// Each [`Receiver`] handle will receive a clone of all values sent
1404 /// **after** it has subscribed.
1405 ///
1406 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1407 /// dropped, indicating that no further values can be sent on the channel.
1408 ///
1409 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1410 /// sent values will overwrite old values. At this point, a call to [`recv`]
1411 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1412 /// internal cursor is updated to point to the oldest value still held by
1413 /// the channel. A subsequent call to [`recv`] will return this value
1414 /// **unless** it has been since overwritten.
1415 ///
1416 /// # Cancel safety
1417 ///
1418 /// This method is cancel safe. If `recv` is used as the event in a
1419 /// [`tokio::select!`](crate::select) statement and some other branch
1420 /// completes first, it is guaranteed that no messages were received on this
1421 /// channel.
1422 ///
1423 /// [`Receiver`]: crate::sync::broadcast::Receiver
1424 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1425 ///
1426 /// # Examples
1427 ///
1428 /// ```
1429 /// use tokio::sync::broadcast;
1430 ///
1431 /// #[tokio::main]
1432 /// async fn main() {
1433 /// let (tx, mut rx1) = broadcast::channel(16);
1434 /// let mut rx2 = tx.subscribe();
1435 ///
1436 /// tokio::spawn(async move {
1437 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1438 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1439 /// });
1440 ///
1441 /// tokio::spawn(async move {
1442 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1443 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1444 /// });
1445 ///
1446 /// tx.send(10).unwrap();
1447 /// tx.send(20).unwrap();
1448 /// }
1449 /// ```
1450 ///
1451 /// Handling lag
1452 ///
1453 /// ```
1454 /// use tokio::sync::broadcast;
1455 ///
1456 /// #[tokio::main]
1457 /// async fn main() {
1458 /// let (tx, mut rx) = broadcast::channel(2);
1459 ///
1460 /// tx.send(10).unwrap();
1461 /// tx.send(20).unwrap();
1462 /// tx.send(30).unwrap();
1463 ///
1464 /// // The receiver lagged behind
1465 /// assert!(rx.recv().await.is_err());
1466 ///
1467 /// // At this point, we can abort or continue with lost messages
1468 ///
1469 /// assert_eq!(20, rx.recv().await.unwrap());
1470 /// assert_eq!(30, rx.recv().await.unwrap());
1471 /// }
1472 /// ```
1473 pub async fn recv(&mut self) -> Result<T, RecvError> {
1474 cooperative(Recv::new(self)).await
1475 }
1476
1477 /// Attempts to return a pending value on this receiver without awaiting.
1478 ///
1479 /// This is useful for a flavor of "optimistic check" before deciding to
1480 /// await on a receiver.
1481 ///
1482 /// Compared with [`recv`], this function has three failure cases instead of two
1483 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1484 ///
1485 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1486 /// dropped, indicating that no further values can be sent on the channel.
1487 ///
1488 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1489 /// sent values will overwrite old values. At this point, a call to [`recv`]
1490 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1491 /// internal cursor is updated to point to the oldest value still held by
1492 /// the channel. A subsequent call to [`try_recv`] will return this value
1493 /// **unless** it has been since overwritten. If there are no values to
1494 /// receive, `Err(TryRecvError::Empty)` is returned.
1495 ///
1496 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1497 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1498 /// [`Receiver`]: crate::sync::broadcast::Receiver
1499 ///
1500 /// # Examples
1501 ///
1502 /// ```
1503 /// use tokio::sync::broadcast;
1504 ///
1505 /// #[tokio::main]
1506 /// async fn main() {
1507 /// let (tx, mut rx) = broadcast::channel(16);
1508 ///
1509 /// assert!(rx.try_recv().is_err());
1510 ///
1511 /// tx.send(10).unwrap();
1512 ///
1513 /// let value = rx.try_recv().unwrap();
1514 /// assert_eq!(10, value);
1515 /// }
1516 /// ```
1517 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1518 let guard = self.recv_ref(None)?;
1519 guard.clone_value().ok_or(TryRecvError::Closed)
1520 }
1521
1522 /// Blocking receive to call outside of asynchronous contexts.
1523 ///
1524 /// # Panics
1525 ///
1526 /// This function panics if called within an asynchronous execution
1527 /// context.
1528 ///
1529 /// # Examples
1530 /// ```
1531 /// use std::thread;
1532 /// use tokio::sync::broadcast;
1533 ///
1534 /// #[tokio::main]
1535 /// async fn main() {
1536 /// let (tx, mut rx) = broadcast::channel(16);
1537 ///
1538 /// let sync_code = thread::spawn(move || {
1539 /// assert_eq!(rx.blocking_recv(), Ok(10));
1540 /// });
1541 ///
1542 /// let _ = tx.send(10);
1543 /// sync_code.join().unwrap();
1544 /// }
1545 /// ```
1546 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1547 crate::future::block_on(self.recv())
1548 }
1549}
1550
1551impl<T> Drop for Receiver<T> {
1552 fn drop(&mut self) {
1553 let mut tail = self.shared.tail.lock();
1554
1555 tail.rx_cnt -= 1;
1556 let until = tail.pos;
1557 let remaining_rx = tail.rx_cnt;
1558
1559 if remaining_rx == 0 {
1560 self.shared.notify_last_rx_drop.notify_waiters();
1561 tail.closed = true;
1562 }
1563
1564 drop(tail);
1565
1566 while self.next < until {
1567 match self.recv_ref(None) {
1568 Ok(_) => {}
1569 // The channel is closed
1570 Err(TryRecvError::Closed) => break,
1571 // Ignore lagging, we will catch up
1572 Err(TryRecvError::Lagged(..)) => {}
1573 // Can't be empty
1574 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1575 }
1576 }
1577 }
1578}
1579
impl<'a, T> Recv<'a, T> {
    /// Creates a `Recv` future borrowing `receiver`, with a fresh waiter node
    /// that is not queued and holds no waker.
    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
        Recv {
            receiver,
            waiter: UnsafeCell::new(Waiter {
                queued: AtomicBool::new(false),
                waker: None,
                pointers: linked_list::Pointers::new(),
                // Marker field; see the safety note on the `Link` impl
                // ("`Waiter` is forced to be !Unpin").
                _p: PhantomPinned,
            }),
        }
    }

    /// A custom `project` implementation is used in place of `pin-project-lite`
    /// as a custom drop implementation is needed.
    ///
    /// Returns the receiver as a plain mutable reference together with a
    /// shared reference to the waiter cell.
    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
        unsafe {
            // Safety: Receiver is Unpin
            // (compile-time check that the `&mut Receiver<T>` field type
            // implements `Unpin`).
            is_unpin::<&mut Receiver<T>>();

            let me = self.get_unchecked_mut();
            (me.receiver, &me.waiter)
        }
    }
}
1605
1606impl<'a, T> Future for Recv<'a, T>
1607where
1608 T: Clone,
1609{
1610 type Output = Result<T, RecvError>;
1611
1612 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1613 ready!(crate::trace::trace_leaf(cx));
1614
1615 let (receiver, waiter) = self.project();
1616
1617 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1618 Ok(value) => value,
1619 Err(TryRecvError::Empty) => return Poll::Pending,
1620 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1621 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1622 };
1623
1624 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1625 }
1626}
1627
impl<'a, T> Drop for Recv<'a, T> {
    /// Unlinks this future's waiter node from the channel's waiter list if it
    /// is still queued there.
    fn drop(&mut self) {
        // Safety: `waiter.queued` is atomic.
        // Acquire ordering is required to synchronize with
        // `Shared::notify_rx` before we drop the object.
        let queued = self
            .waiter
            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });

        // If the waiter is queued, we need to unlink it from the waiters list.
        // If not, no further synchronization is required, since the waiter
        // is not in the list and, as such, is not shared with any other threads.
        if queued {
            // Acquire the tail lock. This is required for safety before accessing
            // the waiter node.
            let mut tail = self.receiver.shared.tail.lock();

            // Re-check under the lock: the flag may have been cleared between
            // the unlocked load above and acquiring the lock.
            //
            // Safety: tail lock is held.
            // `Relaxed` order suffices because we hold the tail lock.
            let queued = self
                .waiter
                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });

            if queued {
                // Remove the node
                //
                // safety: tail lock is held and the wait node is verified to be in
                // the list.
                unsafe {
                    self.waiter.with_mut(|ptr| {
                        tail.waiters.remove((&mut *ptr).into());
                    });
                }
            }
        }
    }
}
1665
/// Lets `Waiter` nodes be stored in the intrusive linked list, using a raw
/// `NonNull<Waiter>` as the list handle.
///
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// The handle already is the raw pointer; return a copy of it.
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    /// The raw pointer is itself the handle; return it unchanged.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Returns a pointer to the list pointers stored inside the waiter, as
    /// computed by `Waiter::addr_of_pointers`.
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}
1685
1686impl<T> fmt::Debug for Sender<T> {
1687 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1688 write!(fmt, "broadcast::Sender")
1689 }
1690}
1691
1692impl<T> fmt::Debug for WeakSender<T> {
1693 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1694 write!(fmt, "broadcast::WeakSender")
1695 }
1696}
1697
1698impl<T> fmt::Debug for Receiver<T> {
1699 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1700 write!(fmt, "broadcast::Receiver")
1701 }
1702}
1703
impl<'a, T> RecvGuard<'a, T> {
    /// Clones the content of the guarded slot's value cell.
    ///
    /// Returns `None` when the cell holds no value (it is set to `None` in
    /// `RecvGuard::drop` once the slot's `rem` counter is exhausted).
    fn clone_value(&self) -> Option<T>
    where
        T: Clone,
    {
        // NOTE(review): presumably sound because the guard keeps the slot's
        // read lock for as long as it lives — confirm against `Slot`.
        self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
    }
}
1712
1713impl<'a, T> Drop for RecvGuard<'a, T> {
1714 fn drop(&mut self) {
1715 // Decrement the remaining counter
1716 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1717 // Safety: Last receiver, drop the value
1718 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1719 }
1720 }
1721}
1722
/// Compile-time assertion helper: can only be instantiated for types that
/// implement `Unpin`. Does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1724
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
    use super::*;

    /// A sender built with `Sender::new` starts without receivers; the count
    /// follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx: Sender<i32> = Sender::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        // Dropping two of the three handles leaves exactly one registered.
        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// `channel` hands out one receiver up front, so the count starts at one.
    #[cfg(not(loom))]
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, initial_rx) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        // Keep the second receiver alive until the end of the test.
        let _second_rx = initial_rx.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}