1use core::fmt;
2use core::future::Future;
3use core::mem::{replace, take};
4use core::pin::{pin, Pin};
5use core::task::{ready, Context, Poll, RawWaker, RawWakerVTable, Waker};
6
7use crate::alloc::prelude::*;
8use crate::async_vm_try;
9use crate::runtime::budget::Budget;
10use crate::runtime::{budget, Awaited};
11use crate::shared::AssertSend;
12use crate::sync::Arc;
13
14use super::{
15 Address, GeneratorState, Output, RuntimeContext, Unit, Value, Vm, VmDiagnostics, VmError,
16 VmErrorKind, VmHalt, VmHaltInfo,
17};
18
19static COMPLETE_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
20 |_| RawWaker::new(&(), &COMPLETE_WAKER_VTABLE),
21 |_| {},
22 |_| {},
23 |_| {},
24);
25
26static COMPLETE_WAKER: Waker =
28 unsafe { Waker::from_raw(RawWaker::new(&(), &COMPLETE_WAKER_VTABLE)) };
29
30#[derive(Debug, Clone, Copy, PartialEq)]
35#[non_exhaustive]
36pub(crate) enum ExecutionState {
37 Initial,
39 Resumed(Output),
41 Suspended,
43 Exited(Option<Address>),
45}
46
47impl fmt::Display for ExecutionState {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 ExecutionState::Initial => write!(f, "initial"),
51 ExecutionState::Resumed(out) => write!(f, "resumed({out})"),
52 ExecutionState::Suspended => write!(f, "suspended"),
53 ExecutionState::Exited(..) => write!(f, "exited"),
54 }
55 }
56}
57
58#[derive(TryClone)]
59#[try_clone(crate)]
60pub(crate) struct VmExecutionState {
61 pub(crate) context: Option<Arc<RuntimeContext>>,
62 pub(crate) unit: Option<Arc<Unit>>,
63}
64
65pub struct VmExecution<T> {
70 vm: T,
72 state: ExecutionState,
74 states: Vec<VmExecutionState>,
76}
77
78impl<T> VmExecution<T> {
79 #[inline]
81 pub(crate) fn new(vm: T) -> Self {
82 Self {
83 vm,
84 state: ExecutionState::Initial,
85 states: Vec::new(),
86 }
87 }
88
89 #[inline]
91 pub fn vm(&self) -> &Vm
92 where
93 T: AsRef<Vm>,
94 {
95 self.vm.as_ref()
96 }
97
98 #[inline]
100 pub fn vm_mut(&mut self) -> &mut Vm
101 where
102 T: AsMut<Vm>,
103 {
104 self.vm.as_mut()
105 }
106}
107
108impl<T> VmExecution<T>
109where
110 T: AsMut<Vm>,
111{
112 pub fn into_generator(self) -> VmGenerator<T> {
143 VmGenerator {
144 execution: Some(self),
145 }
146 }
147
148 pub fn into_stream(self) -> VmStream<T> {
182 VmStream {
183 execution: Some(self),
184 }
185 }
186}
187
188impl<T> VmExecution<T>
189where
190 T: AsMut<Vm>,
191{
192 pub fn complete(&mut self) -> Result<Value, VmError> {
202 self.resume().complete()?.into_complete()
203 }
204
205 pub async fn async_complete(&mut self) -> Result<Value, VmError> {
215 self.resume().await?.into_complete()
216 }
217
218 pub fn resume(&mut self) -> VmResume<'_, 'static, T> {
233 VmResume {
234 execution: self,
235 diagnostics: None,
236 awaited: None,
237 init: Some(Value::empty()),
238 }
239 }
240
241 pub(crate) fn end(&mut self) -> Result<Value, VmError> {
243 let ExecutionState::Exited(addr) = self.state else {
244 return Err(VmError::new(VmErrorKind::ExpectedExitedExecutionState {
245 actual: self.state,
246 }));
247 };
248
249 let value = match addr {
250 Some(addr) => self.vm.as_mut().stack().at(addr).clone(),
251 None => Value::unit(),
252 };
253
254 debug_assert!(self.states.is_empty(), "Execution states should be empty");
255 Ok(value)
256 }
257
258 #[tracing::instrument(skip_all)]
260 pub(crate) fn push_state(&mut self, state: VmExecutionState) -> Result<(), VmError> {
261 tracing::trace!("pushing suspended state");
262 let vm = self.vm.as_mut();
263 let context = state.context.map(|c| replace(vm.context_mut(), c));
264 let unit = state.unit.map(|u| replace(vm.unit_mut(), u));
265 self.states.try_push(VmExecutionState { context, unit })?;
266 Ok(())
267 }
268
269 #[tracing::instrument(skip_all)]
272 fn pop_state(&mut self) -> Result<(), VmError> {
273 tracing::trace!("popping suspended state");
274
275 let state = self.states.pop().ok_or(VmErrorKind::NoRunningVm)?;
276 let vm = self.vm.as_mut();
277
278 if let Some(context) = state.context {
279 *vm.context_mut() = context;
280 }
281
282 if let Some(unit) = state.unit {
283 *vm.unit_mut() = unit;
284 }
285
286 Ok(())
287 }
288}
289
290impl VmExecution<&mut Vm> {
291 pub fn into_owned(self) -> VmExecution<Vm> {
293 let stack = take(self.vm.stack_mut());
294 let head = Vm::with_stack(self.vm.context().clone(), self.vm.unit().clone(), stack);
295
296 VmExecution {
297 vm: head,
298 states: self.states,
299 state: self.state,
300 }
301 }
302}
303
304pub struct VmSendExecution(pub(crate) VmExecution<Vm>);
313
314unsafe impl Send for VmSendExecution {}
317
318impl VmSendExecution {
319 pub fn complete(mut self) -> impl Future<Output = Result<Value, VmError>> + Send + 'static {
325 let future = async move { self.0.resume().await.and_then(VmOutcome::into_complete) };
326
327 unsafe { AssertSend::new(future) }
330 }
331
332 #[deprecated = "Use `VmSendExecution::complete`"]
334 pub fn async_complete(self) -> impl Future<Output = Result<Value, VmError>> + Send + 'static {
335 self.complete()
336 }
337
338 pub fn complete_with_diagnostics(
344 mut self,
345 diagnostics: &mut dyn VmDiagnostics,
346 ) -> impl Future<Output = Result<Value, VmError>> + Send + '_ {
347 let future = async move {
348 self.0
349 .resume()
350 .with_diagnostics(diagnostics)
351 .await
352 .and_then(VmOutcome::into_complete)
353 };
354
355 unsafe { AssertSend::new(future) }
358 }
359
360 #[deprecated = "Use `VmSendExecution::complete_with_diagnostics`"]
362 pub fn async_complete_with_diagnostics(
363 self,
364 diagnostics: &mut dyn VmDiagnostics,
365 ) -> impl Future<Output = Result<Value, VmError>> + Send + '_ {
366 self.complete_with_diagnostics(diagnostics)
367 }
368}
369
370impl<T> TryClone for VmExecution<T>
371where
372 T: AsRef<Vm> + AsMut<Vm> + TryClone,
373{
374 #[inline]
375 fn try_clone(&self) -> Result<Self, rune_alloc::Error> {
376 Ok(Self {
377 vm: self.vm.try_clone()?,
378 state: self.state,
379 states: self.states.try_clone()?,
380 })
381 }
382}
383
384#[non_exhaustive]
386pub enum VmOutcome {
387 Complete(Value),
389 Yielded(Value),
391 Limited,
393}
394
395impl VmOutcome {
396 pub fn into_generator_state(self) -> Result<GeneratorState, VmError> {
404 match self {
405 VmOutcome::Complete(value) => Ok(GeneratorState::Complete(value)),
406 VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
407 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
408 halt: VmHaltInfo::Limited,
409 })),
410 }
411 }
412
413 pub fn into_complete(self) -> Result<Value, VmError> {
419 match self {
420 VmOutcome::Complete(value) => Ok(value),
421 VmOutcome::Yielded(..) => Err(VmError::new(VmErrorKind::Halted {
422 halt: VmHaltInfo::Yielded,
423 })),
424 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
425 halt: VmHaltInfo::Limited,
426 })),
427 }
428 }
429}
430
431pub struct VmResume<'this, 'diag, T> {
438 execution: &'this mut VmExecution<T>,
439 diagnostics: Option<&'diag mut dyn VmDiagnostics>,
440 init: Option<Value>,
441 awaited: Option<Awaited>,
442}
443
444impl<'this, 'diag, T> VmResume<'this, 'diag, T> {
445 pub fn with_budget(self, budget: usize) -> Budget<Self> {
447 budget::with(budget, self)
448 }
449
450 pub fn with_value(self, value: Value) -> VmResume<'this, 'diag, T> {
454 Self {
455 init: Some(value),
456 ..self
457 }
458 }
459
460 pub fn with_diagnostics<'a>(
462 self,
463 diagnostics: &'a mut dyn VmDiagnostics,
464 ) -> VmResume<'this, 'a, T> {
465 VmResume {
466 execution: self.execution,
467 diagnostics: Some(diagnostics),
468 init: self.init,
469 awaited: self.awaited,
470 }
471 }
472}
473
474impl<'this, 'diag, T> VmResume<'this, 'diag, T>
475where
476 T: AsMut<Vm>,
477{
478 pub fn complete(self) -> Result<VmOutcome, VmError> {
482 let this = pin!(self);
483 let mut cx = Context::from_waker(&COMPLETE_WAKER);
484
485 match this.poll(&mut cx) {
486 Poll::Ready(result) => result,
487 Poll::Pending => Err(VmError::new(VmErrorKind::Halted {
488 halt: VmHaltInfo::Awaited,
489 })),
490 }
491 }
492}
493
494impl<'this, 'diag, T> Future for VmResume<'this, 'diag, T>
495where
496 T: AsMut<Vm>,
497{
498 type Output = Result<VmOutcome, VmError>;
499
500 #[inline]
501 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502 let this = unsafe { Pin::get_unchecked_mut(self) };
505
506 if let Some(value) = this.init.take() {
507 let state = replace(&mut this.execution.state, ExecutionState::Suspended);
508
509 if let ExecutionState::Resumed(out) = state {
510 let vm = this.execution.vm.as_mut();
511 async_vm_try!(vm.stack_mut().store(out, value));
512 }
513 }
514
515 loop {
516 let vm = this.execution.vm.as_mut();
517
518 if let Some(awaited) = &mut this.awaited {
519 let awaited = unsafe { Pin::new_unchecked(awaited) };
520 async_vm_try!(ready!(awaited.poll(cx, vm)));
521 this.awaited = None;
522 }
523
524 let result = vm.run(match this.diagnostics {
525 Some(ref mut value) => Some(&mut **value),
526 None => None,
527 });
528
529 match async_vm_try!(VmError::with_vm(result, vm)) {
530 VmHalt::Exited(addr) => {
531 this.execution.state = ExecutionState::Exited(addr);
532 }
533 VmHalt::Awaited(awaited) => {
534 this.awaited = Some(awaited);
535 continue;
536 }
537 VmHalt::VmCall(vm_call) => {
538 async_vm_try!(vm_call.into_execution(this.execution));
539 continue;
540 }
541 VmHalt::Yielded(addr, out) => {
542 let value = match addr {
543 Some(addr) => vm.stack().at(addr).clone(),
544 None => Value::unit(),
545 };
546
547 this.execution.state = ExecutionState::Resumed(out);
548 return Poll::Ready(Ok(VmOutcome::Yielded(value)));
549 }
550 VmHalt::Limited => {
551 return Poll::Ready(Ok(VmOutcome::Limited));
552 }
553 }
554
555 if this.execution.states.is_empty() {
556 let value = async_vm_try!(this.execution.end());
557 return Poll::Ready(Ok(VmOutcome::Complete(value)));
558 }
559
560 async_vm_try!(this.execution.pop_state());
561 }
562 }
563}
564
565pub struct VmGenerator<T> {
567 execution: Option<VmExecution<T>>,
568}
569
570impl<T> VmGenerator<T>
571where
572 T: AsMut<Vm>,
573{
574 pub fn next(&mut self) -> Result<Option<Value>, VmError> {
578 let Some(execution) = &mut self.execution else {
579 return Ok(None);
580 };
581
582 let outcome = execution.resume().complete()?;
583
584 match outcome {
585 VmOutcome::Complete(_) => {
586 self.execution = None;
587 Ok(None)
588 }
589 VmOutcome::Yielded(value) => Ok(Some(value)),
590 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
591 halt: VmHaltInfo::Limited,
592 })),
593 }
594 }
595
596 pub fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
600 let execution = self
601 .execution
602 .as_mut()
603 .ok_or(VmErrorKind::GeneratorComplete)?;
604
605 let outcome = execution.resume().with_value(value).complete()?;
606
607 match outcome {
608 VmOutcome::Complete(value) => {
609 self.execution = None;
610 Ok(GeneratorState::Complete(value))
611 }
612 VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
613 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
614 halt: VmHaltInfo::Limited,
615 })),
616 }
617 }
618}
619
620pub struct VmStream<T> {
622 execution: Option<VmExecution<T>>,
623}
624
625impl<T> VmStream<T>
626where
627 T: AsMut<Vm>,
628{
629 pub async fn next(&mut self) -> Result<Option<Value>, VmError> {
633 let Some(execution) = &mut self.execution else {
634 return Ok(None);
635 };
636
637 match execution.resume().await? {
638 VmOutcome::Complete(value) => {
639 self.execution = None;
640 Ok(Some(value))
641 }
642 VmOutcome::Yielded(..) => Ok(None),
643 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
644 halt: VmHaltInfo::Limited,
645 })),
646 }
647 }
648
649 pub async fn resume(&mut self, value: Value) -> Result<GeneratorState, VmError> {
653 let execution = self
654 .execution
655 .as_mut()
656 .ok_or(VmErrorKind::GeneratorComplete)?;
657
658 match execution.resume().with_value(value).await? {
659 VmOutcome::Complete(value) => {
660 self.execution = None;
661 Ok(GeneratorState::Complete(value))
662 }
663 VmOutcome::Yielded(value) => Ok(GeneratorState::Yielded(value)),
664 VmOutcome::Limited => Err(VmError::new(VmErrorKind::Halted {
665 halt: VmHaltInfo::Limited,
666 })),
667 }
668 }
669}