rune/runtime/stream.rs
1use core::fmt;
2
3use crate as rune;
4use crate::alloc::clone::TryClone;
5use crate::alloc::fmt::TryWrite;
6use crate::runtime::{
7 Formatter, GeneratorState, Mut, Value, Vm, VmErrorKind, VmExecution, VmResult,
8};
9use crate::Any;
10
11/// A stream produced by an async generator function.
12///
13/// Generator are async functions or closures which contain the `yield`
14/// expressions.
15///
16/// # Examples
17///
18/// ```rune
19/// use std::stream::Stream;
20///
21/// let f = async |n| {
22/// yield n;
23/// yield n + 1;
24/// };
25///
26/// let g = f(10);
27///
28/// assert!(g is Stream);
29/// ```
30#[derive(Any)]
31#[rune(impl_params = [Vm], item = ::std::stream)]
32pub struct Stream<T = Vm>
33where
34 T: AsRef<Vm> + AsMut<Vm>,
35{
36 execution: Option<VmExecution<T>>,
37}
38
39impl<T> Stream<T>
40where
41 T: AsRef<Vm> + AsMut<Vm>,
42{
43 /// Construct a stream from a virtual machine.
44 pub(crate) fn new(vm: T) -> Self {
45 Self {
46 execution: Some(VmExecution::new(vm)),
47 }
48 }
49
50 /// Construct a generator from a complete execution.
51 pub(crate) fn from_execution(execution: VmExecution<T>) -> Self {
52 Self {
53 execution: Some(execution),
54 }
55 }
56
57 /// Get the next value produced by this stream.
58 pub async fn next(&mut self) -> VmResult<Option<Value>> {
59 let Some(execution) = self.execution.as_mut() else {
60 return VmResult::Ok(None);
61 };
62
63 let state = if execution.is_resumed() {
64 vm_try!(execution.async_resume_with(Value::empty()).await)
65 } else {
66 vm_try!(execution.async_resume().await)
67 };
68
69 VmResult::Ok(match state {
70 GeneratorState::Yielded(value) => Some(value),
71 GeneratorState::Complete(_) => {
72 self.execution = None;
73 None
74 }
75 })
76 }
77
78 /// Resume the generator and return the next generator state.
79 pub async fn resume(&mut self, value: Value) -> VmResult<GeneratorState> {
80 let execution = vm_try!(self
81 .execution
82 .as_mut()
83 .ok_or(VmErrorKind::GeneratorComplete));
84
85 let state = if execution.is_resumed() {
86 vm_try!(execution.async_resume_with(value).await)
87 } else {
88 vm_try!(execution.async_resume().await)
89 };
90
91 if state.is_complete() {
92 self.execution = None;
93 }
94
95 VmResult::Ok(state)
96 }
97}
98
99impl Stream {
100 /// Get the next value produced by this stream through an asynchronous
101 /// iterator-like protocol.
102 ///
103 /// This function will resume execution until a value is produced through
104 /// `GeneratorState::Yielded(value)`, at which point it will return
105 /// `Some(value)`. Once `GeneratorState::Complete` is returned `None` will
106 /// be returned.
107 ///
108 /// # Examples
109 ///
110 /// ```rune
111 /// use std::ops::GeneratorState;
112 ///
113 /// async fn generate() {
114 /// yield 1;
115 /// yield 2;
116 /// }
117 ///
118 /// let g = generate();
119 ///
120 /// assert_eq!(g.next().await, Some(1));
121 /// assert_eq!(g.next().await, Some(2));
122 /// assert_eq!(g.next().await, None);
123 /// ``
124 #[rune::function(keep, instance, path = Self::next)]
125 pub(crate) async fn next_shared(mut this: Mut<Stream>) -> VmResult<Option<Value>> {
126 this.next().await
127 }
128
129 /// Resumes the execution of this stream.
130 ///
131 /// This function will resume execution of the stream or start execution if
132 /// it hasn't already. This call will return back into the stream's last
133 /// suspension point, resuming execution from the latest `yield`. The stream
134 /// will continue executing until it either yields or returns, at which
135 /// point this function will return.
136 ///
137 /// # Return value
138 ///
139 /// The `GeneratorState` enum returned from this function indicates what
140 /// state the stream is in upon returning. If the `Yielded` variant is
141 /// returned then the stream has reached a suspension point and a value has
142 /// been yielded out. Streams in this state are available for resumption at
143 /// a later point.
144 ///
145 /// If `Complete` is returned then the stream has completely finished with
146 /// the value provided. It is invalid for the stream to be resumed again.
147 ///
148 /// # Panics
149 ///
150 /// This function may panic if it is called after the `Complete` variant has
151 /// been returned previously. While stream literals in the language are
152 /// guaranteed to panic on resuming after `Complete`, this is not guaranteed
153 /// for all implementations of the `Stream`.
154 ///
155 /// # Examples
156 ///
157 /// ```rune
158 /// use std::ops::GeneratorState;
159 ///
160 /// async fn generate() {
161 /// let n = yield 1;
162 /// yield 2 + n;
163 /// }
164 ///
165 /// let g = generate();
166 ///
167 /// assert_eq!(g.resume(()).await, GeneratorState::Yielded(1));
168 /// assert_eq!(g.resume(1).await, GeneratorState::Yielded(3));
169 /// assert_eq!(g.resume(()).await, GeneratorState::Complete(()));
170 /// ``
171 #[rune::function(keep, instance, path = Self::resume)]
172 pub(crate) async fn resume_shared(
173 mut this: Mut<Stream>,
174 value: Value,
175 ) -> VmResult<GeneratorState> {
176 this.resume(value).await
177 }
178
179 /// Debug print this stream
180 ///
181 /// # Examples
182 ///
183 /// ```rune
184 /// use std::ops::GeneratorState;
185 ///
186 /// fn generate() {
187 /// let n = yield 1;
188 /// yield 2 + n;
189 /// }
190 ///
191 /// let a = generate();
192 ///
193 /// println!("{a:?}");
194 /// ``
195 #[rune::function(keep, instance, protocol = DEBUG_FMT)]
196 fn debug(&self, f: &mut Formatter) -> VmResult<()> {
197 vm_write!(f, "{self:?}")
198 }
199
200 /// Clone a stream.
201 ///
202 /// This clones the state of the stream too, allowing it to be resumed
203 /// independently.
204 ///
205 /// # Examples
206 ///
207 /// ```rune
208 /// use std::ops::GeneratorState;
209 ///
210 /// async fn generate() {
211 /// let n = yield 1;
212 /// yield 2 + n;
213 /// }
214 ///
215 /// let a = generate();
216 ///
217 /// assert_eq!(a.resume(()).await, GeneratorState::Yielded(1));
218 /// let b = a.clone();
219 /// assert_eq!(a.resume(2).await, GeneratorState::Yielded(4));
220 /// assert_eq!(b.resume(3).await, GeneratorState::Yielded(5));
221 ///
222 /// assert_eq!(a.resume(()).await, GeneratorState::Complete(()));
223 /// assert_eq!(b.resume(()).await, GeneratorState::Complete(()));
224 /// ``
225 #[rune::function(keep, instance, protocol = CLONE)]
226 fn clone(&self) -> VmResult<Self> {
227 VmResult::Ok(vm_try!(self.try_clone()))
228 }
229}
230
231impl Stream<&mut Vm> {
232 /// Convert the current stream into one which owns its virtual machine.
233 pub fn into_owned(self) -> Stream<Vm> {
234 Stream {
235 execution: self.execution.map(|e| e.into_owned()),
236 }
237 }
238}
239
240impl<T> fmt::Debug for Stream<T>
241where
242 T: AsRef<Vm> + AsMut<Vm>,
243{
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 f.debug_struct("Stream")
246 .field("completed", &self.execution.is_none())
247 .finish()
248 }
249}
250
251impl<T> TryClone for Stream<T>
252where
253 T: TryClone + AsRef<Vm> + AsMut<Vm>,
254{
255 fn try_clone(&self) -> crate::alloc::Result<Self> {
256 Ok(Self {
257 execution: self.execution.try_clone()?,
258 })
259 }
260}