futures_util/stream/stream/
take_until.rs

1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::Future;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11// FIXME: docs, tests
12
13pin_project! {
14    /// Stream for the [`take_until`](super::StreamExt::take_until) method.
15    #[must_use = "streams do nothing unless polled"]
16    pub struct TakeUntil<St: Stream, Fut: Future> {
17        #[pin]
18        stream: St,
19        // Contains the inner Future on start and None once the inner Future is resolved
20        // or taken out by the user.
21        #[pin]
22        fut: Option<Fut>,
23        // Contains fut's return value once fut is resolved
24        fut_result: Option<Fut::Output>,
25        // Whether the future was taken out by the user.
26        free: bool,
27    }
28}
29
30impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
31where
32    St: Stream + fmt::Debug,
33    St::Item: fmt::Debug,
34    Fut: Future + fmt::Debug,
35{
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish()
38    }
39}
40
41impl<St, Fut> TakeUntil<St, Fut>
42where
43    St: Stream,
44    Fut: Future,
45{
46    pub(super) fn new(stream: St, fut: Fut) -> Self {
47        Self { stream, fut: Some(fut), fut_result: None, free: false }
48    }
49
50    delegate_access_inner!(stream, St, ());
51
52    /// Extract the stopping future out of the combinator.
53    /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
54    /// Taking out the future means the combinator will be yielding
55    /// elements from the wrapped stream without ever stopping it.
56    pub fn take_future(&mut self) -> Option<Fut> {
57        if self.fut.is_some() {
58            self.free = true;
59        }
60
61        self.fut.take()
62    }
63
64    /// Once the stopping future is resolved, this method can be used
65    /// to extract the value returned by the stopping future.
66    ///
67    /// This may be used to retrieve arbitrary data from the stopping
68    /// future, for example a reason why the stream was stopped.
69    ///
70    /// This method will return `None` if the future isn't resolved yet,
71    /// or if the result was already taken out.
72    ///
73    /// # Examples
74    ///
75    /// ```
76    /// # futures::executor::block_on(async {
77    /// use futures::future;
78    /// use futures::stream::{self, StreamExt};
79    /// use futures::task::Poll;
80    ///
81    /// let stream = stream::iter(1..=10);
82    ///
83    /// let mut i = 0;
84    /// let stop_fut = future::poll_fn(|_cx| {
85    ///     i += 1;
86    ///     if i <= 5 {
87    ///         Poll::Pending
88    ///     } else {
89    ///         Poll::Ready("reason")
90    ///     }
91    /// });
92    ///
93    /// let mut stream = stream.take_until(stop_fut);
94    /// let _ = stream.by_ref().collect::<Vec<_>>().await;
95    ///
96    /// let result = stream.take_result().unwrap();
97    /// assert_eq!(result, "reason");
98    /// # });
99    /// ```
100    pub fn take_result(&mut self) -> Option<Fut::Output> {
101        self.fut_result.take()
102    }
103
104    /// Whether the stream was stopped yet by the stopping future
105    /// being resolved.
106    pub fn is_stopped(&self) -> bool {
107        !self.free && self.fut.is_none()
108    }
109}
110
111impl<St, Fut> Stream for TakeUntil<St, Fut>
112where
113    St: Stream,
114    Fut: Future,
115{
116    type Item = St::Item;
117
118    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
119        let mut this = self.project();
120
121        if let Some(f) = this.fut.as_mut().as_pin_mut() {
122            if let Poll::Ready(result) = f.poll(cx) {
123                this.fut.set(None);
124                *this.fut_result = Some(result);
125            }
126        }
127
128        if !*this.free && this.fut.is_none() {
129            // Future resolved, inner stream stopped
130            Poll::Ready(None)
131        } else {
132            // Future either not resolved yet or taken out by the user
133            let item = ready!(this.stream.poll_next(cx));
134            if item.is_none() {
135                this.fut.set(None);
136            }
137            Poll::Ready(item)
138        }
139    }
140
141    fn size_hint(&self) -> (usize, Option<usize>) {
142        if self.is_stopped() {
143            return (0, Some(0));
144        }
145
146        self.stream.size_hint()
147    }
148}
149
150impl<St, Fut> FusedStream for TakeUntil<St, Fut>
151where
152    St: Stream,
153    Fut: Future,
154{
155    fn is_terminated(&self) -> bool {
156        self.is_stopped()
157    }
158}
159
160// Forwarding impl of Sink from the underlying stream
161#[cfg(feature = "sink")]
162impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
163where
164    S: Stream + Sink<Item>,
165    Fut: Future,
166{
167    type Error = S::Error;
168
169    delegate_sink!(stream, Item);
170}