rune/runtime/
vm_execution.rs

1use core::fmt;
2use core::future::Future;
3use core::mem::{replace, take};
4
5use ::rust_alloc::sync::Arc;
6
7use crate::alloc::prelude::*;
8use crate::runtime::budget;
9use crate::runtime::{
10    Generator, GeneratorState, InstAddress, Output, RuntimeContext, Stream, Unit, Value, Vm,
11    VmErrorKind, VmHalt, VmHaltInfo, VmResult,
12};
13use crate::shared::AssertSend;
14
15use super::VmDiagnostics;
16
/// The state of an execution. We keep track of this because it's important to
/// correctly interact with functions that yield (like generators and streams)
/// by initially just calling the function, then by providing a value pushed
/// onto the stack.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub(crate) enum ExecutionState {
    /// The initial state of an execution.
    Initial,
    /// The execution has yielded and is waiting to be resumed. The contained
    /// output is where the value provided on resume is stored.
    Resumed(Output),
    /// Suspended execution. This is the state put in place of a resumed state
    /// when the execution is resumed.
    Suspended,
    /// Execution exited. The optional address is where the resulting value is
    /// read from the stack; without an address the result is a unit value.
    Exited(Option<InstAddress>),
}
33
34impl fmt::Display for ExecutionState {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        match self {
37            ExecutionState::Initial => write!(f, "initial"),
38            ExecutionState::Resumed(out) => write!(f, "resumed({out})"),
39            ExecutionState::Suspended => write!(f, "suspended"),
40            ExecutionState::Exited(..) => write!(f, "exited"),
41        }
42    }
43}
44
/// A context and unit pair associated with a suspended execution.
///
/// When pushed onto an execution, each field that is set is swapped into the
/// head machine and the value it replaced is stored so that it can be restored
/// when the state is popped. A field which is `None` leaves the corresponding
/// part of the head machine untouched.
#[derive(TryClone)]
#[try_clone(crate)]
pub(crate) struct VmExecutionState {
    /// The runtime context to swap in, if any.
    pub(crate) context: Option<Arc<RuntimeContext>>,
    /// The unit to swap in, if any.
    pub(crate) unit: Option<Arc<Unit>>,
}
51
/// The execution environment for a virtual machine.
///
/// When an execution is dropped, the stack of the head machine will be
/// cleared.
pub struct VmExecution<T = Vm>
where
    T: AsRef<Vm> + AsMut<Vm>,
{
    /// The current head vm which holds the execution.
    head: T,
    /// The state of an execution.
    state: ExecutionState,
    /// Indicates the current stack of suspended contexts.
    states: Vec<VmExecutionState>,
}
67
68impl<T> VmExecution<T>
69where
70    T: AsRef<Vm> + AsMut<Vm>,
71{
72    /// Construct an execution from a virtual machine.
73    pub(crate) fn new(head: T) -> Self {
74        Self {
75            head,
76            state: ExecutionState::Initial,
77            states: Vec::new(),
78        }
79    }
80
81    /// Test if the current execution state is resumed.
82    pub(crate) fn is_resumed(&self) -> bool {
83        matches!(self.state, ExecutionState::Resumed(..))
84    }
85
    /// Coerce the current execution into a generator if appropriate.
    ///
    /// This consumes the execution and hands it over to the returned
    /// [`Generator`].
    ///
    /// # Examples
    ///
    /// ```
    /// use rune::Vm;
    /// use std::sync::Arc;
    ///
    /// let mut sources = rune::sources! {
    ///     entry => {
    ///         pub fn main() {
    ///             yield 1;
    ///             yield 2;
    ///         }
    ///     }
    /// };
    ///
    /// let unit = rune::prepare(&mut sources).build()?;
    ///
    /// let mut vm = Vm::without_runtime(Arc::new(unit));
    /// let mut generator = vm.execute(["main"], ())?.into_generator();
    ///
    /// let mut n = 1i64;
    ///
    /// while let Some(value) = generator.next().into_result()? {
    ///     let value: i64 = rune::from_value(value)?;
    ///     assert_eq!(value, n);
    ///     n += 1;
    /// }
    /// # Ok::<_, rune::support::Error>(())
    /// ```
    pub fn into_generator(self) -> Generator<T> {
        Generator::from_execution(self)
    }
118
    /// Coerce the current execution into a stream if appropriate.
    ///
    /// This consumes the execution and hands it over to the returned
    /// [`Stream`].
    ///
    /// # Examples
    ///
    /// ```
    /// use rune::Vm;
    /// use std::sync::Arc;
    ///
    /// # futures_executor::block_on(async move {
    /// let mut sources = rune::sources! {
    ///     entry => {
    ///         pub async fn main() {
    ///             yield 1;
    ///             yield 2;
    ///         }
    ///     }
    /// };
    ///
    /// let unit = rune::prepare(&mut sources).build()?;
    ///
    /// let mut vm = Vm::without_runtime(Arc::new(unit));
    /// let mut stream = vm.execute(["main"], ())?.into_stream();
    ///
    /// let mut n = 1i64;
    ///
    /// while let Some(value) = stream.next().await.into_result()? {
    ///     let value: i64 = rune::from_value(value)?;
    ///     assert_eq!(value, n);
    ///     n += 1;
    /// }
    /// # Ok::<_, rune::support::Error>(())
    /// # })?;
    /// # Ok::<_, rune::support::Error>(())
    /// ```
    pub fn into_stream(self) -> Stream<T> {
        Stream::from_execution(self)
    }
154
155    /// Get a reference to the current virtual machine.
156    pub fn vm(&self) -> &Vm {
157        self.head.as_ref()
158    }
159
160    /// Get a mutable reference the current virtual machine.
161    pub fn vm_mut(&mut self) -> &mut Vm {
162        self.head.as_mut()
163    }
164
165    /// Complete the current execution without support for async instructions.
166    ///
167    /// This will error if the execution is suspended through yielding.
168    pub async fn async_complete(&mut self) -> VmResult<Value> {
169        match vm_try!(self.async_resume().await) {
170            GeneratorState::Complete(value) => VmResult::Ok(value),
171            GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted {
172                halt: VmHaltInfo::Yielded,
173            }),
174        }
175    }
176
    /// Complete the current execution without support for async instructions.
    ///
    /// This is the same as calling `complete_with_diagnostics` without any
    /// diagnostics.
    ///
    /// If any async instructions are encountered, this will error. This will
    /// also error if the execution is suspended through yielding.
    pub fn complete(&mut self) -> VmResult<Value> {
        self.complete_with_diagnostics(None)
    }
184
185    /// Complete the current execution without support for async instructions.
186    ///
187    /// If any async instructions are encountered, this will error. This will
188    /// also error if the execution is suspended through yielding.
189    pub fn complete_with_diagnostics(
190        &mut self,
191        diagnostics: Option<&mut dyn VmDiagnostics>,
192    ) -> VmResult<Value> {
193        match vm_try!(self.resume_with_diagnostics(diagnostics)) {
194            GeneratorState::Complete(value) => VmResult::Ok(value),
195            GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted {
196                halt: VmHaltInfo::Yielded,
197            }),
198        }
199    }
200
201    /// Resume the current execution with the given value and resume
202    /// asynchronous execution.
203    pub async fn async_resume_with(&mut self, value: Value) -> VmResult<GeneratorState> {
204        let state = replace(&mut self.state, ExecutionState::Suspended);
205
206        let ExecutionState::Resumed(out) = state else {
207            return VmResult::err(VmErrorKind::ExpectedExecutionState { actual: state });
208        };
209
210        vm_try!(out.store(self.head.as_mut().stack_mut(), value));
211        self.inner_async_resume(None).await
212    }
213
    /// Resume the current execution with support for async instructions.
    ///
    /// This is the same as calling `async_resume_with_diagnostics` without any
    /// diagnostics.
    ///
    /// If the function being executed is a generator or stream this will resume
    /// it while returning a unit from the current `yield`.
    pub async fn async_resume(&mut self) -> VmResult<GeneratorState> {
        self.async_resume_with_diagnostics(None).await
    }
221
222    /// Resume the current execution with support for async instructions.
223    ///
224    /// If the function being executed is a generator or stream this will resume
225    /// it while returning a unit from the current `yield`.
226    pub async fn async_resume_with_diagnostics(
227        &mut self,
228        diagnostics: Option<&mut dyn VmDiagnostics>,
229    ) -> VmResult<GeneratorState> {
230        if let ExecutionState::Resumed(out) = self.state {
231            vm_try!(out.store(self.head.as_mut().stack_mut(), Value::unit));
232        }
233
234        self.inner_async_resume(diagnostics).await
235    }
236
    /// Drive the head machine until the execution either completes or yields,
    /// with support for awaited instructions.
    ///
    /// Returns [`GeneratorState::Yielded`] with the yielded value when the
    /// machine yields, and [`GeneratorState::Complete`] with the final value
    /// once the machine exits while no suspended states remain. Any halt
    /// which is not handled here is reported as a halted error.
    async fn inner_async_resume(
        &mut self,
        mut diagnostics: Option<&mut dyn VmDiagnostics>,
    ) -> VmResult<GeneratorState> {
        loop {
            let vm = self.head.as_mut();

            // Reborrow the diagnostics so they can be handed to the machine
            // again on the next iteration of the loop.
            match vm_try!(vm
                .run(match diagnostics {
                    Some(ref mut value) => Some(&mut **value),
                    None => None,
                })
                .with_vm(vm))
            {
                VmHalt::Exited(addr) => {
                    // Record the exit and fall through to the handling below
                    // the match.
                    self.state = ExecutionState::Exited(addr);
                }
                VmHalt::Awaited(awaited) => {
                    // Wait for the awaited operation, then keep running.
                    vm_try!(awaited.into_vm(vm).await);
                    continue;
                }
                VmHalt::VmCall(vm_call) => {
                    // NOTE(review): presumably this sets up the call on this
                    // execution (e.g. through `push_state`) - confirm against
                    // the `into_execution` implementation.
                    vm_try!(vm_call.into_execution(self));
                    continue;
                }
                VmHalt::Yielded(addr, out) => {
                    // The yielded value is read from the stack if an address
                    // was provided, otherwise a unit is yielded.
                    let value = match addr {
                        Some(addr) => vm.stack().at(addr).clone(),
                        None => Value::unit(),
                    };

                    // Remember where the value provided on resume should be
                    // stored.
                    self.state = ExecutionState::Resumed(out);
                    return VmResult::Ok(GeneratorState::Yielded(value));
                }
                halt => {
                    return VmResult::err(VmErrorKind::Halted {
                        halt: halt.into_info(),
                    })
                }
            }

            // Only reached after an exit. Without any suspended states the
            // whole execution is complete.
            if self.states.is_empty() {
                let value = vm_try!(self.end());
                return VmResult::Ok(GeneratorState::Complete(value));
            }

            // Otherwise restore the most recently suspended state and keep
            // running.
            vm_try!(self.pop_state());
        }
    }
286
287    /// Resume the current execution with the given value and resume synchronous
288    /// execution.
289    #[tracing::instrument(skip_all, fields(?value))]
290    pub fn resume_with(&mut self, value: Value) -> VmResult<GeneratorState> {
291        let state = replace(&mut self.state, ExecutionState::Suspended);
292
293        let ExecutionState::Resumed(out) = state else {
294            return VmResult::err(VmErrorKind::ExpectedExecutionState { actual: state });
295        };
296
297        vm_try!(out.store(self.head.as_mut().stack_mut(), value));
298        self.inner_resume(None)
299    }
300
    /// Resume the current execution without support for async instructions.
    ///
    /// This is the same as calling `resume_with_diagnostics` without any
    /// diagnostics.
    ///
    /// If the function being executed is a generator or stream this will resume
    /// it while returning a unit from the current `yield`.
    ///
    /// If any async instructions are encountered, this will error.
    pub fn resume(&mut self) -> VmResult<GeneratorState> {
        self.resume_with_diagnostics(None)
    }
310
311    /// Resume the current execution without support for async instructions.
312    ///
313    /// If the function being executed is a generator or stream this will resume
314    /// it while returning a unit from the current `yield`.
315    ///
316    /// If any async instructions are encountered, this will error.
317    #[tracing::instrument(skip_all, fields(diagnostics=diagnostics.is_some()))]
318    pub fn resume_with_diagnostics(
319        &mut self,
320        diagnostics: Option<&mut dyn VmDiagnostics>,
321    ) -> VmResult<GeneratorState> {
322        if let ExecutionState::Resumed(out) = replace(&mut self.state, ExecutionState::Suspended) {
323            vm_try!(out.store(self.head.as_mut().stack_mut(), Value::unit()));
324        }
325
326        self.inner_resume(diagnostics)
327    }
328
    /// Drive the head machine until the execution either completes or yields,
    /// without support for awaited instructions.
    ///
    /// Returns [`GeneratorState::Yielded`] with the yielded value when the
    /// machine yields, and [`GeneratorState::Complete`] with the final value
    /// once the machine exits while no suspended states remain. Any halt
    /// which is not handled here is reported as a halted error.
    fn inner_resume(
        &mut self,
        mut diagnostics: Option<&mut dyn VmDiagnostics>,
    ) -> VmResult<GeneratorState> {
        loop {
            // Number of suspended states before the machine is run.
            let len = self.states.len();
            let vm = self.head.as_mut();

            // Reborrow the diagnostics so they can be handed to the machine
            // again on the next iteration of the loop.
            match vm_try!(vm
                .run(match diagnostics {
                    Some(ref mut value) => Some(&mut **value),
                    None => None,
                })
                .with_vm(vm))
            {
                VmHalt::Exited(addr) => {
                    // Record the exit and fall through to the handling below
                    // the match.
                    self.state = ExecutionState::Exited(addr);
                }
                VmHalt::VmCall(vm_call) => {
                    // NOTE(review): presumably this sets up the call on this
                    // execution (e.g. through `push_state`) - confirm against
                    // the `into_execution` implementation.
                    vm_try!(vm_call.into_execution(self));
                    continue;
                }
                VmHalt::Yielded(addr, out) => {
                    // The yielded value is read from the stack if an address
                    // was provided, otherwise a unit is yielded.
                    let value = match addr {
                        Some(addr) => vm.stack().at(addr).clone(),
                        None => Value::unit(),
                    };

                    // Remember where the value provided on resume should be
                    // stored.
                    self.state = ExecutionState::Resumed(out);
                    return VmResult::Ok(GeneratorState::Yielded(value));
                }
                halt => {
                    return VmResult::err(VmErrorKind::Halted {
                        halt: halt.into_info(),
                    });
                }
            }

            // Only reached after an exit. Without any suspended states the
            // whole execution is complete.
            if len == 0 {
                let value = vm_try!(self.end());
                return VmResult::Ok(GeneratorState::Complete(value));
            }

            // Otherwise restore the most recently suspended state and keep
            // running.
            vm_try!(self.pop_state());
        }
    }
375
    /// Step the single execution for one step without support for async
    /// instructions.
    ///
    /// The machine is run with a budget of `1`. Returns `Some(value)` once the
    /// machine has exited while no suspended states remain, and `None` if the
    /// execution has not completed yet.
    ///
    /// If any async instructions are encountered, this will error.
    pub fn step(&mut self) -> VmResult<Option<Value>> {
        // Number of suspended states before the machine is run.
        let len = self.states.len();
        let vm = self.head.as_mut();

        match vm_try!(budget::with(1, || vm.run(None).with_vm(vm)).call()) {
            VmHalt::Exited(addr) => {
                // Record the exit and fall through to the handling below the
                // match.
                self.state = ExecutionState::Exited(addr);
            }
            VmHalt::VmCall(vm_call) => {
                vm_try!(vm_call.into_execution(self));
                return VmResult::Ok(None);
            }
            // The budget ran out, so the step is done.
            VmHalt::Limited => return VmResult::Ok(None),
            halt => {
                return VmResult::err(VmErrorKind::Halted {
                    halt: halt.into_info(),
                })
            }
        }

        // Only reached after an exit. Without any suspended states the whole
        // execution is complete.
        if len == 0 {
            let value = vm_try!(self.end());
            return VmResult::Ok(Some(value));
        }

        // Otherwise restore the most recently suspended state.
        vm_try!(self.pop_state());
        VmResult::Ok(None)
    }
408
    /// Step the single execution for one step with support for async
    /// instructions.
    ///
    /// The machine is run with a budget of `1`. Returns `Some(value)` once the
    /// machine has exited while no suspended states remain, and `None` if the
    /// execution has not completed yet.
    pub async fn async_step(&mut self) -> VmResult<Option<Value>> {
        let vm = self.head.as_mut();

        match vm_try!(budget::with(1, || vm.run(None).with_vm(vm)).call()) {
            VmHalt::Exited(addr) => {
                // Record the exit and fall through to the handling below the
                // match.
                self.state = ExecutionState::Exited(addr);
            }
            VmHalt::Awaited(awaited) => {
                // Wait for the awaited operation as part of this step.
                vm_try!(awaited.into_vm(vm).await);
                return VmResult::Ok(None);
            }
            VmHalt::VmCall(vm_call) => {
                vm_try!(vm_call.into_execution(self));
                return VmResult::Ok(None);
            }
            // The budget ran out, so the step is done.
            VmHalt::Limited => return VmResult::Ok(None),
            halt => {
                return VmResult::err(VmErrorKind::Halted {
                    halt: halt.into_info(),
                });
            }
        }

        // Only reached after an exit. Without any suspended states the whole
        // execution is complete.
        if self.states.is_empty() {
            let value = vm_try!(self.end());
            return VmResult::Ok(Some(value));
        }

        // Otherwise restore the most recently suspended state.
        vm_try!(self.pop_state());
        VmResult::Ok(None)
    }
442
443    /// End execution and perform debug checks.
444    pub(crate) fn end(&mut self) -> VmResult<Value> {
445        let ExecutionState::Exited(addr) = self.state else {
446            return VmResult::err(VmErrorKind::ExpectedExitedExecutionState { actual: self.state });
447        };
448
449        let value = match addr {
450            Some(addr) => self.head.as_ref().stack().at(addr).clone(),
451            None => Value::unit(),
452        };
453
454        debug_assert!(self.states.is_empty(), "Execution states should be empty");
455        VmResult::Ok(value)
456    }
457
458    /// Push a virtual machine state onto the execution.
459    #[tracing::instrument(skip_all)]
460    pub(crate) fn push_state(&mut self, state: VmExecutionState) -> VmResult<()> {
461        tracing::trace!("pushing suspended state");
462        let vm = self.head.as_mut();
463        let context = state.context.map(|c| replace(vm.context_mut(), c));
464        let unit = state.unit.map(|u| replace(vm.unit_mut(), u));
465        vm_try!(self.states.try_push(VmExecutionState { context, unit }));
466        VmResult::Ok(())
467    }
468
469    /// Pop a virtual machine state from the execution and transfer the top of
470    /// the stack from the popped machine.
471    #[tracing::instrument(skip_all)]
472    fn pop_state(&mut self) -> VmResult<()> {
473        tracing::trace!("popping suspended state");
474
475        let state = vm_try!(self.states.pop().ok_or(VmErrorKind::NoRunningVm));
476        let vm = self.head.as_mut();
477
478        if let Some(context) = state.context {
479            *vm.context_mut() = context;
480        }
481
482        if let Some(unit) = state.unit {
483            *vm.unit_mut() = unit;
484        }
485
486        VmResult::Ok(())
487    }
488}
489
490impl VmExecution<&mut Vm> {
491    /// Convert the current execution into one which owns its virtual machine.
492    pub fn into_owned(self) -> VmExecution<Vm> {
493        let stack = take(self.head.stack_mut());
494        let head = Vm::with_stack(self.head.context().clone(), self.head.unit().clone(), stack);
495
496        VmExecution {
497            head,
498            states: self.states,
499            state: self.state,
500        }
501    }
502}
503
/// A wrapper that makes [`VmExecution`] [`Send`].
///
/// This is accomplished by preventing any [`Value`] from escaping the [`Vm`].
/// As long as this is maintained, it is safe to send the execution across
/// threads, and therefore schedule the future associated with the execution on
/// a thread pool like Tokio's through [tokio::spawn].
///
/// [tokio::spawn]: https://docs.rs/tokio/0/tokio/runtime/struct.Runtime.html#method.spawn
pub struct VmSendExecution(pub(crate) VmExecution<Vm>);

// SAFETY: we wrap all APIs around the [VmExecution], preventing values from
// escaping from contained virtual machine.
unsafe impl Send for VmSendExecution {}
517
518impl VmSendExecution {
519    /// Complete the current execution with support for async instructions.
520    ///
521    /// This requires that the result of the Vm is converted into a
522    /// [crate::FromValue] that also implements [Send],  which prevents non-Send
523    /// values from escaping from the virtual machine.
524    pub fn async_complete(mut self) -> impl Future<Output = VmResult<Value>> + Send + 'static {
525        let future = async move {
526            let result = vm_try!(self.0.async_resume().await);
527
528            match result {
529                GeneratorState::Complete(value) => VmResult::Ok(value),
530                GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted {
531                    halt: VmHaltInfo::Yielded,
532                }),
533            }
534        };
535
536        // Safety: we wrap all APIs around the [VmExecution], preventing values
537        // from escaping from contained virtual machine.
538        unsafe { AssertSend::new(future) }
539    }
540
541    /// Complete the current execution with support for async instructions.
542    ///
543    /// This requires that the result of the Vm is converted into a
544    /// [crate::FromValue] that also implements [Send],  which prevents non-Send
545    /// values from escaping from the virtual machine.
546    pub fn async_complete_with_diagnostics(
547        mut self,
548        diagnostics: Option<&mut dyn VmDiagnostics>,
549    ) -> impl Future<Output = VmResult<Value>> + Send + '_ {
550        let future = async move {
551            let result = vm_try!(self.0.async_resume_with_diagnostics(diagnostics).await);
552
553            match result {
554                GeneratorState::Complete(value) => VmResult::Ok(value),
555                GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted {
556                    halt: VmHaltInfo::Yielded,
557                }),
558            }
559        };
560
561        // Safety: we wrap all APIs around the [VmExecution], preventing values
562        // from escaping from contained virtual machine.
563        unsafe { AssertSend::new(future) }
564    }
565}
566
567impl<T> TryClone for VmExecution<T>
568where
569    T: AsRef<Vm> + AsMut<Vm> + TryClone,
570{
571    #[inline]
572    fn try_clone(&self) -> Result<Self, rune_alloc::Error> {
573        Ok(Self {
574            head: self.head.try_clone()?,
575            state: self.state,
576            states: self.states.try_clone()?,
577        })
578    }
579}