rune/runtime/stream.rs
1use core::fmt;
2
3use crate as rune;
4use crate::alloc;
5use crate::alloc::clone::TryClone;
6use crate::alloc::fmt::TryWrite;
7use crate::runtime::{
8 Formatter, GeneratorState, Mut, Value, Vm, VmError, VmErrorKind, VmExecution, VmHaltInfo,
9 VmOutcome,
10};
11use crate::Any;
12
13/// A stream produced by an async generator function.
14///
15/// Generator are async functions or closures which contain the `yield`
16/// expressions.
17///
18/// # Examples
19///
20/// ```rune
21/// use std::stream::Stream;
22///
23/// let f = async |n| {
24/// yield n;
25/// yield n + 1;
26/// };
27///
28/// let g = f(10);
29///
30/// assert!(g is Stream);
31/// ```
32#[derive(Any)]
33#[rune(item = ::std::stream)]
34pub struct Stream {
35 execution: Option<VmExecution<Vm>>,
36}
37
38impl Stream {
39 /// Construct a stream from a virtual machine.
40 pub(crate) fn new(vm: Vm) -> Self {
41 Self {
42 execution: Some(VmExecution::new(vm)),
43 }
44 }
45
46 /// Get the next value produced by this stream.
47 pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
48 let Some(execution) = self.execution.as_mut() else {
49 return Ok(None);
50 };
51
52 match execution.resume().await? {
53 VmOutcome::Complete(..) => {
54 self.execution = None;
55 Ok(None)
56 }
57 VmOutcome::Yielded(value) => Ok(Some(value)),
58 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
59 halt: VmHaltInfo::Limited,
60 })),
61 }
62 }
63
64 /// Resume the stream with a value and return the next [`GeneratorState`].
65 pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
66 let execution = self
67 .execution
68 .as_mut()
69 .ok_or(VmError::new(VmErrorKind::GeneratorComplete))?;
70
71 let outcome = execution.resume().with_value(value).await?;
72
73 match outcome {
74 VmOutcome::Complete(value) => {
75 self.execution = None;
76 Ok(GeneratorState::Complete(value))
77 }
78 VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
79 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
80 halt: VmHaltInfo::Limited,
81 })),
82 }
83 }
84}
85
86impl Stream {
87 /// Get the next value produced by this stream through an asynchronous
88 /// iterator-like protocol.
89 ///
90 /// This function will resume execution until a value is produced through
91 /// `GeneratorState::Yielded(value)`, at which point it will return
92 /// `Some(value)`. Once `GeneratorState::Complete` is returned `None` will
93 /// be returned.
94 ///
95 /// # Examples
96 ///
97 /// ```rune
98 /// use std::ops::GeneratorState;
99 ///
100 /// async fn generate() {
101 /// yield 1;
102 /// yield 2;
103 /// }
104 ///
105 /// let g = generate();
106 ///
107 /// assert_eq!(g.next().await, Some(1));
108 /// assert_eq!(g.next().await, Some(2));
109 /// assert_eq!(g.next().await, None);
110 /// ``
111 #[rune::function(keep, instance, path = Self::next)]
112 pub(crate) async fn next_shared(mut this: Mut<Stream>) -> Result<Option<Value>, VmError> {
113 this.next().await
114 }
115
116 /// Resumes the execution of this stream.
117 ///
118 /// This function will resume execution of the stream or start execution if
119 /// it hasn't already. This call will return back into the stream's last
120 /// suspension point, resuming execution from the latest `yield`. The stream
121 /// will continue executing until it either yields or returns, at which
122 /// point this function will return.
123 ///
124 /// # Return value
125 ///
126 /// The `GeneratorState` enum returned from this function indicates what
127 /// state the stream is in upon returning. If the `Yielded` variant is
128 /// returned then the stream has reached a suspension point and a value has
129 /// been yielded out. Streams in this state are available for resumption at
130 /// a later point.
131 ///
132 /// If `Complete` is returned then the stream has completely finished with
133 /// the value provided. It is invalid for the stream to be resumed again.
134 ///
135 /// # Panics
136 ///
137 /// This function may panic if it is called after the `Complete` variant has
138 /// been returned previously. While stream literals in the language are
139 /// guaranteed to panic on resuming after `Complete`, this is not guaranteed
140 /// for all implementations of the `Stream`.
141 ///
142 /// # Examples
143 ///
144 /// ```rune
145 /// use std::ops::GeneratorState;
146 ///
147 /// async fn generate() {
148 /// let n = yield 1;
149 /// yield 2 + n;
150 /// }
151 ///
152 /// let g = generate();
153 ///
154 /// assert_eq!(g.resume(()).await, GeneratorState::Yielded(1));
155 /// assert_eq!(g.resume(1).await, GeneratorState::Yielded(3));
156 /// assert_eq!(g.resume(()).await, GeneratorState::Complete(()));
157 /// ``
158 #[rune::function(keep, instance, path = Self::resume)]
159 pub(crate) async fn resume_shared(
160 mut this: Mut<Stream>,
161 value: Value,
162 ) -> Result<GeneratorState, VmError> {
163 this.resume(value).await
164 }
165
166 /// Debug print this stream
167 ///
168 /// # Examples
169 ///
170 /// ```rune
171 /// use std::ops::GeneratorState;
172 ///
173 /// fn generate() {
174 /// let n = yield 1;
175 /// yield 2 + n;
176 /// }
177 ///
178 /// let a = generate();
179 ///
180 /// println!("{a:?}");
181 /// ``
182 #[rune::function(keep, instance, protocol = DEBUG_FMT)]
183 fn debug(&self, f: &mut Formatter) -> alloc::Result<()> {
184 write!(f, "{self:?}")
185 }
186
187 /// Clone a stream.
188 ///
189 /// This clones the state of the stream too, allowing it to be resumed
190 /// independently.
191 ///
192 /// # Examples
193 ///
194 /// ```rune
195 /// use std::ops::GeneratorState;
196 ///
197 /// async fn generate() {
198 /// let n = yield 1;
199 /// yield 2 + n;
200 /// }
201 ///
202 /// let a = generate();
203 ///
204 /// assert_eq!(a.resume(()).await, GeneratorState::Yielded(1));
205 /// let b = a.clone();
206 /// assert_eq!(a.resume(2).await, GeneratorState::Yielded(4));
207 /// assert_eq!(b.resume(3).await, GeneratorState::Yielded(5));
208 ///
209 /// assert_eq!(a.resume(()).await, GeneratorState::Complete(()));
210 /// assert_eq!(b.resume(()).await, GeneratorState::Complete(()));
211 /// ``
212 #[rune::function(keep, instance, protocol = CLONE)]
213 fn clone(&self) -> alloc::Result<Self> {
214 self.try_clone()
215 }
216}
217
218impl fmt::Debug for Stream {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 f.debug_struct("Stream")
221 .field("completed", &self.execution.is_none())
222 .finish()
223 }
224}
225
226impl TryClone for Stream {
227 fn try_clone(&self) -> crate::alloc::Result<Self> {
228 Ok(Self {
229 execution: self.execution.try_clone()?,
230 })
231 }
232}