rune/runtime/
stream.rs

1use core::fmt;
2
3use crate as rune;
4use crate::alloc::clone::TryClone;
5use crate::alloc::fmt::TryWrite;
6use crate::runtime::{
7    Formatter, GeneratorState, Mut, Value, Vm, VmErrorKind, VmExecution, VmResult,
8};
9use crate::Any;
10
11/// A stream produced by an async generator function.
12///
13/// Generator are async functions or closures which contain the `yield`
14/// expressions.
15///
16/// # Examples
17///
18/// ```rune
19/// use std::stream::Stream;
20///
21/// let f = async |n| {
22///     yield n;
23///     yield n + 1;
24/// };
25///
26/// let g = f(10);
27///
28/// assert!(g is Stream);
29/// ```
30#[derive(Any)]
31#[rune(impl_params = [Vm], item = ::std::stream)]
32pub struct Stream<T = Vm>
33where
34    T: AsRef<Vm> + AsMut<Vm>,
35{
36    execution: Option<VmExecution<T>>,
37}
38
39impl<T> Stream<T>
40where
41    T: AsRef<Vm> + AsMut<Vm>,
42{
43    /// Construct a stream from a virtual machine.
44    pub(crate) fn new(vm: T) -> Self {
45        Self {
46            execution: Some(VmExecution::new(vm)),
47        }
48    }
49
50    /// Construct a generator from a complete execution.
51    pub(crate) fn from_execution(execution: VmExecution<T>) -> Self {
52        Self {
53            execution: Some(execution),
54        }
55    }
56
57    /// Get the next value produced by this stream.
58    pub async fn next(&mut self) -> VmResult<Option<Value>> {
59        let Some(execution) = self.execution.as_mut() else {
60            return VmResult::Ok(None);
61        };
62
63        let state = if execution.is_resumed() {
64            vm_try!(execution.async_resume_with(Value::empty()).await)
65        } else {
66            vm_try!(execution.async_resume().await)
67        };
68
69        VmResult::Ok(match state {
70            GeneratorState::Yielded(value) => Some(value),
71            GeneratorState::Complete(_) => {
72                self.execution = None;
73                None
74            }
75        })
76    }
77
78    /// Resume the generator and return the next generator state.
79    pub async fn resume(&mut self, value: Value) -> VmResult<GeneratorState> {
80        let execution = vm_try!(self
81            .execution
82            .as_mut()
83            .ok_or(VmErrorKind::GeneratorComplete));
84
85        let state = if execution.is_resumed() {
86            vm_try!(execution.async_resume_with(value).await)
87        } else {
88            vm_try!(execution.async_resume().await)
89        };
90
91        if state.is_complete() {
92            self.execution = None;
93        }
94
95        VmResult::Ok(state)
96    }
97}
98
99impl Stream {
100    /// Get the next value produced by this stream through an asynchronous
101    /// iterator-like protocol.
102    ///
103    /// This function will resume execution until a value is produced through
104    /// `GeneratorState::Yielded(value)`, at which point it will return
105    /// `Some(value)`. Once `GeneratorState::Complete` is returned `None` will
106    /// be returned.
107    ///
108    /// # Examples
109    ///
110    /// ```rune
111    /// use std::ops::GeneratorState;
112    ///
113    /// async fn generate() {
114    ///     yield 1;
115    ///     yield 2;
116    /// }
117    ///
118    /// let g = generate();
119    ///
120    /// assert_eq!(g.next().await, Some(1));
121    /// assert_eq!(g.next().await, Some(2));
122    /// assert_eq!(g.next().await, None);
123    /// ``
124    #[rune::function(keep, instance, path = Self::next)]
125    pub(crate) async fn next_shared(mut this: Mut<Stream>) -> VmResult<Option<Value>> {
126        this.next().await
127    }
128
129    /// Resumes the execution of this stream.
130    ///
131    /// This function will resume execution of the stream or start execution if
132    /// it hasn't already. This call will return back into the stream's last
133    /// suspension point, resuming execution from the latest `yield`. The stream
134    /// will continue executing until it either yields or returns, at which
135    /// point this function will return.
136    ///
137    /// # Return value
138    ///
139    /// The `GeneratorState` enum returned from this function indicates what
140    /// state the stream is in upon returning. If the `Yielded` variant is
141    /// returned then the stream has reached a suspension point and a value has
142    /// been yielded out. Streams in this state are available for resumption at
143    /// a later point.
144    ///
145    /// If `Complete` is returned then the stream has completely finished with
146    /// the value provided. It is invalid for the stream to be resumed again.
147    ///
148    /// # Panics
149    ///
150    /// This function may panic if it is called after the `Complete` variant has
151    /// been returned previously. While stream literals in the language are
152    /// guaranteed to panic on resuming after `Complete`, this is not guaranteed
153    /// for all implementations of the `Stream`.
154    ///
155    /// # Examples
156    ///
157    /// ```rune
158    /// use std::ops::GeneratorState;
159    ///
160    /// async fn generate() {
161    ///     let n = yield 1;
162    ///     yield 2 + n;
163    /// }
164    ///
165    /// let g = generate();
166    ///
167    /// assert_eq!(g.resume(()).await, GeneratorState::Yielded(1));
168    /// assert_eq!(g.resume(1).await, GeneratorState::Yielded(3));
169    /// assert_eq!(g.resume(()).await, GeneratorState::Complete(()));
170    /// ``
171    #[rune::function(keep, instance, path = Self::resume)]
172    pub(crate) async fn resume_shared(
173        mut this: Mut<Stream>,
174        value: Value,
175    ) -> VmResult<GeneratorState> {
176        this.resume(value).await
177    }
178
179    /// Debug print this stream
180    ///
181    /// # Examples
182    ///
183    /// ```rune
184    /// use std::ops::GeneratorState;
185    ///
186    /// fn generate() {
187    ///     let n = yield 1;
188    ///     yield 2 + n;
189    /// }
190    ///
191    /// let a = generate();
192    ///
193    /// println!("{a:?}");
194    /// ``
195    #[rune::function(keep, instance, protocol = DEBUG_FMT)]
196    fn debug(&self, f: &mut Formatter) -> VmResult<()> {
197        vm_write!(f, "{self:?}")
198    }
199
200    /// Clone a stream.
201    ///
202    /// This clones the state of the stream too, allowing it to be resumed
203    /// independently.
204    ///
205    /// # Examples
206    ///
207    /// ```rune
208    /// use std::ops::GeneratorState;
209    ///
210    /// async fn generate() {
211    ///     let n = yield 1;
212    ///     yield 2 + n;
213    /// }
214    ///
215    /// let a = generate();
216    ///
217    /// assert_eq!(a.resume(()).await, GeneratorState::Yielded(1));
218    /// let b = a.clone();
219    /// assert_eq!(a.resume(2).await, GeneratorState::Yielded(4));
220    /// assert_eq!(b.resume(3).await, GeneratorState::Yielded(5));
221    ///
222    /// assert_eq!(a.resume(()).await, GeneratorState::Complete(()));
223    /// assert_eq!(b.resume(()).await, GeneratorState::Complete(()));
224    /// ``
225    #[rune::function(keep, instance, protocol = CLONE)]
226    fn clone(&self) -> VmResult<Self> {
227        VmResult::Ok(vm_try!(self.try_clone()))
228    }
229}
230
231impl Stream<&mut Vm> {
232    /// Convert the current stream into one which owns its virtual machine.
233    pub fn into_owned(self) -> Stream<Vm> {
234        Stream {
235            execution: self.execution.map(|e| e.into_owned()),
236        }
237    }
238}
239
240impl<T> fmt::Debug for Stream<T>
241where
242    T: AsRef<Vm> + AsMut<Vm>,
243{
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        f.debug_struct("Stream")
246            .field("completed", &self.execution.is_none())
247            .finish()
248    }
249}
250
251impl<T> TryClone for Stream<T>
252where
253    T: TryClone + AsRef<Vm> + AsMut<Vm>,
254{
255    fn try_clone(&self) -> crate::alloc::Result<Self> {
256        Ok(Self {
257            execution: self.execution.try_clone()?,
258        })
259    }
260}