rune/runtime/
vm_execution.rs

1use core::fmt;
2use core::future::Future;
3use core::mem::{replace, take};
4use core::pin::{pin, Pin};
5use core::task::{ready, Context, Poll, RawWaker, RawWakerVTable, Waker};
6
7use crate::alloc::prelude::*;
8use crate::async_vm_try;
9use crate::runtime::budget::Budget;
10use crate::runtime::{budget, Awaited};
11use crate::shared::AssertSend;
12use crate::sync::Arc;
13
14use super::{
15    Address, GeneratorState, Output, RuntimeContext, Unit, Value, Vm, VmDiagnostics, VmError,
16    VmErrorKind, VmHalt, VmHaltInfo,
17};
18
/// `clone` hook of the no-op waker: produces another raw waker that shares the
/// same vtable and carries a dummy data pointer.
fn complete_waker_clone(_: *const ()) -> RawWaker {
    RawWaker::new(&(), &COMPLETE_WAKER_VTABLE)
}

/// Shared hook used for `wake`, `wake_by_ref` and `drop`; intentionally does
/// nothing.
fn complete_waker_noop(_: *const ()) {}

/// Vtable of a waker whose every operation is inert.
static COMPLETE_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    complete_waker_clone,
    complete_waker_noop,
    complete_waker_noop,
    complete_waker_noop,
);

/// Waker used when an execution is driven synchronously: waking it has no
/// effect.
// SAFETY: None of the vtable hooks reads the data pointer or manages any
// resource, so the dummy `&()` pointer is never dereferenced.
static COMPLETE_WAKER: Waker =
    unsafe { Waker::from_raw(RawWaker::new(&(), &COMPLETE_WAKER_VTABLE)) };
29
/// The state of an execution. We keep track of this because it's important to
/// correctly interact with functions that yield (like generators and streams)
/// by initially just calling the function, then by providing a value pushed
/// onto the stack.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub(crate) enum ExecutionState {
    /// The initial state of an execution.
    Initial,
    /// The execution has yielded and is waiting to be resumed. The [`Output`]
    /// is where the value supplied to the next resume is stored.
    Resumed(Output),
    /// Suspended execution. This is the state left in place once a resume
    /// operation has started driving the virtual machine.
    Suspended,
    /// Execution exited. The optional [`Address`] is the stack location the
    /// resulting value is read from; without one the unit value is produced.
    Exited(Option<Address>),
}
46
47impl fmt::Display for ExecutionState {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            ExecutionState::Initial => write!(f, "initial"),
51            ExecutionState::Resumed(out) => write!(f, "resumed({out})"),
52            ExecutionState::Suspended => write!(f, "suspended"),
53            ExecutionState::Exited(..) => write!(f, "exited"),
54        }
55    }
56}
57
/// A context and unit pair that can be swapped into, or restored onto, the
/// virtual machine of an execution.
///
/// Each field is optional; a field that is `None` leaves the corresponding
/// part of the virtual machine untouched.
#[derive(TryClone)]
#[try_clone(crate)]
pub(crate) struct VmExecutionState {
    /// The runtime context to install, if any.
    pub(crate) context: Option<Arc<RuntimeContext>>,
    /// The unit to install, if any.
    pub(crate) unit: Option<Arc<Unit>>,
}
64
/// The execution environment for a virtual machine.
///
/// When an execution is dropped, the stack of the head machine will be
/// cleared.
// NOTE(review): no `Drop` implementation for this type is visible in this
// file - confirm that the statement about clearing the stack still holds.
pub struct VmExecution<T> {
    /// The current head vm which holds the execution.
    vm: T,
    /// The state of an execution.
    state: ExecutionState,
    /// Stack of saved context / unit pairs which are restored, most recent
    /// first, as nested calls exit.
    states: Vec<VmExecutionState>,
}
77
78impl<T> VmExecution<T> {
79    /// Construct an execution from a virtual machine.
80    #[inline]
81    pub(crate) fn new(vm: T) -> Self {
82        Self {
83            vm,
84            state: ExecutionState::Initial,
85            states: Vec::new(),
86        }
87    }
88
89    /// Get a reference to the current virtual machine.
90    #[inline]
91    pub fn vm(&self) -> &Vm
92    where
93        T: AsRef<Vm>,
94    {
95        self.vm.as_ref()
96    }
97
98    /// Get a mutable reference the current virtual machine.
99    #[inline]
100    pub fn vm_mut(&mut self) -> &mut Vm
101    where
102        T: AsMut<Vm>,
103    {
104        self.vm.as_mut()
105    }
106}
107
108impl<T> VmExecution<T>
109where
110    T: AsMut<Vm>,
111{
112    /// Coerce the current execution into a generator if appropriate.
113    ///
114    /// ```
115    /// use rune::Vm;
116    /// use rune::sync::Arc;
117    ///
118    /// let mut sources = rune::sources! {
119    ///     entry => {
120    ///         pub fn main() {
121    ///             yield 1;
122    ///             yield 2;
123    ///         }
124    ///     }
125    /// };
126    ///
127    /// let unit = rune::prepare(&mut sources).build()?;
128    /// let unit = Arc::try_new(unit)?;
129    /// let mut vm = Vm::without_runtime(unit)?;
130    ///
131    /// let mut generator = vm.execute(["main"], ())?.into_generator();
132    ///
133    /// let mut n = 1i64;
134    ///
135    /// while let Some(value) = generator.next()? {
136    ///     let value: i64 = rune::from_value(value)?;
137    ///     assert_eq!(value, n);
138    ///     n += 1;
139    /// }
140    /// # Ok::<_, rune::support::Error>(())
141    /// ```
142    pub fn into_generator(self) -> VmGenerator<T> {
143        VmGenerator {
144            execution: Some(self),
145        }
146    }
147
148    /// Coerce the current execution into a stream if appropriate.
149    ///
150    /// ```
151    /// use rune::Vm;
152    /// use rune::sync::Arc;
153    ///
154    /// # futures_executor::block_on(async move {
155    /// let mut sources = rune::sources! {
156    ///     entry => {
157    ///         pub async fn main() {
158    ///             yield 1;
159    ///             yield 2;
160    ///         }
161    ///     }
162    /// };
163    ///
164    /// let unit = rune::prepare(&mut sources).build()?;
165    /// let unit = Arc::try_new(unit)?;
166    /// let mut vm = Vm::without_runtime(unit)?;
167    ///
168    /// let mut stream = vm.execute(["main"], ())?.into_stream();
169    ///
170    /// let mut n = 1i64;
171    ///
172    /// while let Some(value) = stream.next().await? {
173    ///     let value: i64 = rune::from_value(value)?;
174    ///     assert_eq!(value, n);
175    ///     n += 1;
176    /// }
177    /// # Ok::<_, rune::support::Error>(())
178    /// # })?;
179    /// # Ok::<_, rune::support::Error>(())
180    /// ```
181    pub fn into_stream(self) -> VmStream<T> {
182        VmStream {
183            execution: Some(self),
184        }
185    }
186}
187
188impl<T> VmExecution<T>
189where
190    T: AsMut<Vm>,
191{
192    /// Synchronously complete the current execution.
193    ///
194    /// # Errors
195    ///
196    /// If anything except the completion of the execution is encountered, this
197    /// will result in an error.
198    ///
199    /// To handle other outcomes and more configurability see
200    /// [`VmExecution::resume`].
201    pub fn complete(&mut self) -> Result<Value, VmError> {
202        self.resume().complete()?.into_complete()
203    }
204
205    /// Asynchronously complete the current execution.
206    ///
207    /// # Errors
208    ///
209    /// If anything except the completion of the execution is encountered, this
210    /// will result in an error.
211    ///
212    /// To handle other outcomes and more configurability see
213    /// [`VmExecution::resume`].
214    pub async fn async_complete(&mut self) -> Result<Value, VmError> {
215        self.resume().await?.into_complete()
216    }
217
218    /// Resume the current execution.
219    ///
220    /// To complete this operation synchronously, use [`VmResume::complete`].
221    ///
222    /// ## Resume with a value
223    ///
224    /// To resume an execution with a value, use [`VmResume::with_value`]. This
225    /// requires that the execution has yielded first, otherwise an error will
226    /// be produced.
227    ///
228    /// ## Resume with diagnostics
229    ///
230    /// To associated [`VmDiagnostics`] with the execution, use
231    /// [`VmResume::with_diagnostics`].
232    pub fn resume(&mut self) -> VmResume<'_, 'static, T> {
233        VmResume {
234            execution: self,
235            diagnostics: None,
236            awaited: None,
237            init: Some(Value::empty()),
238        }
239    }
240
241    /// End execution and perform debug checks.
242    pub(crate) fn end(&mut self) -> Result<Value, VmError> {
243        let ExecutionState::Exited(addr) = self.state else {
244            return Err(VmError::new(VmErrorKind::ExpectedExitedExecutionState {
245                actual: self.state,
246            }));
247        };
248
249        let value = match addr {
250            Some(addr) => self.vm.as_mut().stack().at(addr).clone(),
251            None => Value::unit(),
252        };
253
254        debug_assert!(self.states.is_empty(), "Execution states should be empty");
255        Ok(value)
256    }
257
258    /// Push a virtual machine state onto the execution.
259    #[tracing::instrument(skip_all)]
260    pub(crate) fn push_state(&mut self, state: VmExecutionState) -> Result<(), VmError> {
261        tracing::trace!("pushing suspended state");
262        let vm = self.vm.as_mut();
263        let context = state.context.map(|c| replace(vm.context_mut(), c));
264        let unit = state.unit.map(|u| replace(vm.unit_mut(), u));
265        self.states.try_push(VmExecutionState { context, unit })?;
266        Ok(())
267    }
268
269    /// Pop a virtual machine state from the execution and transfer the top of
270    /// the stack from the popped machine.
271    #[tracing::instrument(skip_all)]
272    fn pop_state(&mut self) -> Result<(), VmError> {
273        tracing::trace!("popping suspended state");
274
275        let state = self.states.pop().ok_or(VmErrorKind::NoRunningVm)?;
276        let vm = self.vm.as_mut();
277
278        if let Some(context) = state.context {
279            *vm.context_mut() = context;
280        }
281
282        if let Some(unit) = state.unit {
283            *vm.unit_mut() = unit;
284        }
285
286        Ok(())
287    }
288}
289
290impl VmExecution<&mut Vm> {
291    /// Convert the current execution into one which owns its virtual machine.
292    pub fn into_owned(self) -> VmExecution<Vm> {
293        let stack = take(self.vm.stack_mut());
294        let head = Vm::with_stack(self.vm.context().clone(), self.vm.unit().clone(), stack);
295
296        VmExecution {
297            vm: head,
298            states: self.states,
299            state: self.state,
300        }
301    }
302}
303
/// A wrapper that makes [`VmExecution`] [`Send`].
///
/// This is accomplished by preventing any [`Value`] from escaping the [`Vm`].
/// As long as this is maintained, it is safe to send the execution across
/// threads, and therefore schedule the future associated with the execution on
/// a thread pool like Tokio's through [tokio::spawn].
///
/// [tokio::spawn]: https://docs.rs/tokio/0/tokio/runtime/struct.Runtime.html#method.spawn
pub struct VmSendExecution(pub(crate) VmExecution<Vm>);

// SAFETY: We wrap all APIs around the [VmExecution], preventing values from
// escaping from the contained virtual machine.
unsafe impl Send for VmSendExecution {}
317
318impl VmSendExecution {
319    /// Complete the current execution with support for async instructions.
320    ///
321    /// This requires that the result of the Vm is converted into a
322    /// [crate::FromValue] that also implements [Send],  which prevents non-Send
323    /// values from escaping from the virtual machine.
324    pub fn complete(mut self) -> impl Future<Output = Result<Value, VmError>> + Send + 'static {
325        let future = async move { self.0.resume().await.and_then(VmOutcome::into_complete) };
326
327        // Safety: we wrap all APIs around the [VmExecution], preventing values
328        // from escaping from contained virtual machine.
329        unsafe { AssertSend::new(future) }
330    }
331
332    /// Alias for [`VmSendExecution::complete`].
333    #[deprecated = "Use `VmSendExecution::complete`"]
334    pub fn async_complete(self) -> impl Future<Output = Result<Value, VmError>> + Send + 'static {
335        self.complete()
336    }
337
338    /// Complete the current execution with support for async instructions.
339    ///
340    /// This requires that the result of the Vm is converted into a
341    /// [crate::FromValue] that also implements [Send],  which prevents non-Send
342    /// values from escaping from the virtual machine.
343    pub fn complete_with_diagnostics(
344        mut self,
345        diagnostics: &mut dyn VmDiagnostics,
346    ) -> impl Future<Output = Result<Value, VmError>> + Send + '_ {
347        let future = async move {
348            self.0
349                .resume()
350                .with_diagnostics(diagnostics)
351                .await
352                .and_then(VmOutcome::into_complete)
353        };
354
355        // Safety: we wrap all APIs around the [VmExecution], preventing values
356        // from escaping from contained virtual machine.
357        unsafe { AssertSend::new(future) }
358    }
359
360    /// Alias for [`VmSendExecution::complete_with_diagnostics`].
361    #[deprecated = "Use `VmSendExecution::complete_with_diagnostics`"]
362    pub fn async_complete_with_diagnostics(
363        self,
364        diagnostics: &mut dyn VmDiagnostics,
365    ) -> impl Future<Output = Result<Value, VmError>> + Send + '_ {
366        self.complete_with_diagnostics(diagnostics)
367    }
368}
369
370impl<T> TryClone for VmExecution<T>
371where
372    T: AsRef<Vm> + AsMut<Vm> + TryClone,
373{
374    #[inline]
375    fn try_clone(&self) -> Result<Self, rune_alloc::Error> {
376        Ok(Self {
377            vm: self.vm.try_clone()?,
378            state: self.state,
379            states: self.states.try_clone()?,
380        })
381    }
382}
383
/// The outcome of completing an execution through a [`VmResume`] operation.
#[non_exhaustive]
pub enum VmOutcome {
    /// The execution returned, producing the contained value.
    Complete(Value),
    /// The execution yielded the contained value and can be resumed.
    Yielded(Value),
    /// The execution has been limited.
    Limited,
}
394
395impl VmOutcome {
396    /// Convert the outcome into a [`GeneratorState`].
397    ///
398    /// # Errors
399    ///
400    /// If the execution is not in a state compatible with producing a generator
401    /// state, such as having been completed or yielded, this will produce an
402    /// error.
403    pub fn into_generator_state(self) -> Result<GeneratorState, VmError> {
404        match self {
405            VmOutcome::Complete(value) => Ok(GeneratorState::Complete(value)),
406            VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
407            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
408                halt: VmHaltInfo::Limited,
409            })),
410        }
411    }
412
413    /// Convert the outcome into a completed value.
414    ///
415    /// # Errors
416    ///
417    /// If the execution hasn't returned, this will produce an error.
418    pub fn into_complete(self) -> Result<Value, VmError> {
419        match self {
420            VmOutcome::Complete(value) => Ok(value),
421            VmOutcome::Yielded(..) => Err(VmError::new(VmErrorKind::Halted {
422                halt: VmHaltInfo::Yielded,
423            })),
424            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
425                halt: VmHaltInfo::Limited,
426            })),
427        }
428    }
429}
430
/// An execution that has been resumed.
///
/// This can either be completed as a future, which allows the execution to
/// perform asynchronous operations, or it can be completed by calling
/// [`VmResume::complete`] which will produce an error in case asynchronous
/// operations that need to be suspended are encountered.
pub struct VmResume<'this, 'diag, T> {
    /// The execution being resumed.
    execution: &'this mut VmExecution<T>,
    /// Diagnostics handed to the virtual machine each time it is run, if any.
    diagnostics: Option<&'diag mut dyn VmDiagnostics>,
    /// The value to resume with. It is taken on the first poll.
    init: Option<Value>,
    /// An await operation which has not completed yet, if any.
    awaited: Option<Awaited>,
}
443
444impl<'this, 'diag, T> VmResume<'this, 'diag, T> {
445    /// Associated a budget with the resumed execution.
446    pub fn with_budget(self, budget: usize) -> Budget<Self> {
447        budget::with(budget, self)
448    }
449
450    /// Associate a value with the resumed execution.
451    ///
452    /// This is necessary to provide a value for a generator which has yielded.
453    pub fn with_value(self, value: Value) -> VmResume<'this, 'diag, T> {
454        Self {
455            init: Some(value),
456            ..self
457        }
458    }
459
460    /// Associate diagnostics with the execution.
461    pub fn with_diagnostics<'a>(
462        self,
463        diagnostics: &'a mut dyn VmDiagnostics,
464    ) -> VmResume<'this, 'a, T> {
465        VmResume {
466            execution: self.execution,
467            diagnostics: Some(diagnostics),
468            init: self.init,
469            awaited: self.awaited,
470        }
471    }
472}
473
474impl<'this, 'diag, T> VmResume<'this, 'diag, T>
475where
476    T: AsMut<Vm>,
477{
478    /// Try to synchronously complete the run, returning the generator state it produced.
479    ///
480    /// This will error if the execution is suspended through awaiting.
481    pub fn complete(self) -> Result<VmOutcome, VmError> {
482        let this = pin!(self);
483        let mut cx = Context::from_waker(&COMPLETE_WAKER);
484
485        match this.poll(&mut cx) {
486            Poll::Ready(result) => result,
487            Poll::Pending => Err(VmError::new(VmErrorKind::Halted {
488                halt: VmHaltInfo::Awaited,
489            })),
490        }
491    }
492}
493
/// Drives the virtual machine until the execution completes, yields, is
/// limited, or has to wait for a pending await operation.
impl<'this, 'diag, T> Future for VmResume<'this, 'diag, T>
where
    T: AsMut<Vm>,
{
    type Output = Result<VmOutcome, VmError>;

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: We are ensuring that we never move this value or any
        // projected fields.
        let this = unsafe { Pin::get_unchecked_mut(self) };

        // The resume value is only present on the first poll. Taking it marks
        // the execution as suspended, and if the execution had previously
        // yielded the value is stored at the output recorded at that time.
        if let Some(value) = this.init.take() {
            let state = replace(&mut this.execution.state, ExecutionState::Suspended);

            if let ExecutionState::Resumed(out) = state {
                let vm = this.execution.vm.as_mut();
                async_vm_try!(vm.stack_mut().store(out, value));
            }
        }

        loop {
            let vm = this.execution.vm.as_mut();

            // Finish any outstanding await before running the machine again.
            // If it is still pending, `ready!` returns `Poll::Pending` here.
            if let Some(awaited) = &mut this.awaited {
                // NOTE(review): relies on `awaited` staying in place inside
                // this pinned struct while it is polled - it is only cleared
                // below once it has completed.
                let awaited = unsafe { Pin::new_unchecked(awaited) };
                async_vm_try!(ready!(awaited.poll(cx, vm)));
                this.awaited = None;
            }

            // Explicit reborrow so the diagnostics can be lent out again on
            // every iteration of the loop.
            let result = vm.run(match this.diagnostics {
                Some(ref mut value) => Some(&mut **value),
                None => None,
            });

            match async_vm_try!(VmError::with_vm(result, vm)) {
                // Fall through to the handling below the match.
                VmHalt::Exited(addr) => {
                    this.execution.state = ExecutionState::Exited(addr);
                }
                // Remember the await operation and poll it at the top of the
                // next iteration.
                VmHalt::Awaited(awaited) => {
                    this.awaited = Some(awaited);
                    continue;
                }
                // Set up the call on this execution and keep running.
                VmHalt::VmCall(vm_call) => {
                    async_vm_try!(vm_call.into_execution(this.execution));
                    continue;
                }
                // Hand the yielded value to the caller and record where the
                // value of the next resume has to be stored.
                VmHalt::Yielded(addr, out) => {
                    let value = match addr {
                        Some(addr) => vm.stack().at(addr).clone(),
                        None => Value::unit(),
                    };

                    this.execution.state = ExecutionState::Resumed(out);
                    return Poll::Ready(Ok(VmOutcome::Yielded(value)));
                }
                VmHalt::Limited => {
                    return Poll::Ready(Ok(VmOutcome::Limited));
                }
            }

            // Only reached after an exit. With no saved states left the whole
            // execution is complete, otherwise the most recently saved state
            // is restored and the machine keeps running.
            if this.execution.states.is_empty() {
                let value = async_vm_try!(this.execution.end());
                return Poll::Ready(Ok(VmOutcome::Complete(value)));
            }

            async_vm_try!(this.execution.pop_state());
        }
    }
}
564
/// A [`VmExecution`] that can be used with a generator api.
pub struct VmGenerator<T> {
    /// The wrapped execution. It is set to `None` once the execution has
    /// completed.
    execution: Option<VmExecution<T>>,
}
569
570impl<T> VmGenerator<T>
571where
572    T: AsMut<Vm>,
573{
574    /// Get the next value produced by this generator.
575    ///
576    /// See [`VmExecution::into_generator`].
577    pub fn next(&mut self) -> Result<Option<Value>, VmError> {
578        let Some(execution) = &mut self.execution else {
579            return Ok(None);
580        };
581
582        let outcome = execution.resume().complete()?;
583
584        match outcome {
585            VmOutcome::Complete(_) => {
586                self.execution = None;
587                Ok(None)
588            }
589            VmOutcome::Yielded(value) => Ok(Some(value)),
590            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
591                halt: VmHaltInfo::Limited,
592            })),
593        }
594    }
595
596    /// Resume the generator with a value and get the next [`GeneratorState`].
597    ///
598    /// See [`VmExecution::into_generator`].
599    pub fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
600        let execution = self
601            .execution
602            .as_mut()
603            .ok_or(VmErrorKind::GeneratorComplete)?;
604
605        let outcome = execution.resume().with_value(value).complete()?;
606
607        match outcome {
608            VmOutcome::Complete(value) => {
609                self.execution = None;
610                Ok(GeneratorState::Complete(value))
611            }
612            VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
613            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
614                halt: VmHaltInfo::Limited,
615            })),
616        }
617    }
618}
619
/// A [`VmExecution`] that can be used with a stream api.
pub struct VmStream<T> {
    /// The wrapped execution. It is set to `None` once the execution has
    /// completed.
    execution: Option<VmExecution<T>>,
}
624
625impl<T> VmStream<T>
626where
627    T: AsMut<Vm>,
628{
629    /// Get the next value produced by this stream.
630    ///
631    /// See [`VmExecution::into_stream`].
632    pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
633        let Some(execution) = &mut self.execution else {
634            return Ok(None);
635        };
636
637        match execution.resume().await? {
638            VmOutcome::Complete(value) => {
639                self.execution = None;
640                Ok(Some(value))
641            }
642            VmOutcome::Yielded(..) => Ok(None),
643            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
644                halt: VmHaltInfo::Limited,
645            })),
646        }
647    }
648
649    /// Resume the stream with a value and return the next [`GeneratorState`].
650    ///
651    /// See [`VmExecution::into_stream`].
652    pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
653        let execution = self
654            .execution
655            .as_mut()
656            .ok_or(VmErrorKind::GeneratorComplete)?;
657
658        match execution.resume().with_value(value).await? {
659            VmOutcome::Complete(value) => {
660                self.execution = None;
661                Ok(GeneratorState::Complete(value))
662            }
663            VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
664            VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
665                halt: VmHaltInfo::Limited,
666            })),
667        }
668    }
669}