tokio/sync/
oneshot.rs

1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! Since the `send` method is not async, it can be used anywhere. This includes
13//! sending between two runtimes, and using it from non-async code.
14//!
15//! If the [`Receiver`] is closed before receiving a message which has already
16//! been sent, the message will remain in the channel until the receiver is
17//! dropped, at which point the message will be dropped immediately.
18//!
19//! # Examples
20//!
21//! ```
22//! use tokio::sync::oneshot;
23//!
24//! #[tokio::main]
25//! async fn main() {
26//!     let (tx, rx) = oneshot::channel();
27//!
28//!     tokio::spawn(async move {
29//!         if let Err(_) = tx.send(3) {
30//!             println!("the receiver dropped");
31//!         }
32//!     });
33//!
34//!     match rx.await {
35//!         Ok(v) => println!("got = {:?}", v),
36//!         Err(_) => println!("the sender dropped"),
37//!     }
38//! }
39//! ```
40//!
41//! If the sender is dropped without sending, the receiver will fail with
42//! [`error::RecvError`]:
43//!
44//! ```
45//! use tokio::sync::oneshot;
46//!
47//! #[tokio::main]
48//! async fn main() {
49//!     let (tx, rx) = oneshot::channel::<u32>();
50//!
51//!     tokio::spawn(async move {
52//!         drop(tx);
53//!     });
54//!
55//!     match rx.await {
56//!         Ok(_) => panic!("This doesn't happen"),
57//!         Err(_) => println!("the sender dropped"),
58//!     }
59//! }
60//! ```
61//!
62//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63//! the channel.
64//!
65//! ```
66//! use tokio::sync::oneshot;
67//! use tokio::time::{interval, sleep, Duration};
68//!
69//! #[tokio::main]
70//! # async fn _doc() {}
71//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72//! async fn main() {
73//!     let (send, mut recv) = oneshot::channel();
74//!     let mut interval = interval(Duration::from_millis(100));
75//!
76//!     # let handle =
77//!     tokio::spawn(async move {
78//!         sleep(Duration::from_secs(1)).await;
79//!         send.send("shut down").unwrap();
80//!     });
81//!
82//!     loop {
83//!         tokio::select! {
84//!             _ = interval.tick() => println!("Another 100ms"),
85//!             msg = &mut recv => {
86//!                 println!("Got message: {}", msg.unwrap());
87//!                 break;
88//!             }
89//!         }
90//!     }
91//!     # handle.await.unwrap();
92//! }
93//! ```
94//!
95//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96//! [`Option::take`].
97//!
98//! ```
99//! use tokio::sync::oneshot;
100//!
101//! struct SendOnDrop {
102//!     sender: Option<oneshot::Sender<&'static str>>,
103//! }
104//! impl Drop for SendOnDrop {
105//!     fn drop(&mut self) {
106//!         if let Some(sender) = self.sender.take() {
107//!             // Using `let _ =` to ignore send errors.
108//!             let _ = sender.send("I got dropped!");
109//!         }
110//!     }
111//! }
112//!
113//! #[tokio::main]
114//! # async fn _doc() {}
115//! # #[tokio::main(flavor = "current_thread")]
116//! async fn main() {
117//!     let (send, recv) = oneshot::channel();
118//!
119//!     let send_on_drop = SendOnDrop { sender: Some(send) };
120//!     drop(send_on_drop);
121//!
122//!     assert_eq!(recv.await, Ok("I got dropped!"));
123//! }
124//! ```
125
126use crate::loom::cell::UnsafeCell;
127use crate::loom::sync::atomic::AtomicUsize;
128use crate::loom::sync::Arc;
129#[cfg(all(tokio_unstable, feature = "tracing"))]
130use crate::util::trace;
131
132use std::fmt;
133use std::future::Future;
134use std::mem::MaybeUninit;
135use std::pin::Pin;
136use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137use std::task::Poll::{Pending, Ready};
138use std::task::{ready, Context, Poll, Waker};
139
140/// Sends a value to the associated [`Receiver`].
141///
/// A [`Sender`] and [`Receiver`] pair is created by the
/// [`channel`](fn@channel) function.
144///
145/// # Examples
146///
147/// ```
148/// use tokio::sync::oneshot;
149///
150/// #[tokio::main]
151/// async fn main() {
152///     let (tx, rx) = oneshot::channel();
153///
154///     tokio::spawn(async move {
155///         if let Err(_) = tx.send(3) {
156///             println!("the receiver dropped");
157///         }
158///     });
159///
160///     match rx.await {
161///         Ok(v) => println!("got = {:?}", v),
162///         Err(_) => println!("the sender dropped"),
163///     }
164/// }
165/// ```
166///
167/// If the sender is dropped without sending, the receiver will fail with
168/// [`error::RecvError`]:
169///
170/// ```
171/// use tokio::sync::oneshot;
172///
173/// #[tokio::main]
174/// async fn main() {
175///     let (tx, rx) = oneshot::channel::<u32>();
176///
177///     tokio::spawn(async move {
178///         drop(tx);
179///     });
180///
181///     match rx.await {
182///         Ok(_) => panic!("This doesn't happen"),
183///         Err(_) => println!("the sender dropped"),
184///     }
185/// }
186/// ```
187///
188/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189/// [`Option::take`].
190///
191/// ```
192/// use tokio::sync::oneshot;
193///
194/// struct SendOnDrop {
195///     sender: Option<oneshot::Sender<&'static str>>,
196/// }
197/// impl Drop for SendOnDrop {
198///     fn drop(&mut self) {
199///         if let Some(sender) = self.sender.take() {
200///             // Using `let _ =` to ignore send errors.
201///             let _ = sender.send("I got dropped!");
202///         }
203///     }
204/// }
205///
206/// #[tokio::main]
207/// # async fn _doc() {}
208/// # #[tokio::main(flavor = "current_thread")]
209/// async fn main() {
210///     let (send, recv) = oneshot::channel();
211///
212///     let send_on_drop = SendOnDrop { sender: Some(send) };
213///     drop(send_on_drop);
214///
215///     assert_eq!(recv.await, Ok("I got dropped!"));
216/// }
217/// ```
218///
219/// [`Option`]: std::option::Option
220/// [`Option::take`]: std::option::Option::take
#[derive(Debug)]
pub struct Sender<T> {
    /// State shared with the paired `Receiver`.
    ///
    /// Kept in an `Option` so that `send` can move it out with `take()`;
    /// the `Drop` impl only acts when this is still `Some`.
    inner: Option<Arc<Inner<T>>>,
    /// Tracing span for the channel resource; a clone of the span created
    /// in `channel`.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
}
227
228/// Receives a value from the associated [`Sender`].
229///
/// A [`Sender`] and [`Receiver`] pair is created by the
/// [`channel`](fn@channel) function.
232///
233/// This channel has no `recv` method because the receiver itself implements the
234/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235///
236/// The `poll` method on the `Future` trait is allowed to spuriously return
237/// `Poll::Pending` even if the message has been sent. If such a spurious
238/// failure happens, then the caller will be woken when the spurious failure has
239/// been resolved so that the caller can attempt to receive the message again.
240/// Note that receiving such a wakeup does not guarantee that the next call will
241/// succeed — it could fail with another spurious failure. (A spurious failure
242/// does not mean that the message is lost. It is just delayed.)
243///
244/// [`Future`]: trait@std::future::Future
245///
246/// # Examples
247///
248/// ```
249/// use tokio::sync::oneshot;
250///
251/// #[tokio::main]
252/// async fn main() {
253///     let (tx, rx) = oneshot::channel();
254///
255///     tokio::spawn(async move {
256///         if let Err(_) = tx.send(3) {
257///             println!("the receiver dropped");
258///         }
259///     });
260///
261///     match rx.await {
262///         Ok(v) => println!("got = {:?}", v),
263///         Err(_) => println!("the sender dropped"),
264///     }
265/// }
266/// ```
267///
268/// If the sender is dropped without sending, the receiver will fail with
269/// [`error::RecvError`]:
270///
271/// ```
272/// use tokio::sync::oneshot;
273///
274/// #[tokio::main]
275/// async fn main() {
276///     let (tx, rx) = oneshot::channel::<u32>();
277///
278///     tokio::spawn(async move {
279///         drop(tx);
280///     });
281///
282///     match rx.await {
283///         Ok(_) => panic!("This doesn't happen"),
284///         Err(_) => println!("the sender dropped"),
285///     }
286/// }
287/// ```
288///
289/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290/// channel.
291///
292/// ```
293/// use tokio::sync::oneshot;
294/// use tokio::time::{interval, sleep, Duration};
295///
296/// #[tokio::main]
297/// # async fn _doc() {}
298/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299/// async fn main() {
300///     let (send, mut recv) = oneshot::channel();
301///     let mut interval = interval(Duration::from_millis(100));
302///
303///     # let handle =
304///     tokio::spawn(async move {
305///         sleep(Duration::from_secs(1)).await;
306///         send.send("shut down").unwrap();
307///     });
308///
309///     loop {
310///         tokio::select! {
311///             _ = interval.tick() => println!("Another 100ms"),
312///             msg = &mut recv => {
313///                 println!("Got message: {}", msg.unwrap());
314///                 break;
315///             }
316///         }
317///     }
318///     # handle.await.unwrap();
319/// }
320/// ```
#[derive(Debug)]
pub struct Receiver<T> {
    /// State shared with the paired `Sender`.
    ///
    /// `is_terminated` reports `true` exactly when this is `None`.
    inner: Option<Arc<Inner<T>>>,
    /// Tracing span for the channel resource, created in `channel`; the
    /// `Sender` holds a clone of the same span.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
    /// `runtime.resource.async_op` span (source `Receiver::await`), created
    /// in `channel` while `resource_span` is entered.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    async_op_span: tracing::Span,
    /// `runtime.resource.async_op.poll` span, created in `channel` while
    /// `async_op_span` is entered.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    async_op_poll_span: tracing::Span,
}
331
pub mod error {
    //! `Oneshot` error types.

    use std::fmt;

    /// Failure produced by the `Future` implementation of `Receiver`.
    ///
    /// The receiver yields this error when the sender was dropped without
    /// ever sending a value.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub struct RecvError(pub(super) ());

    /// Failure produced by the `try_recv` function on `Receiver`.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum TryRecvError {
        /// No value has been sent yet by the send half of the channel.
        Empty,

        /// The send half of the channel was dropped before sending a value.
        Closed,
    }

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Fixed message, emitted verbatim.
            fmt.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}

    // ===== impl TryRecvError =====

    impl fmt::Display for TryRecvError {
        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Pick the fixed message for the variant, then emit it verbatim.
            let message = match self {
                TryRecvError::Empty => "channel empty",
                TryRecvError::Closed => "channel closed",
            };
            fmt.write_str(message)
        }
    }

    impl std::error::Error for TryRecvError {}
}
376
377use self::error::*;
378
/// State shared by the two halves of a channel. `channel` allocates one
/// instance behind an `Arc` that both the `Sender` and the `Receiver` hold.
struct Inner<T> {
    /// Manages the state of the inner cell.
    ///
    /// Initialized in `channel` from `State::new().as_usize()`.
    state: AtomicUsize,

    /// The value. This is set by `Sender` and read by `Receiver`. The state of
    /// the cell is tracked by `state`.
    ///
    /// Starts out as `None`; `Sender::send` stores `Some(value)` here before
    /// calling `complete()`.
    value: UnsafeCell<Option<T>>,

    /// The task to notify when the receiver drops without consuming the value.
    ///
    /// The waker is registered by `Sender::poll_closed`.
    ///
    /// ## Safety
    ///
    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
    /// initialized. If that bit is unset, this field may be uninitialized.
    tx_task: Task,

    /// The task to notify when the value is sent.
    ///
    /// ## Safety
    ///
    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
    /// initialized. If that bit is unset, this field may be uninitialized.
    rx_task: Task,
}
403
/// Slot for a `Waker` that may be uninitialized.
///
/// The slot does not record whether it currently holds a waker; the docs on
/// `Inner::tx_task` / `Inner::rx_task` tie that to bits in `Inner::state`.
struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406impl Task {
407    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408        self.with_task(|w| w.will_wake(cx.waker()))
409    }
410
411    unsafe fn with_task<F, R>(&self, f: F) -> R
412    where
413        F: FnOnce(&Waker) -> R,
414    {
415        self.0.with(|ptr| {
416            let waker: *const Waker = (*ptr).as_ptr();
417            f(&*waker)
418        })
419    }
420
421    unsafe fn drop_task(&self) {
422        self.0.with_mut(|ptr| {
423            let ptr: *mut Waker = (*ptr).as_mut_ptr();
424            ptr.drop_in_place();
425        });
426    }
427
428    unsafe fn set_task(&self, cx: &mut Context<'_>) {
429        self.0.with_mut(|ptr| {
430            let ptr: *mut Waker = (*ptr).as_mut_ptr();
431            ptr.write(cx.waker().clone());
432        });
433    }
434}
435
/// Copyable snapshot of the channel's state word.
///
/// `Inner::state` stores the same information as a raw `usize` inside an
/// `AtomicUsize`; this wrapper is what `State::load` and the flag-updating
/// helpers hand back.
// NOTE(review): the flag constants and methods of `State` are defined outside
// this view — confirm the bit layout there before relying on it.
#[derive(Clone, Copy)]
struct State(usize);
438
439/// Creates a new one-shot channel for sending single values across asynchronous
440/// tasks.
441///
442/// The function returns separate "send" and "receive" handles. The `Sender`
443/// handle is used by the producer to send the value. The `Receiver` handle is
444/// used by the consumer to receive the value.
445///
446/// Each handle can be used on separate tasks.
447///
448/// # Examples
449///
450/// ```
451/// use tokio::sync::oneshot;
452///
453/// #[tokio::main]
454/// async fn main() {
455///     let (tx, rx) = oneshot::channel();
456///
457///     tokio::spawn(async move {
458///         if let Err(_) = tx.send(3) {
459///             println!("the receiver dropped");
460///         }
461///     });
462///
463///     match rx.await {
464///         Ok(v) => println!("got = {:?}", v),
465///         Err(_) => println!("the sender dropped"),
466///     }
467/// }
468/// ```
469#[track_caller]
470pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471    #[cfg(all(tokio_unstable, feature = "tracing"))]
472    let resource_span = {
473        let location = std::panic::Location::caller();
474
475        let resource_span = tracing::trace_span!(
476            parent: None,
477            "runtime.resource",
478            concrete_type = "Sender|Receiver",
479            kind = "Sync",
480            loc.file = location.file(),
481            loc.line = location.line(),
482            loc.col = location.column(),
483        );
484
485        resource_span.in_scope(|| {
486            tracing::trace!(
487            target: "runtime::resource::state_update",
488            tx_dropped = false,
489            tx_dropped.op = "override",
490            )
491        });
492
493        resource_span.in_scope(|| {
494            tracing::trace!(
495            target: "runtime::resource::state_update",
496            rx_dropped = false,
497            rx_dropped.op = "override",
498            )
499        });
500
501        resource_span.in_scope(|| {
502            tracing::trace!(
503            target: "runtime::resource::state_update",
504            value_sent = false,
505            value_sent.op = "override",
506            )
507        });
508
509        resource_span.in_scope(|| {
510            tracing::trace!(
511            target: "runtime::resource::state_update",
512            value_received = false,
513            value_received.op = "override",
514            )
515        });
516
517        resource_span
518    };
519
520    let inner = Arc::new(Inner {
521        state: AtomicUsize::new(State::new().as_usize()),
522        value: UnsafeCell::new(None),
523        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
525    });
526
527    let tx = Sender {
528        inner: Some(inner.clone()),
529        #[cfg(all(tokio_unstable, feature = "tracing"))]
530        resource_span: resource_span.clone(),
531    };
532
533    #[cfg(all(tokio_unstable, feature = "tracing"))]
534    let async_op_span = resource_span
535        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
536
537    #[cfg(all(tokio_unstable, feature = "tracing"))]
538    let async_op_poll_span =
539        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
540
541    let rx = Receiver {
542        inner: Some(inner),
543        #[cfg(all(tokio_unstable, feature = "tracing"))]
544        resource_span,
545        #[cfg(all(tokio_unstable, feature = "tracing"))]
546        async_op_span,
547        #[cfg(all(tokio_unstable, feature = "tracing"))]
548        async_op_poll_span,
549    };
550
551    (tx, rx)
552}
553
554impl<T> Sender<T> {
555    /// Attempts to send a value on this channel, returning it back if it could
556    /// not be sent.
557    ///
558    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
    /// channel. It is not marked async because sending a message to a `oneshot`
560    /// channel never requires any form of waiting.  Because of this, the `send`
561    /// method can be used in both synchronous and asynchronous code without
562    /// problems.
563    ///
564    /// A successful send occurs when it is determined that the other end of the
565    /// channel has not hung up already. An unsuccessful send would be one where
566    /// the corresponding receiver has already been deallocated. Note that a
567    /// return value of `Err` means that the data will never be received, but
568    /// a return value of `Ok` does *not* mean that the data will be received.
569    /// It is possible for the corresponding receiver to hang up immediately
570    /// after this function returns `Ok`.
571    ///
572    /// # Examples
573    ///
574    /// Send a value to another task
575    ///
576    /// ```
577    /// use tokio::sync::oneshot;
578    ///
579    /// #[tokio::main]
580    /// async fn main() {
581    ///     let (tx, rx) = oneshot::channel();
582    ///
583    ///     tokio::spawn(async move {
584    ///         if let Err(_) = tx.send(3) {
585    ///             println!("the receiver dropped");
586    ///         }
587    ///     });
588    ///
589    ///     match rx.await {
590    ///         Ok(v) => println!("got = {:?}", v),
591    ///         Err(_) => println!("the sender dropped"),
592    ///     }
593    /// }
594    /// ```
595    pub fn send(mut self, t: T) -> Result<(), T> {
596        let inner = self.inner.take().unwrap();
597
598        inner.value.with_mut(|ptr| unsafe {
599            // SAFETY: The receiver will not access the `UnsafeCell` unless the
600            // channel has been marked as "complete" (the `VALUE_SENT` state bit
601            // is set).
602            // That bit is only set by the sender later on in this method, and
603            // calling this method consumes `self`. Therefore, if it was possible to
604            // call this method, we know that the `VALUE_SENT` bit is unset, and
605            // the receiver is not currently accessing the `UnsafeCell`.
606            *ptr = Some(t);
607        });
608
609        if !inner.complete() {
610            unsafe {
611                // SAFETY: The receiver will not access the `UnsafeCell` unless
612                // the channel has been marked as "complete". Calling
613                // `complete()` will return true if this bit is set, and false
614                // if it is not set. Thus, if `complete()` returned false, it is
615                // safe for us to access the value, because we know that the
616                // receiver will not.
617                return Err(inner.consume_value().unwrap());
618            }
619        }
620
621        #[cfg(all(tokio_unstable, feature = "tracing"))]
622        self.resource_span.in_scope(|| {
623            tracing::trace!(
624            target: "runtime::resource::state_update",
625            value_sent = true,
626            value_sent.op = "override",
627            )
628        });
629
630        Ok(())
631    }
632
633    /// Waits for the associated [`Receiver`] handle to close.
634    ///
    /// A [`Receiver`] is closed either by calling [`close`] explicitly or by
    /// dropping the [`Receiver`] value.
637    ///
638    /// This function is useful when paired with `select!` to abort a
639    /// computation when the receiver is no longer interested in the result.
640    ///
641    /// # Return
642    ///
643    /// Returns a `Future` which must be awaited on.
644    ///
645    /// [`Receiver`]: Receiver
646    /// [`close`]: Receiver::close
647    ///
648    /// # Examples
649    ///
650    /// Basic usage
651    ///
652    /// ```
653    /// use tokio::sync::oneshot;
654    ///
655    /// #[tokio::main]
656    /// async fn main() {
657    ///     let (mut tx, rx) = oneshot::channel::<()>();
658    ///
659    ///     tokio::spawn(async move {
660    ///         drop(rx);
661    ///     });
662    ///
663    ///     tx.closed().await;
664    ///     println!("the receiver dropped");
665    /// }
666    /// ```
667    ///
668    /// Paired with select
669    ///
670    /// ```
671    /// use tokio::sync::oneshot;
672    /// use tokio::time::{self, Duration};
673    ///
674    /// async fn compute() -> String {
675    ///     // Complex computation returning a `String`
676    /// # "hello".to_string()
677    /// }
678    ///
679    /// #[tokio::main]
680    /// async fn main() {
681    ///     let (mut tx, rx) = oneshot::channel();
682    ///
683    ///     tokio::spawn(async move {
684    ///         tokio::select! {
685    ///             _ = tx.closed() => {
686    ///                 // The receiver dropped, no need to do any further work
687    ///             }
688    ///             value = compute() => {
689    ///                 // The send can fail if the channel was closed at the exact same
690    ///                 // time as when compute() finished, so just ignore the failure.
691    ///                 let _ = tx.send(value);
692    ///             }
693    ///         }
694    ///     });
695    ///
696    ///     // Wait for up to 10 seconds
697    ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
698    /// }
699    /// ```
700    pub async fn closed(&mut self) {
701        use std::future::poll_fn;
702
703        #[cfg(all(tokio_unstable, feature = "tracing"))]
704        let resource_span = self.resource_span.clone();
705        #[cfg(all(tokio_unstable, feature = "tracing"))]
706        let closed = trace::async_op(
707            || poll_fn(|cx| self.poll_closed(cx)),
708            resource_span,
709            "Sender::closed",
710            "poll_closed",
711            false,
712        );
713        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
714        let closed = poll_fn(|cx| self.poll_closed(cx));
715
716        closed.await;
717    }
718
    /// Returns `true` if the associated [`Receiver`] handle has been closed.
    ///
    /// A [`Receiver`] is closed either by calling [`close`] explicitly or by
    /// dropping the [`Receiver`] value.
723    ///
724    /// If `true` is returned, a call to `send` will always result in an error.
725    ///
726    /// [`Receiver`]: Receiver
727    /// [`close`]: Receiver::close
728    ///
729    /// # Examples
730    ///
731    /// ```
732    /// use tokio::sync::oneshot;
733    ///
734    /// #[tokio::main]
735    /// async fn main() {
736    ///     let (tx, rx) = oneshot::channel();
737    ///
738    ///     assert!(!tx.is_closed());
739    ///
740    ///     drop(rx);
741    ///
742    ///     assert!(tx.is_closed());
743    ///     assert!(tx.send("never received").is_err());
744    /// }
745    /// ```
746    pub fn is_closed(&self) -> bool {
747        let inner = self.inner.as_ref().unwrap();
748
749        let state = State::load(&inner.state, Acquire);
750        state.is_closed()
751    }
752
753    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
754    /// `Waker` in the provided `Context` to receive a notification when the channel is
755    /// closed.
756    ///
757    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
758    /// [`Receiver`] value is dropped.
759    ///
760    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
761    /// to the most recent call will be scheduled to receive a wakeup.
762    ///
763    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
764    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
765    ///
766    /// # Return value
767    ///
768    /// This function returns:
769    ///
770    ///  * `Poll::Pending` if the channel is still open.
771    ///  * `Poll::Ready(())` if the channel is closed.
772    ///
773    /// # Examples
774    ///
775    /// ```
776    /// use tokio::sync::oneshot;
777    ///
778    /// use std::future::poll_fn;
779    ///
780    /// #[tokio::main]
781    /// async fn main() {
782    ///     let (mut tx, mut rx) = oneshot::channel::<()>();
783    ///
784    ///     tokio::spawn(async move {
785    ///         rx.close();
786    ///     });
787    ///
788    ///     poll_fn(|cx| tx.poll_closed(cx)).await;
789    ///
790    ///     println!("the receiver dropped");
791    /// }
792    /// ```
    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        // `ready!` returns `Pending` from this function if the inner call is
        // not ready.
        ready!(crate::trace::trace_leaf(cx));

        // Keep track of task budget
        let coop = ready!(crate::task::coop::poll_proceed(cx));

        let inner = self.inner.as_ref().unwrap();

        let mut state = State::load(&inner.state, Acquire);

        // Already closed: nothing to register. Every `Ready` return below
        // reports progress to `coop` first.
        if state.is_closed() {
            coop.made_progress();
            return Ready(());
        }

        // A waker was registered by an earlier call. It only needs replacing
        // when it would not wake the task that is polling now.
        if state.is_tx_task_set() {
            let will_notify = unsafe { inner.tx_task.will_wake(cx) };

            if !will_notify {
                // Clear the flag before touching the slot; `state` becomes
                // whatever `unset_tx_task` reports.
                // NOTE(review): presumably this keeps the receiver side from
                // reading the waker while it is replaced — confirm against
                // `State` and `Inner::close`, which are outside this view.
                state = State::unset_tx_task(&inner.state);

                if state.is_closed() {
                    // Set the flag again so that the waker is released in drop
                    State::set_tx_task(&inner.state);
                    coop.made_progress();
                    return Ready(());
                } else {
                    // Still open in the state we observed: release the stale
                    // waker so a fresh one can be stored below.
                    unsafe { inner.tx_task.drop_task() };
                }
            }
        }

        // Reached when no waker was ever registered, or (presumably, given
        // the state returned by `unset_tx_task`) when the stale one was just
        // dropped above.
        if !state.is_tx_task_set() {
            // Attempt to set the task
            unsafe {
                inner.tx_task.set_task(cx);
            }

            // Update the state
            state = State::set_tx_task(&inner.state);

            // The state returned by `set_tx_task` can already show the
            // channel as closed; report that now instead of waiting.
            if state.is_closed() {
                coop.made_progress();
                return Ready(());
            }
        }

        Pending
    }
842}
843
844impl<T> Drop for Sender<T> {
845    fn drop(&mut self) {
846        if let Some(inner) = self.inner.as_ref() {
847            inner.complete();
848            #[cfg(all(tokio_unstable, feature = "tracing"))]
849            self.resource_span.in_scope(|| {
850                tracing::trace!(
851                target: "runtime::resource::state_update",
852                tx_dropped = true,
853                tx_dropped.op = "override",
854                )
855            });
856        }
857    }
858}
859
860impl<T> Receiver<T> {
861    /// Prevents the associated [`Sender`] handle from sending a value.
862    ///
863    /// Any `send` operation which happens after calling `close` is guaranteed
864    /// to fail. After calling `close`, [`try_recv`] should be called to
865    /// receive a value if one was sent **before** the call to `close`
866    /// completed.
867    ///
868    /// This function is useful to perform a graceful shutdown and ensure that a
869    /// value will not be sent into the channel and never received.
870    ///
    /// `close` is a no-op if a message has already been received or the
    /// channel is already closed.
873    ///
874    /// [`Sender`]: Sender
875    /// [`try_recv`]: Receiver::try_recv
876    ///
877    /// # Examples
878    ///
879    /// Prevent a value from being sent
880    ///
881    /// ```
882    /// use tokio::sync::oneshot;
883    /// use tokio::sync::oneshot::error::TryRecvError;
884    ///
885    /// #[tokio::main]
886    /// async fn main() {
887    ///     let (tx, mut rx) = oneshot::channel();
888    ///
889    ///     assert!(!tx.is_closed());
890    ///
891    ///     rx.close();
892    ///
893    ///     assert!(tx.is_closed());
894    ///     assert!(tx.send("never received").is_err());
895    ///
896    ///     match rx.try_recv() {
897    ///         Err(TryRecvError::Closed) => {}
898    ///         _ => unreachable!(),
899    ///     }
900    /// }
901    /// ```
902    ///
903    /// Receive a value sent **before** calling `close`
904    ///
905    /// ```
906    /// use tokio::sync::oneshot;
907    ///
908    /// #[tokio::main]
909    /// async fn main() {
910    ///     let (tx, mut rx) = oneshot::channel();
911    ///
912    ///     assert!(tx.send("will receive").is_ok());
913    ///
914    ///     rx.close();
915    ///
916    ///     let msg = rx.try_recv().unwrap();
917    ///     assert_eq!(msg, "will receive");
918    /// }
919    /// ```
920    pub fn close(&mut self) {
921        if let Some(inner) = self.inner.as_ref() {
922            inner.close();
923            #[cfg(all(tokio_unstable, feature = "tracing"))]
924            self.resource_span.in_scope(|| {
925                tracing::trace!(
926                target: "runtime::resource::state_update",
927                rx_dropped = true,
928                rx_dropped.op = "override",
929                )
930            });
931        }
932    }
933
934    /// Checks if this receiver is terminated.
935    ///
936    /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
937    /// If so, this receiver should no longer be polled.
938    ///
939    /// # Examples
940    ///
941    /// Sending a value and polling it.
942    ///
943    /// ```
944    /// use tokio::sync::oneshot;
945    ///
946    /// use std::task::Poll;
947    ///
948    /// #[tokio::main]
949    /// async fn main() {
950    ///     let (tx, mut rx) = oneshot::channel();
951    ///
952    ///     // A receiver is not terminated when it is initialized.
953    ///     assert!(!rx.is_terminated());
954    ///
    ///     // A receiver is not terminated when it is polled and is still pending.
956    ///     let poll = futures::poll!(&mut rx);
957    ///     assert_eq!(poll, Poll::Pending);
958    ///     assert!(!rx.is_terminated());
959    ///
960    ///     // A receiver is not terminated if a value has been sent, but not yet read.
961    ///     tx.send(0).unwrap();
962    ///     assert!(!rx.is_terminated());
963    ///
964    ///     // A receiver *is* terminated after it has been polled and yielded a value.
965    ///     assert_eq!((&mut rx).await, Ok(0));
966    ///     assert!(rx.is_terminated());
967    /// }
968    /// ```
969    ///
970    /// Dropping the sender.
971    ///
972    /// ```
973    /// use tokio::sync::oneshot;
974    ///
975    /// #[tokio::main]
976    /// async fn main() {
977    ///     let (tx, mut rx) = oneshot::channel::<()>();
978    ///
979    ///     // A receiver is not immediately terminated when the sender is dropped.
980    ///     drop(tx);
981    ///     assert!(!rx.is_terminated());
982    ///
983    ///     // A receiver *is* terminated after it has been polled and yielded an error.
984    ///     let _ = (&mut rx).await.unwrap_err();
985    ///     assert!(rx.is_terminated());
986    /// }
987    /// ```
    pub fn is_terminated(&self) -> bool {
        // `Future::poll` and `try_recv` reset `inner` to `None` once they have
        // produced a final result, so an absent `inner` marks termination.
        self.inner.is_none()
    }
991
992    /// Checks if a channel is empty.
993    ///
994    /// This method returns `true` if the channel has no messages.
995    ///
996    /// It is not necessarily safe to poll an empty receiver, which may have
997    /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
998    /// to check whether or not a receiver can be safely polled, instead.
999    ///
1000    /// # Examples
1001    ///
1002    /// Sending a value.
1003    ///
1004    /// ```
1005    /// use tokio::sync::oneshot;
1006    ///
1007    /// #[tokio::main]
1008    /// async fn main() {
1009    ///     let (tx, mut rx) = oneshot::channel();
1010    ///     assert!(rx.is_empty());
1011    ///
1012    ///     tx.send(0).unwrap();
1013    ///     assert!(!rx.is_empty());
1014    ///
1015    ///     let _ = (&mut rx).await;
1016    ///     assert!(rx.is_empty());
1017    /// }
1018    /// ```
1019    ///
1020    /// Dropping the sender.
1021    ///
1022    /// ```
1023    /// use tokio::sync::oneshot;
1024    ///
1025    /// #[tokio::main]
1026    /// async fn main() {
1027    ///     let (tx, mut rx) = oneshot::channel::<()>();
1028    ///
1029    ///     // A channel is empty if the sender is dropped.
1030    ///     drop(tx);
1031    ///     assert!(rx.is_empty());
1032    ///
1033    ///     // A closed channel still yields an error, however.
1034    ///     (&mut rx).await.expect_err("should yield an error");
1035    ///     assert!(rx.is_empty());
1036    /// }
1037    /// ```
1038    ///
1039    /// Terminated channels are empty.
1040    ///
1041    /// ```should_panic
1042    /// use tokio::sync::oneshot;
1043    ///
1044    /// #[tokio::main]
1045    /// async fn main() {
1046    ///     let (tx, mut rx) = oneshot::channel();
1047    ///     tx.send(0).unwrap();
1048    ///     let _ = (&mut rx).await;
1049    ///
1050    ///     // NB: an empty channel is not necessarily safe to poll!
1051    ///     assert!(rx.is_empty());
1052    ///     let _ = (&mut rx).await;
1053    /// }
1054    /// ```
1055    pub fn is_empty(&self) -> bool {
1056        let Some(inner) = self.inner.as_ref() else {
1057            // The channel has already terminated.
1058            return true;
1059        };
1060
1061        let state = State::load(&inner.state, Acquire);
1062        if state.is_complete() {
1063            // SAFETY: If `state.is_complete()` returns true, then the
1064            // `VALUE_SENT` bit has been set and the sender side of the
1065            // channel will no longer attempt to access the inner
1066            // `UnsafeCell`. Therefore, it is now safe for us to access the
1067            // cell.
1068            //
1069            // The channel is empty if it does not have a value.
1070            unsafe { !inner.has_value() }
1071        } else {
1072            // The receiver closed the channel or no value has been sent yet.
1073            true
1074        }
1075    }
1076
1077    /// Attempts to receive a value.
1078    ///
1079    /// If a pending value exists in the channel, it is returned. If no value
1080    /// has been sent, the current task **will not** be registered for
1081    /// future notification.
1082    ///
1083    /// This function is useful to call from outside the context of an
1084    /// asynchronous task.
1085    ///
1086    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1087    /// spuriously. Any send or close event that happens before this call to
1088    /// `try_recv` will be correctly returned to the caller.
1089    ///
1090    /// # Return
1091    ///
1092    /// - `Ok(T)` if a value is pending in the channel.
1093    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1094    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1095    ///   a value, or if the message has already been received.
1096    ///
1097    /// # Examples
1098    ///
1099    /// `try_recv` before a value is sent, then after.
1100    ///
1101    /// ```
1102    /// use tokio::sync::oneshot;
1103    /// use tokio::sync::oneshot::error::TryRecvError;
1104    ///
1105    /// #[tokio::main]
1106    /// async fn main() {
1107    ///     let (tx, mut rx) = oneshot::channel();
1108    ///
1109    ///     match rx.try_recv() {
1110    ///         // The channel is currently empty
1111    ///         Err(TryRecvError::Empty) => {}
1112    ///         _ => unreachable!(),
1113    ///     }
1114    ///
1115    ///     // Send a value
1116    ///     tx.send("hello").unwrap();
1117    ///
1118    ///     match rx.try_recv() {
1119    ///         Ok(value) => assert_eq!(value, "hello"),
1120    ///         _ => unreachable!(),
1121    ///     }
1122    /// }
1123    /// ```
1124    ///
1125    /// `try_recv` when the sender dropped before sending a value
1126    ///
1127    /// ```
1128    /// use tokio::sync::oneshot;
1129    /// use tokio::sync::oneshot::error::TryRecvError;
1130    ///
1131    /// #[tokio::main]
1132    /// async fn main() {
1133    ///     let (tx, mut rx) = oneshot::channel::<()>();
1134    ///
1135    ///     drop(tx);
1136    ///
1137    ///     match rx.try_recv() {
1138    ///         // The channel will never receive a value.
1139    ///         Err(TryRecvError::Closed) => {}
1140    ///         _ => unreachable!(),
1141    ///     }
1142    /// }
1143    /// ```
1144    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1145        let result = if let Some(inner) = self.inner.as_ref() {
1146            let state = State::load(&inner.state, Acquire);
1147
1148            if state.is_complete() {
1149                // SAFETY: If `state.is_complete()` returns true, then the
1150                // `VALUE_SENT` bit has been set and the sender side of the
1151                // channel will no longer attempt to access the inner
1152                // `UnsafeCell`. Therefore, it is now safe for us to access the
1153                // cell.
1154                match unsafe { inner.consume_value() } {
1155                    Some(value) => {
1156                        #[cfg(all(tokio_unstable, feature = "tracing"))]
1157                        self.resource_span.in_scope(|| {
1158                            tracing::trace!(
1159                            target: "runtime::resource::state_update",
1160                            value_received = true,
1161                            value_received.op = "override",
1162                            )
1163                        });
1164                        Ok(value)
1165                    }
1166                    None => Err(TryRecvError::Closed),
1167                }
1168            } else if state.is_closed() {
1169                Err(TryRecvError::Closed)
1170            } else {
1171                // Not ready, this does not clear `inner`
1172                return Err(TryRecvError::Empty);
1173            }
1174        } else {
1175            Err(TryRecvError::Closed)
1176        };
1177
1178        self.inner = None;
1179        result
1180    }
1181
1182    /// Blocking receive to call outside of asynchronous contexts.
1183    ///
1184    /// # Panics
1185    ///
1186    /// This function panics if called within an asynchronous execution
1187    /// context.
1188    ///
1189    /// # Examples
1190    ///
1191    /// ```
1192    /// use std::thread;
1193    /// use tokio::sync::oneshot;
1194    ///
1195    /// #[tokio::main]
1196    /// async fn main() {
1197    ///     let (tx, rx) = oneshot::channel::<u8>();
1198    ///
1199    ///     let sync_code = thread::spawn(move || {
1200    ///         assert_eq!(Ok(10), rx.blocking_recv());
1201    ///     });
1202    ///
1203    ///     let _ = tx.send(10);
1204    ///     sync_code.join().unwrap();
1205    /// }
1206    /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(self) -> Result<T, RecvError> {
        // `Receiver` implements `Future`, so it is handed by value to
        // `crate::future::block_on`, whose result is returned unchanged.
        crate::future::block_on(self)
    }
1213}
1214
1215impl<T> Drop for Receiver<T> {
1216    fn drop(&mut self) {
1217        if let Some(inner) = self.inner.as_ref() {
1218            let state = inner.close();
1219
1220            if state.is_complete() {
1221                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1222                // so only the receiver can access the value.
1223                drop(unsafe { inner.consume_value() });
1224            }
1225
1226            #[cfg(all(tokio_unstable, feature = "tracing"))]
1227            self.resource_span.in_scope(|| {
1228                tracing::trace!(
1229                target: "runtime::resource::state_update",
1230                rx_dropped = true,
1231                rx_dropped.op = "override",
1232                )
1233            });
1234        }
1235    }
1236}
1237
1238impl<T> Future for Receiver<T> {
1239    type Output = Result<T, RecvError>;
1240
1241    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1242        // If `inner` is `None`, then `poll()` has already completed.
1243        #[cfg(all(tokio_unstable, feature = "tracing"))]
1244        let _res_span = self.resource_span.clone().entered();
1245        #[cfg(all(tokio_unstable, feature = "tracing"))]
1246        let _ao_span = self.async_op_span.clone().entered();
1247        #[cfg(all(tokio_unstable, feature = "tracing"))]
1248        let _ao_poll_span = self.async_op_poll_span.clone().entered();
1249
1250        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1251            #[cfg(all(tokio_unstable, feature = "tracing"))]
1252            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1253
1254            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1255            let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1256
1257            res
1258        } else {
1259            panic!("called after complete");
1260        };
1261
1262        self.inner = None;
1263        Ready(ret)
1264    }
1265}
1266
1267impl<T> Inner<T> {
1268    fn complete(&self) -> bool {
1269        let prev = State::set_complete(&self.state);
1270
1271        if prev.is_closed() {
1272            return false;
1273        }
1274
1275        if prev.is_rx_task_set() {
1276            // TODO: Consume waker?
1277            unsafe {
1278                self.rx_task.with_task(Waker::wake_by_ref);
1279            }
1280        }
1281
1282        true
1283    }
1284
    /// Polls the channel on behalf of the receiver.
    ///
    /// Yields `Ready(Ok(value))` if a value has been sent, and
    /// `Ready(Err(RecvError))` if the channel is complete without a value or
    /// has been closed. Otherwise the waker from `cx` is stored in `rx_task`
    /// (unless the stored waker already wakes the same task) and `Pending` is
    /// returned. The `ready!` calls at the top may also return `Pending`
    /// before any channel state is inspected.
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        ready!(crate::trace::trace_leaf(cx));
        // Keep track of task budget
        let coop = ready!(crate::task::coop::poll_proceed(cx));

        // Load the state
        let mut state = State::load(&self.state, Acquire);

        if state.is_complete() {
            coop.made_progress();
            // SAFETY: `VALUE_SENT` is set, so only the receiver may access the
            // inner `UnsafeCell` from now on.
            match unsafe { self.consume_value() } {
                Some(value) => Ready(Ok(value)),
                None => Ready(Err(RecvError(()))),
            }
        } else if state.is_closed() {
            coop.made_progress();
            Ready(Err(RecvError(())))
        } else {
            if state.is_rx_task_set() {
                // SAFETY: `RX_TASK_SET` is set, so `rx_task` has been
                // initialized. NOTE(review): presumably only the receiver
                // writes `rx_task` — confirm.
                let will_notify = unsafe { self.rx_task.will_wake(cx) };

                // Check if the task is still the same
                if !will_notify {
                    // Unset the task
                    state = State::unset_rx_task(&self.state);
                    if state.is_complete() {
                        // Set the flag again so that the waker is released in drop
                        State::set_rx_task(&self.state);

                        coop.made_progress();
                        // SAFETY: If `state.is_complete()` returns true, then the
                        // `VALUE_SENT` bit has been set and the sender side of the
                        // channel will no longer attempt to access the inner
                        // `UnsafeCell`. Therefore, it is now safe for us to access the
                        // cell.
                        return match unsafe { self.consume_value() } {
                            Some(value) => Ready(Ok(value)),
                            None => Ready(Err(RecvError(()))),
                        };
                    } else {
                        // The flag was cleared while the channel was not yet
                        // complete, so the stale waker is released here; the
                        // cleared flag lets the code below store a new one.
                        // SAFETY: NOTE(review): presumably the sender only
                        // reads `rx_task` after observing `RX_TASK_SET`, which
                        // has just been cleared — confirm.
                        unsafe { self.rx_task.drop_task() };
                    }
                }
            }

            if !state.is_rx_task_set() {
                // Attempt to set the task
                // SAFETY: NOTE(review): `RX_TASK_SET` is not set here, so the
                // sender is presumably not reading `rx_task` — confirm.
                unsafe {
                    self.rx_task.set_task(cx);
                }

                // Update the state
                state = State::set_rx_task(&self.state);

                // The sender may have completed the channel between the
                // initial load and the flag update, so check once more.
                if state.is_complete() {
                    coop.made_progress();
                    // SAFETY: `VALUE_SENT` is set, so only the receiver may
                    // access the inner `UnsafeCell` from now on.
                    match unsafe { self.consume_value() } {
                        Some(value) => Ready(Ok(value)),
                        None => Ready(Err(RecvError(()))),
                    }
                } else {
                    Pending
                }
            } else {
                // The stored waker already wakes the current task.
                Pending
            }
        }
    }
1353
1354    /// Called by `Receiver` to indicate that the value will never be received.
1355    fn close(&self) -> State {
1356        let prev = State::set_closed(&self.state);
1357
1358        if prev.is_tx_task_set() && !prev.is_complete() {
1359            unsafe {
1360                self.tx_task.with_task(Waker::wake_by_ref);
1361            }
1362        }
1363
1364        prev
1365    }
1366
1367    /// Consumes the value. This function does not check `state`.
1368    ///
1369    /// # Safety
1370    ///
1371    /// Calling this method concurrently on multiple threads will result in a
1372    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1373    /// sender *or* the receiver will call this method at a given point in time.
1374    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1375    /// if it is set, then only the receiver may call this method.
1376    unsafe fn consume_value(&self) -> Option<T> {
1377        self.value.with_mut(|ptr| (*ptr).take())
1378    }
1379
1380    /// Returns true if there is a value. This function does not check `state`.
1381    ///
1382    /// # Safety
1383    ///
1384    /// Calling this method concurrently on multiple threads will result in a
1385    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1386    /// sender *or* the receiver will call this method at a given point in time.
1387    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1388    /// if it is set, then only the receiver may call this method.
1389    unsafe fn has_value(&self) -> bool {
1390        self.value.with(|ptr| (*ptr).is_some())
1391    }
1392}
1393
// SAFETY: NOTE(review): soundness presumably rests on the atomic `state` bits
// (`VALUE_SENT`, `RX_TASK_SET`, `TX_TASK_SET`) deciding which side may touch
// the value and waker cells at any point in time, with `T: Send` required
// because the value moves from the sender to the receiver — confirm.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
1396
/// Reads the current value of `this` through `with_mut`, which is only
/// available with exclusive (`&mut`) access to the atomic.
fn mut_load(this: &mut AtomicUsize) -> usize {
    this.with_mut(|v| *v)
}
1400
1401impl<T> Drop for Inner<T> {
1402    fn drop(&mut self) {
1403        let state = State(mut_load(&mut self.state));
1404
1405        if state.is_rx_task_set() {
1406            unsafe {
1407                self.rx_task.drop_task();
1408            }
1409        }
1410
1411        if state.is_tx_task_set() {
1412            unsafe {
1413                self.tx_task.drop_task();
1414            }
1415        }
1416
1417        // SAFETY: we have `&mut self`, and therefore we have
1418        // exclusive access to the value.
1419        unsafe {
1420            // Note: the assertion holds because if the value has been sent by sender,
1421            // we must ensure that the value must have been consumed by the receiver before
1422            // dropping the `Inner`.
1423            debug_assert!(self.consume_value().is_none());
1424        }
1425    }
1426}
1427
1428impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1429    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1430        use std::sync::atomic::Ordering::Relaxed;
1431
1432        fmt.debug_struct("Inner")
1433            .field("state", &State::load(&self.state, Relaxed))
1434            .finish()
1435    }
1436}
1437
/// Indicates that a waker for the receiving task has been set.
///
/// # Safety
///
/// If this bit is not set, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 0b00001;
/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
///
/// # Safety
///
/// This bit controls which side of the channel is permitted to access the
/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
/// the sender.
const VALUE_SENT: usize = 0b00010;
/// Indicates that the channel has been closed.
///
/// `State::set_closed` sets this bit, and `State::set_complete` will not set
/// `VALUE_SENT` once it observes this bit.
const CLOSED: usize = 0b00100;

/// Indicates that a waker for the sending task has been set.
///
/// # Safety
///
/// If this bit is not set, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 0b01000;
1461
1462impl State {
1463    fn new() -> State {
1464        State(0)
1465    }
1466
1467    fn is_complete(self) -> bool {
1468        self.0 & VALUE_SENT == VALUE_SENT
1469    }
1470
1471    fn set_complete(cell: &AtomicUsize) -> State {
1472        // This method is a compare-and-swap loop rather than a fetch-or like
1473        // other `set_$WHATEVER` methods on `State`. This is because we must
1474        // check if the state has been closed before setting the `VALUE_SENT`
1475        // bit.
1476        //
1477        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1478        // bit is already set, because `VALUE_SENT` will tell the receiver that
1479        // it's okay to access the inner `UnsafeCell`. Immediately after calling
1480        // `set_complete`, if the channel was closed, the sender will _also_
1481        // access the `UnsafeCell` to take the value back out, so if a
1482        // `poll_recv` or `try_recv` call is occurring concurrently, both
1483        // threads may try to access the `UnsafeCell` if we were to set the
1484        // `VALUE_SENT` bit on a closed channel.
1485        let mut state = cell.load(Ordering::Relaxed);
1486        loop {
1487            if State(state).is_closed() {
1488                break;
1489            }
1490            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1491            // the `RX_TASK_SET` flag is set. However, `loom` does not support
1492            // fences yet.
1493            match cell.compare_exchange_weak(
1494                state,
1495                state | VALUE_SENT,
1496                Ordering::AcqRel,
1497                Ordering::Acquire,
1498            ) {
1499                Ok(_) => break,
1500                Err(actual) => state = actual,
1501            }
1502        }
1503        State(state)
1504    }
1505
1506    fn is_rx_task_set(self) -> bool {
1507        self.0 & RX_TASK_SET == RX_TASK_SET
1508    }
1509
1510    fn set_rx_task(cell: &AtomicUsize) -> State {
1511        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1512        State(val | RX_TASK_SET)
1513    }
1514
1515    fn unset_rx_task(cell: &AtomicUsize) -> State {
1516        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1517        State(val & !RX_TASK_SET)
1518    }
1519
1520    fn is_closed(self) -> bool {
1521        self.0 & CLOSED == CLOSED
1522    }
1523
1524    fn set_closed(cell: &AtomicUsize) -> State {
1525        // Acquire because we want all later writes (attempting to poll) to be
1526        // ordered after this.
1527        let val = cell.fetch_or(CLOSED, Acquire);
1528        State(val)
1529    }
1530
1531    fn set_tx_task(cell: &AtomicUsize) -> State {
1532        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1533        State(val | TX_TASK_SET)
1534    }
1535
1536    fn unset_tx_task(cell: &AtomicUsize) -> State {
1537        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1538        State(val & !TX_TASK_SET)
1539    }
1540
1541    fn is_tx_task_set(self) -> bool {
1542        self.0 & TX_TASK_SET == TX_TASK_SET
1543    }
1544
1545    fn as_usize(self) -> usize {
1546        self.0
1547    }
1548
1549    fn load(cell: &AtomicUsize, order: Ordering) -> State {
1550        let val = cell.load(order);
1551        State(val)
1552    }
1553}
1554
1555impl fmt::Debug for State {
1556    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1557        fmt.debug_struct("State")
1558            .field("is_complete", &self.is_complete())
1559            .field("is_closed", &self.is_closed())
1560            .field("is_rx_task_set", &self.is_rx_task_set())
1561            .field("is_tx_task_set", &self.is_tx_task_set())
1562            .finish()
1563    }
1564}