futures_util/stream/stream/
take_until.rs
1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::Future;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11pin_project! {
14 #[must_use = "streams do nothing unless polled"]
16 pub struct TakeUntil<St: Stream, Fut: Future> {
17 #[pin]
18 stream: St,
19 #[pin]
22 fut: Option<Fut>,
23 fut_result: Option<Fut::Output>,
25 free: bool,
27 }
28}
29
30impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
31where
32 St: Stream + fmt::Debug,
33 St::Item: fmt::Debug,
34 Fut: Future + fmt::Debug,
35{
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish()
38 }
39}
40
41impl<St, Fut> TakeUntil<St, Fut>
42where
43 St: Stream,
44 Fut: Future,
45{
46 pub(super) fn new(stream: St, fut: Fut) -> Self {
47 Self { stream, fut: Some(fut), fut_result: None, free: false }
48 }
49
50 delegate_access_inner!(stream, St, ());
51
52 pub fn take_future(&mut self) -> Option<Fut> {
57 if self.fut.is_some() {
58 self.free = true;
59 }
60
61 self.fut.take()
62 }
63
64 pub fn take_result(&mut self) -> Option<Fut::Output> {
101 self.fut_result.take()
102 }
103
104 pub fn is_stopped(&self) -> bool {
107 !self.free && self.fut.is_none()
108 }
109}
110
111impl<St, Fut> Stream for TakeUntil<St, Fut>
112where
113 St: Stream,
114 Fut: Future,
115{
116 type Item = St::Item;
117
118 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
119 let mut this = self.project();
120
121 if let Some(f) = this.fut.as_mut().as_pin_mut() {
122 if let Poll::Ready(result) = f.poll(cx) {
123 this.fut.set(None);
124 *this.fut_result = Some(result);
125 }
126 }
127
128 if !*this.free && this.fut.is_none() {
129 Poll::Ready(None)
131 } else {
132 let item = ready!(this.stream.poll_next(cx));
134 if item.is_none() {
135 this.fut.set(None);
136 }
137 Poll::Ready(item)
138 }
139 }
140
141 fn size_hint(&self) -> (usize, Option<usize>) {
142 if self.is_stopped() {
143 return (0, Some(0));
144 }
145
146 self.stream.size_hint()
147 }
148}
149
150impl<St, Fut> FusedStream for TakeUntil<St, Fut>
151where
152 St: Stream,
153 Fut: Future,
154{
155 fn is_terminated(&self) -> bool {
156 self.is_stopped()
157 }
158}
159
160#[cfg(feature = "sink")]
162impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
163where
164 S: Stream + Sink<Item>,
165 Fut: Future,
166{
167 type Error = S::Error;
168
169 delegate_sink!(stream, Item);
170}