rune/runtime/
stream.rs

1use core::fmt;
2
3use crate as rune;
4use crate::alloc;
5use crate::alloc::clone::TryClone;
6use crate::alloc::fmt::TryWrite;
7use crate::runtime::{
8    Formatter, GeneratorState, Mut, Value, Vm, VmError, VmErrorKind, VmExecution, VmHaltInfo,
9    VmOutcome,
10};
11use crate::Any;
12
13/// A stream produced by an async generator function.
14///
15/// Generator are async functions or closures which contain the `yield`
16/// expressions.
17///
18/// # Examples
19///
20/// ```rune
21/// use std::stream::Stream;
22///
23/// let f = async |n| {
24///     yield n;
25///     yield n + 1;
26/// };
27///
28/// let g = f(10);
29///
30/// assert!(g is Stream);
31/// ```
32#[derive(Any)]
33#[rune(item = ::std::stream)]
34pub struct Stream {
35    execution: Option<VmExecution<Vm>>,
36}
37
38impl Stream {
39    /// Construct a stream from a virtual machine.
40    pub(crate) fn new(vm: Vm) -> Self {
41        Self {
42            execution: Some(VmExecution::new(vm)),
43        }
44    }
45
46    /// Get the next value produced by this stream.
47    pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
48        let Some(execution) = self.execution.as_mut() else {
49            return Ok(None);
50        };
51
52        match execution.resume().await? {
53            VmOutcome::Complete(..) => {
54                self.execution = None;
55                Ok(None)
56            }
57            VmOutcome::Yielded(value) => Ok(Some(value)),
58            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
59                halt: VmHaltInfo::Limited,
60            })),
61        }
62    }
63
64    /// Resume the stream with a value and return the next [`GeneratorState`].
65    pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
66        let execution = self
67            .execution
68            .as_mut()
69            .ok_or(VmError::new(VmErrorKind::GeneratorComplete))?;
70
71        let outcome = execution.resume().with_value(value).await?;
72
73        match outcome {
74            VmOutcome::Complete(value) => {
75                self.execution = None;
76                Ok(GeneratorState::Complete(value))
77            }
78            VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
79            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
80                halt: VmHaltInfo::Limited,
81            })),
82        }
83    }
84}
85
86impl Stream {
87    /// Get the next value produced by this stream through an asynchronous
88    /// iterator-like protocol.
89    ///
90    /// This function will resume execution until a value is produced through
91    /// `GeneratorState::Yielded(value)`, at which point it will return
92    /// `Some(value)`. Once `GeneratorState::Complete` is returned `None` will
93    /// be returned.
94    ///
95    /// # Examples
96    ///
97    /// ```rune
98    /// use std::ops::GeneratorState;
99    ///
100    /// async fn generate() {
101    ///     yield 1;
102    ///     yield 2;
103    /// }
104    ///
105    /// let g = generate();
106    ///
107    /// assert_eq!(g.next().await, Some(1));
108    /// assert_eq!(g.next().await, Some(2));
109    /// assert_eq!(g.next().await, None);
110    /// ``
111    #[rune::function(keep, instance, path = Self::next)]
112    pub(crate) async fn next_shared(mut this: Mut<Stream>) -> Result<Option<Value>, VmError> {
113        this.next().await
114    }
115
116    /// Resumes the execution of this stream.
117    ///
118    /// This function will resume execution of the stream or start execution if
119    /// it hasn't already. This call will return back into the stream's last
120    /// suspension point, resuming execution from the latest `yield`. The stream
121    /// will continue executing until it either yields or returns, at which
122    /// point this function will return.
123    ///
124    /// # Return value
125    ///
126    /// The `GeneratorState` enum returned from this function indicates what
127    /// state the stream is in upon returning. If the `Yielded` variant is
128    /// returned then the stream has reached a suspension point and a value has
129    /// been yielded out. Streams in this state are available for resumption at
130    /// a later point.
131    ///
132    /// If `Complete` is returned then the stream has completely finished with
133    /// the value provided. It is invalid for the stream to be resumed again.
134    ///
135    /// # Panics
136    ///
137    /// This function may panic if it is called after the `Complete` variant has
138    /// been returned previously. While stream literals in the language are
139    /// guaranteed to panic on resuming after `Complete`, this is not guaranteed
140    /// for all implementations of the `Stream`.
141    ///
142    /// # Examples
143    ///
144    /// ```rune
145    /// use std::ops::GeneratorState;
146    ///
147    /// async fn generate() {
148    ///     let n = yield 1;
149    ///     yield 2 + n;
150    /// }
151    ///
152    /// let g = generate();
153    ///
154    /// assert_eq!(g.resume(()).await, GeneratorState::Yielded(1));
155    /// assert_eq!(g.resume(1).await, GeneratorState::Yielded(3));
156    /// assert_eq!(g.resume(()).await, GeneratorState::Complete(()));
157    /// ``
158    #[rune::function(keep, instance, path = Self::resume)]
159    pub(crate) async fn resume_shared(
160        mut this: Mut<Stream>,
161        value: Value,
162    ) -> Result<GeneratorState, VmError> {
163        this.resume(value).await
164    }
165
166    /// Debug print this stream
167    ///
168    /// # Examples
169    ///
170    /// ```rune
171    /// use std::ops::GeneratorState;
172    ///
173    /// fn generate() {
174    ///     let n = yield 1;
175    ///     yield 2 + n;
176    /// }
177    ///
178    /// let a = generate();
179    ///
180    /// println!("{a:?}");
181    /// ``
182    #[rune::function(keep, instance, protocol = DEBUG_FMT)]
183    fn debug(&self, f: &mut Formatter) -> alloc::Result<()> {
184        write!(f, "{self:?}")
185    }
186
187    /// Clone a stream.
188    ///
189    /// This clones the state of the stream too, allowing it to be resumed
190    /// independently.
191    ///
192    /// # Examples
193    ///
194    /// ```rune
195    /// use std::ops::GeneratorState;
196    ///
197    /// async fn generate() {
198    ///     let n = yield 1;
199    ///     yield 2 + n;
200    /// }
201    ///
202    /// let a = generate();
203    ///
204    /// assert_eq!(a.resume(()).await, GeneratorState::Yielded(1));
205    /// let b = a.clone();
206    /// assert_eq!(a.resume(2).await, GeneratorState::Yielded(4));
207    /// assert_eq!(b.resume(3).await, GeneratorState::Yielded(5));
208    ///
209    /// assert_eq!(a.resume(()).await, GeneratorState::Complete(()));
210    /// assert_eq!(b.resume(()).await, GeneratorState::Complete(()));
211    /// ``
212    #[rune::function(keep, instance, protocol = CLONE)]
213    fn clone(&self) -> alloc::Result<Self> {
214        self.try_clone()
215    }
216}
217
218impl fmt::Debug for Stream {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        f.debug_struct("Stream")
221            .field("completed", &self.execution.is_none())
222            .finish()
223    }
224}
225
226impl TryClone for Stream {
227    fn try_clone(&self) -> crate::alloc::Result<Self> {
228        Ok(Self {
229            execution: self.execution.try_clone()?,
230        })
231    }
232}