mirror of
https://github.com/fluencelabs/aquavm
synced 2025-03-15 20:40:50 +00:00
fix(execution-engine): save order between current generations (#366)
Co-authored-by: Valery Antopol <valery.antopol@gmail.com>
This commit is contained in:
parent
651b569a13
commit
a60b61e1a1
@ -33,7 +33,7 @@ impl StreamCursor {
|
|||||||
pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec<IterableValue> {
|
pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec<IterableValue> {
|
||||||
let iterables =
|
let iterables =
|
||||||
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
|
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
|
||||||
self.last_seen_generation = stream.generations_count() as u32;
|
self.last_seen_generation = stream.last_non_empty_generation() as u32;
|
||||||
|
|
||||||
iterables
|
iterables
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ impl<'i> super::ExecutableInstruction<'i> for New<'i> {
|
|||||||
// any error. It's highly important to distinguish between global and restricted streams
|
// any error. It's highly important to distinguish between global and restricted streams
|
||||||
// at the end of execution to make a correct data.
|
// at the end of execution to make a correct data.
|
||||||
let instruction_result = self.instruction.execute(exec_ctx, trace_ctx);
|
let instruction_result = self.instruction.execute(exec_ctx, trace_ctx);
|
||||||
let epilog_result = epilog(self, exec_ctx);
|
let epilog_result = epilog(self, exec_ctx, trace_ctx);
|
||||||
|
|
||||||
match (instruction_result, epilog_result) {
|
match (instruction_result, epilog_result) {
|
||||||
(Ok(()), Ok(())) => Ok(()),
|
(Ok(()), Ok(())) => Ok(()),
|
||||||
@ -62,11 +62,13 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
|
|||||||
exec_ctx.tracker.meet_new(position);
|
exec_ctx.tracker.meet_new(position);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) -> ExecutionResult<()> {
|
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||||
let position = new.span.left;
|
let position = new.span.left;
|
||||||
match &new.argument {
|
match &new.argument {
|
||||||
NewArgument::Stream(stream) => {
|
NewArgument::Stream(stream) => {
|
||||||
exec_ctx.streams.meet_scope_end(stream.name.to_string(), position)?;
|
exec_ctx
|
||||||
|
.streams
|
||||||
|
.meet_scope_end(stream.name.to_string(), position, trace_ctx)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name),
|
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name),
|
||||||
|
@ -17,10 +17,13 @@
|
|||||||
use super::ExecutionResult;
|
use super::ExecutionResult;
|
||||||
use super::ValueAggregate;
|
use super::ValueAggregate;
|
||||||
use crate::execution_step::CatchableError;
|
use crate::execution_step::CatchableError;
|
||||||
|
use crate::ExecutionError;
|
||||||
use crate::JValue;
|
use crate::JValue;
|
||||||
|
use crate::UncatchableError;
|
||||||
|
|
||||||
use air_interpreter_data::TracePos;
|
use air_interpreter_data::TracePos;
|
||||||
use air_trace_handler::merger::ValueSource;
|
use air_trace_handler::merger::ValueSource;
|
||||||
|
use air_trace_handler::TraceHandler;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Formatter;
|
use std::fmt::Formatter;
|
||||||
@ -35,18 +38,26 @@ pub struct Stream {
|
|||||||
/// obtained values from a current_data that were not present in prev_data becomes a new generation.
|
/// obtained values from a current_data that were not present in prev_data becomes a new generation.
|
||||||
values: Vec<Vec<ValueAggregate>>,
|
values: Vec<Vec<ValueAggregate>>,
|
||||||
|
|
||||||
|
/// Count of values from previous data.
|
||||||
|
previous_gens_count: usize,
|
||||||
|
|
||||||
/// This map is intended to support canonicalized stream creation, such streams has
|
/// This map is intended to support canonicalized stream creation, such streams has
|
||||||
/// corresponding value positions in a data and this field are used to create such streams.
|
/// corresponding value positions in a data and this field are used to create such streams.
|
||||||
values_by_pos: HashMap<TracePos, StreamValueLocation>,
|
values_by_pos: HashMap<TracePos, StreamValueLocation>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream {
|
impl Stream {
|
||||||
pub(crate) fn from_generations_count(previous_count: usize) -> Self {
|
pub(crate) fn from_generations_count(previous_count: usize, current_count: usize) -> Self {
|
||||||
let last_generation_count = 1;
|
let last_generation_count = 1;
|
||||||
// TODO: bubble up an overflow error instead of expect
|
// TODO: bubble up an overflow error instead of expect
|
||||||
let overall_gens_count = previous_count + last_generation_count;
|
let overall_count = previous_count
|
||||||
|
.checked_add(current_count)
|
||||||
|
.and_then(|value| value.checked_add(last_generation_count))
|
||||||
|
.expect("it shouldn't overflow");
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
values: vec![vec![]; overall_gens_count],
|
values: vec![vec![]; overall_count],
|
||||||
|
previous_gens_count: previous_count,
|
||||||
values_by_pos: HashMap::new(),
|
values_by_pos: HashMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -59,6 +70,7 @@ impl Stream {
|
|||||||
};
|
};
|
||||||
Self {
|
Self {
|
||||||
values: vec![vec![value]],
|
values: vec![vec![value]],
|
||||||
|
previous_gens_count: 0,
|
||||||
values_by_pos,
|
values_by_pos,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -74,7 +86,7 @@ impl Stream {
|
|||||||
let generation = match (generation, source) {
|
let generation = match (generation, source) {
|
||||||
(Generation::Last, _) => self.values.len() - 1,
|
(Generation::Last, _) => self.values.len() - 1,
|
||||||
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize,
|
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize,
|
||||||
(_, ValueSource::CurrentData) => self.values.len() - 1,
|
(Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen as usize,
|
||||||
};
|
};
|
||||||
|
|
||||||
if generation >= self.values.len() {
|
if generation >= self.values.len() {
|
||||||
@ -90,7 +102,21 @@ impl Stream {
|
|||||||
|
|
||||||
pub(crate) fn generations_count(&self) -> usize {
|
pub(crate) fn generations_count(&self) -> usize {
|
||||||
// the last generation could be empty due to the logic of from_generations_count ctor
|
// the last generation could be empty due to the logic of from_generations_count ctor
|
||||||
self.values.iter().filter(|gen| !gen.is_empty()).count()
|
if self.values.last().unwrap().is_empty() {
|
||||||
|
self.values.len() - 1
|
||||||
|
} else {
|
||||||
|
self.values.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn last_non_empty_generation(&self) -> usize {
|
||||||
|
self.values
|
||||||
|
.iter()
|
||||||
|
.rposition(|generation| !generation.is_empty())
|
||||||
|
// it's safe to add + 1 here, because this function is called when
|
||||||
|
// there is a new state was added with add_new_generation_if_non_empty
|
||||||
|
.map(|non_empty_gens| non_empty_gens + 1)
|
||||||
|
.unwrap_or_else(|| self.generations_count())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a new empty generation if the latest isn't empty.
|
/// Add a new empty generation if the latest isn't empty.
|
||||||
@ -195,6 +221,26 @@ impl Stream {
|
|||||||
|
|
||||||
Some(iter)
|
Some(iter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Removes empty generations updating data and returns final generation count.
|
||||||
|
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<usize> {
|
||||||
|
self.remove_empty_generations();
|
||||||
|
|
||||||
|
for (generation, values) in self.values.iter().enumerate() {
|
||||||
|
for value in values.iter() {
|
||||||
|
trace_ctx
|
||||||
|
.update_generation(value.trace_pos, generation as u32)
|
||||||
|
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.values.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes empty generations from current values.
|
||||||
|
fn remove_empty_generations(&mut self) {
|
||||||
|
self.values.retain(|values| !values.is_empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||||
@ -296,7 +342,7 @@ mod test {
|
|||||||
fn test_slice_iter() {
|
fn test_slice_iter() {
|
||||||
let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
|
let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
|
||||||
let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
|
let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
|
||||||
let mut stream = Stream::from_generations_count(2);
|
let mut stream = Stream::from_generations_count(2, 0);
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.add_value(value_1, Generation::Nth(0), ValueSource::PreviousData)
|
.add_value(value_1, Generation::Nth(0), ValueSource::PreviousData)
|
||||||
@ -320,7 +366,7 @@ mod test {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_slice_on_empty_stream() {
|
fn test_slice_on_empty_stream() {
|
||||||
let stream = Stream::from_generations_count(2);
|
let stream = Stream::from_generations_count(2, 0);
|
||||||
|
|
||||||
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1));
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1));
|
||||||
assert!(slice.is_none());
|
assert!(slice.is_none());
|
||||||
@ -339,7 +385,7 @@ mod test {
|
|||||||
fn generation_from_current_data() {
|
fn generation_from_current_data() {
|
||||||
let value_1 = ValueAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into());
|
let value_1 = ValueAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into());
|
||||||
let value_2 = ValueAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into());
|
let value_2 = ValueAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into());
|
||||||
let mut stream = Stream::from_generations_count(5);
|
let mut stream = Stream::from_generations_count(5, 5);
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.add_value(value_1.clone(), Generation::Nth(2), ValueSource::CurrentData)
|
.add_value(value_1.clone(), Generation::Nth(2), ValueSource::CurrentData)
|
||||||
@ -349,7 +395,7 @@ mod test {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let generations_count = stream.generations_count();
|
let generations_count = stream.generations_count();
|
||||||
assert_eq!(generations_count, 2);
|
assert_eq!(generations_count, 10);
|
||||||
|
|
||||||
let mut iter = stream.iter(Generation::Last).unwrap();
|
let mut iter = stream.iter(Generation::Last).unwrap();
|
||||||
let stream_value_1 = iter.next().unwrap();
|
let stream_value_1 = iter.next().unwrap();
|
||||||
|
@ -20,6 +20,7 @@ use crate::ToErrorCode;
|
|||||||
use air_interpreter_data::TracePos;
|
use air_interpreter_data::TracePos;
|
||||||
use air_interpreter_data::Value;
|
use air_interpreter_data::Value;
|
||||||
use air_trace_handler::merger::MergerApResult;
|
use air_trace_handler::merger::MergerApResult;
|
||||||
|
use air_trace_handler::GenerationCompatificationError;
|
||||||
use air_trace_handler::TraceHandlerError;
|
use air_trace_handler::TraceHandlerError;
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
use strum_macros::EnumDiscriminants;
|
use strum_macros::EnumDiscriminants;
|
||||||
@ -41,6 +42,10 @@ pub enum UncatchableError {
|
|||||||
instruction: String,
|
instruction: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// These errors are related to internal bug in the interpreter when result trace is corrupted.
|
||||||
|
#[error(transparent)]
|
||||||
|
GenerationCompatificationError(#[from] GenerationCompatificationError),
|
||||||
|
|
||||||
/// Fold state wasn't found for such iterator name.
|
/// Fold state wasn't found for such iterator name.
|
||||||
#[error("fold state not found for this iterable '{0}'")]
|
#[error("fold state not found for this iterable '{0}'")]
|
||||||
FoldStateNotFound(String),
|
FoldStateNotFound(String),
|
||||||
|
@ -66,9 +66,19 @@ pub(crate) struct ExecutionCtx<'i> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<'i> ExecutionCtx<'i> {
|
impl<'i> ExecutionCtx<'i> {
|
||||||
pub(crate) fn new(prev_data: &InterpreterData, call_results: CallResults, run_parameters: RunParameters) -> Self {
|
pub(crate) fn new(
|
||||||
|
prev_data: &InterpreterData,
|
||||||
|
current_data: &InterpreterData,
|
||||||
|
call_results: CallResults,
|
||||||
|
run_parameters: RunParameters,
|
||||||
|
) -> Self {
|
||||||
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
|
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
|
||||||
let streams = Streams::from_data(&prev_data.global_streams, prev_data.restricted_streams.clone());
|
let streams = Streams::from_data(
|
||||||
|
&prev_data.global_streams,
|
||||||
|
¤t_data.global_streams,
|
||||||
|
prev_data.restricted_streams.clone(),
|
||||||
|
current_data.restricted_streams.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
run_parameters,
|
run_parameters,
|
||||||
|
@ -20,6 +20,7 @@ mod utils;
|
|||||||
|
|
||||||
use crate::execution_step::ExecutionResult;
|
use crate::execution_step::ExecutionResult;
|
||||||
use crate::execution_step::Stream;
|
use crate::execution_step::Stream;
|
||||||
|
use crate::ExecutionError;
|
||||||
use stream_descriptor::*;
|
use stream_descriptor::*;
|
||||||
pub(crate) use stream_value_descriptor::StreamValueDescriptor;
|
pub(crate) use stream_value_descriptor::StreamValueDescriptor;
|
||||||
|
|
||||||
@ -27,6 +28,7 @@ use air_interpreter_data::GlobalStreamGens;
|
|||||||
use air_interpreter_data::RestrictedStreamGens;
|
use air_interpreter_data::RestrictedStreamGens;
|
||||||
use air_parser::ast::Span;
|
use air_parser::ast::Span;
|
||||||
use air_parser::AirPos;
|
use air_parser::AirPos;
|
||||||
|
use air_trace_handler::TraceHandler;
|
||||||
|
|
||||||
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@ -42,6 +44,10 @@ pub(crate) struct Streams {
|
|||||||
/// should have at the scope start.
|
/// should have at the scope start.
|
||||||
previous_restricted_stream_gens: RestrictedStreamGens,
|
previous_restricted_stream_gens: RestrictedStreamGens,
|
||||||
|
|
||||||
|
/// Contains stream generations from current data that a restricted stream
|
||||||
|
/// should have at the scope start.
|
||||||
|
current_restricted_stream_gens: RestrictedStreamGens,
|
||||||
|
|
||||||
/// Contains stream generations that each private stream had at the scope end.
|
/// Contains stream generations that each private stream had at the scope end.
|
||||||
/// Then it's placed into data
|
/// Then it's placed into data
|
||||||
new_restricted_stream_gens: RestrictedStreamGens,
|
new_restricted_stream_gens: RestrictedStreamGens,
|
||||||
@ -50,13 +56,16 @@ pub(crate) struct Streams {
|
|||||||
impl Streams {
|
impl Streams {
|
||||||
pub(crate) fn from_data(
|
pub(crate) fn from_data(
|
||||||
previous_global_streams: &GlobalStreamGens,
|
previous_global_streams: &GlobalStreamGens,
|
||||||
|
current_global_streams: &GlobalStreamGens,
|
||||||
previous_restricted_stream_gens: RestrictedStreamGens,
|
previous_restricted_stream_gens: RestrictedStreamGens,
|
||||||
|
current_restricted_stream_gens: RestrictedStreamGens,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let streams = utils::prepare_global_streams(previous_global_streams);
|
let streams = utils::merge_global_streams(previous_global_streams, current_global_streams);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
streams,
|
streams,
|
||||||
previous_restricted_stream_gens,
|
previous_restricted_stream_gens,
|
||||||
|
current_restricted_stream_gens,
|
||||||
new_restricted_stream_gens: <_>::default(),
|
new_restricted_stream_gens: <_>::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -103,9 +112,10 @@ impl Streams {
|
|||||||
|
|
||||||
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
|
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
|
||||||
let name = name.into();
|
let name = name.into();
|
||||||
let prev_gens_count = self.stream_generation_from_data(&name, span.left, iteration as usize);
|
let (prev_gens_count, current_gens_count) =
|
||||||
|
self.stream_generation_from_data(&name, span.left, iteration as usize);
|
||||||
|
|
||||||
let new_stream = Stream::from_generations_count(prev_gens_count as usize);
|
let new_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
|
||||||
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
|
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
|
||||||
match self.streams.entry(name) {
|
match self.streams.entry(name) {
|
||||||
Occupied(mut entry) => {
|
Occupied(mut entry) => {
|
||||||
@ -117,7 +127,12 @@ impl Streams {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn meet_scope_end(&mut self, name: String, position: AirPos) -> ExecutionResult<()> {
|
pub(crate) fn meet_scope_end(
|
||||||
|
&mut self,
|
||||||
|
name: String,
|
||||||
|
position: AirPos,
|
||||||
|
trace_ctx: &mut TraceHandler,
|
||||||
|
) -> ExecutionResult<()> {
|
||||||
// unwraps are safe here because met_scope_end must be called after met_scope_start
|
// unwraps are safe here because met_scope_end must be called after met_scope_start
|
||||||
let stream_descriptors = self.streams.get_mut(&name).unwrap();
|
let stream_descriptors = self.streams.get_mut(&name).unwrap();
|
||||||
// delete a stream after exit from a scope
|
// delete a stream after exit from a scope
|
||||||
@ -126,37 +141,57 @@ impl Streams {
|
|||||||
// streams should contain only non-empty stream embodiments
|
// streams should contain only non-empty stream embodiments
|
||||||
self.streams.remove(&name);
|
self.streams.remove(&name);
|
||||||
}
|
}
|
||||||
|
let gens_count = last_descriptor.stream.compactify(trace_ctx)?;
|
||||||
|
|
||||||
self.collect_stream_generation(name, position, last_descriptor.stream.generations_count() as u32);
|
self.collect_stream_generation(name, position, gens_count as u32);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This method must be called at the end of execution, because it contains logic to collect
|
/// This method must be called at the end of execution, because it contains logic to collect
|
||||||
/// all global streams depending on their presence in a streams field.
|
/// all global streams depending on their presence in a streams field.
|
||||||
pub(crate) fn into_streams_data(self) -> (GlobalStreamGens, RestrictedStreamGens) {
|
pub(crate) fn into_streams_data(
|
||||||
|
self,
|
||||||
|
trace_ctx: &mut TraceHandler,
|
||||||
|
) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> {
|
||||||
// since it's called at the end of execution, streams contains only global ones,
|
// since it's called at the end of execution, streams contains only global ones,
|
||||||
// because all private's been deleted after exiting a scope
|
// because all private's been deleted after exiting a scope
|
||||||
let global_streams = self
|
let global_streams = self
|
||||||
.streams
|
.streams
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(name, mut descriptors)| {
|
.map(|(name, mut descriptors)| -> Result<_, ExecutionError> {
|
||||||
// unwrap is safe here because of invariant that streams contains non-empty vectors,
|
// unwrap is safe here because of invariant that streams contains non-empty vectors,
|
||||||
// moreover it must contain only one value, because this method is called at the end
|
// moreover it must contain only one value, because this method is called at the end
|
||||||
// of the execution
|
// of the execution
|
||||||
let stream = descriptors.pop().unwrap().stream;
|
let stream = descriptors.pop().unwrap().stream;
|
||||||
(name, stream.generations_count() as u32)
|
let gens_count = stream.compactify(trace_ctx)?;
|
||||||
|
Ok((name, gens_count as u32))
|
||||||
})
|
})
|
||||||
.collect::<GlobalStreamGens>();
|
.collect::<Result<GlobalStreamGens, _>>()?;
|
||||||
|
|
||||||
(global_streams, self.new_restricted_stream_gens)
|
Ok((global_streams, self.new_restricted_stream_gens))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> u32 {
|
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> (u32, u32) {
|
||||||
self.previous_restricted_stream_gens
|
let previous_generation =
|
||||||
|
Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration)
|
||||||
|
.unwrap_or_default();
|
||||||
|
let current_generation =
|
||||||
|
Self::restricted_stream_generation(&self.current_restricted_stream_gens, name, position, iteration)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
(previous_generation, current_generation)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn restricted_stream_generation(
|
||||||
|
restricted_stream_gens: &RestrictedStreamGens,
|
||||||
|
name: &str,
|
||||||
|
position: AirPos,
|
||||||
|
iteration: usize,
|
||||||
|
) -> Option<u32> {
|
||||||
|
restricted_stream_gens
|
||||||
.get(name)
|
.get(name)
|
||||||
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
|
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
|
||||||
.copied()
|
.copied()
|
||||||
.unwrap_or_default()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) {
|
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) {
|
||||||
|
@ -21,15 +21,29 @@ use air_interpreter_data::GlobalStreamGens;
|
|||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
pub(super) fn prepare_global_streams(
|
pub(super) fn merge_global_streams(
|
||||||
previous_global_streams: &GlobalStreamGens,
|
previous_global_streams: &GlobalStreamGens,
|
||||||
|
current_global_streams: &GlobalStreamGens,
|
||||||
) -> HashMap<String, Vec<StreamDescriptor>> {
|
) -> HashMap<String, Vec<StreamDescriptor>> {
|
||||||
previous_global_streams
|
let mut global_streams = previous_global_streams
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(stream_name, &prev_gens_count)| {
|
.map(|(stream_name, &prev_gens_count)| {
|
||||||
let global_stream = Stream::from_generations_count(prev_gens_count as usize);
|
let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default();
|
||||||
|
let global_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
|
||||||
let descriptor = StreamDescriptor::global(global_stream);
|
let descriptor = StreamDescriptor::global(global_stream);
|
||||||
(stream_name.to_string(), vec![descriptor])
|
(stream_name.to_string(), vec![descriptor])
|
||||||
})
|
})
|
||||||
.collect::<HashMap<_, _>>()
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
for (stream_name, ¤t_gens_count) in current_global_streams {
|
||||||
|
if previous_global_streams.contains_key(stream_name) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let global_stream = Stream::from_generations_count(0, current_gens_count as usize);
|
||||||
|
let descriptor = StreamDescriptor::global(global_stream);
|
||||||
|
global_streams.insert(stream_name.clone(), vec![descriptor]);
|
||||||
|
}
|
||||||
|
|
||||||
|
global_streams
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
use super::FarewellError;
|
use super::FarewellError;
|
||||||
use crate::execution_step::ExecutionCtx;
|
use crate::execution_step::ExecutionCtx;
|
||||||
use crate::execution_step::TraceHandler;
|
use crate::execution_step::TraceHandler;
|
||||||
|
use crate::ExecutionError;
|
||||||
use crate::InterpreterOutcome;
|
use crate::InterpreterOutcome;
|
||||||
use crate::ToErrorCode;
|
use crate::ToErrorCode;
|
||||||
use crate::INTERPRETER_SUCCESS;
|
use crate::INTERPRETER_SUCCESS;
|
||||||
@ -81,11 +82,18 @@ pub(crate) fn from_execution_error(
|
|||||||
#[tracing::instrument(skip(exec_ctx, trace_handler), level = "info")]
|
#[tracing::instrument(skip(exec_ctx, trace_handler), level = "info")]
|
||||||
fn populate_outcome_from_contexts(
|
fn populate_outcome_from_contexts(
|
||||||
exec_ctx: ExecutionCtx<'_>,
|
exec_ctx: ExecutionCtx<'_>,
|
||||||
trace_handler: TraceHandler,
|
mut trace_handler: TraceHandler,
|
||||||
ret_code: i64,
|
ret_code: i64,
|
||||||
error_message: String,
|
error_message: String,
|
||||||
) -> InterpreterOutcome {
|
) -> InterpreterOutcome {
|
||||||
let (global_streams, restricted_streams) = exec_ctx.streams.into_streams_data();
|
let maybe_gens = exec_ctx
|
||||||
|
.streams
|
||||||
|
.into_streams_data(&mut trace_handler)
|
||||||
|
.map_err(execution_error_into_outcome);
|
||||||
|
let (global_streams, restricted_streams) = match maybe_gens {
|
||||||
|
Ok(gens) => gens,
|
||||||
|
Err(outcome) => return outcome,
|
||||||
|
};
|
||||||
|
|
||||||
let data = InterpreterData::from_execution_result(
|
let data = InterpreterData::from_execution_result(
|
||||||
trace_handler.into_result_trace(),
|
trace_handler.into_result_trace(),
|
||||||
@ -108,6 +116,12 @@ fn populate_outcome_from_contexts(
|
|||||||
InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests)
|
InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this method is called only if there is an internal error in the interpreter and
|
||||||
|
// new execution trace was corrupted
|
||||||
|
fn execution_error_into_outcome(error: ExecutionError) -> InterpreterOutcome {
|
||||||
|
InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![])
|
||||||
|
}
|
||||||
|
|
||||||
/// Deduplicate values in a supplied vector.
|
/// Deduplicate values in a supplied vector.
|
||||||
fn dedup<T: Eq + Hash + Debug>(mut vec: Vec<T>) -> Vec<T> {
|
fn dedup<T: Eq + Hash + Debug>(mut vec: Vec<T>) -> Vec<T> {
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
@ -45,7 +45,7 @@ pub(crate) fn prepare<'i>(
|
|||||||
|
|
||||||
let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?;
|
let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?;
|
||||||
|
|
||||||
let exec_ctx = make_exec_ctx(&prev_data, call_results, run_parameters)?;
|
let exec_ctx = make_exec_ctx(&prev_data, ¤t_data, call_results, run_parameters)?;
|
||||||
let trace_handler = TraceHandler::from_data(prev_data, current_data);
|
let trace_handler = TraceHandler::from_data(prev_data, current_data);
|
||||||
|
|
||||||
let result = PreparationDescriptor {
|
let result = PreparationDescriptor {
|
||||||
@ -66,12 +66,13 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult<InterpreterData> {
|
|||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
fn make_exec_ctx(
|
fn make_exec_ctx(
|
||||||
prev_data: &InterpreterData,
|
prev_data: &InterpreterData,
|
||||||
|
current_data: &InterpreterData,
|
||||||
call_results: &[u8],
|
call_results: &[u8],
|
||||||
run_parameters: RunParameters,
|
run_parameters: RunParameters,
|
||||||
) -> PreparationResult<ExecutionCtx<'static>> {
|
) -> PreparationResult<ExecutionCtx<'static>> {
|
||||||
let call_results = serde_json::from_slice(call_results)
|
let call_results = serde_json::from_slice(call_results)
|
||||||
.map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?;
|
.map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?;
|
||||||
|
|
||||||
let ctx = ExecutionCtx::new(prev_data, call_results, run_parameters);
|
let ctx = ExecutionCtx::new(prev_data, current_data, call_results, run_parameters);
|
||||||
Ok(ctx)
|
Ok(ctx)
|
||||||
}
|
}
|
||||||
|
@ -290,8 +290,8 @@ fn fold_merge() {
|
|||||||
.get("$stream_2")
|
.get("$stream_2")
|
||||||
.expect("$stream_2 should be present in data");
|
.expect("$stream_2 should be present in data");
|
||||||
|
|
||||||
assert_eq!(*stream_1_generations, 4);
|
assert_eq!(*stream_1_generations, 8);
|
||||||
assert_eq!(*stream_2_generations, 3);
|
assert_eq!(*stream_2_generations, 6);
|
||||||
|
|
||||||
let mut fold_states_count = 0;
|
let mut fold_states_count = 0;
|
||||||
let mut calls_count = HashMap::new();
|
let mut calls_count = HashMap::new();
|
||||||
|
@ -125,3 +125,140 @@ fn merging_fold_iterations_extensively() {
|
|||||||
|
|
||||||
assert_eq!(last_fold.lore.len(), 18);
|
assert_eq!(last_fold.lore.len(), 18);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn merging_fold_iterations_extensively_2() {
|
||||||
|
let script = r#"
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call "client" ("get" "data") [] permutations) ; ok = [["p1",[[["p1",1],["p2",2],["p3",3]],[["p1",4],["p3",5],["p2",6]]]],["p2",[[["p2",7],["p1",8],["p3",9]],[["p2",10],["p3",11],["p1",12]]]],["p3",[[["p3",13],["p1",14],["p2",15]],[["p3",16],["p2",17],["p1",18]]]]]
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(fold permutations pair
|
||||||
|
(seq
|
||||||
|
(null)
|
||||||
|
(seq
|
||||||
|
(fold pair.$.[1] pid-num-arr
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call pair.$.[0] ("op" "noop") []) ; ok = null
|
||||||
|
(ap pid-num-arr $pid-num-arrs)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(null)
|
||||||
|
(next pid-num-arr)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(next pair)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(call "p1" ("test" "print") [$pid-num-arrs]) ; behaviour = echo
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(canon "p1" $pid-num-arrs #pid-num-arrs)
|
||||||
|
(call "p1" ("test" "print") [$pid-num-arrs]) ; behaviour = echo
|
||||||
|
)
|
||||||
|
(new $result
|
||||||
|
(fold $pid-num-arrs pid-num-arr
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call "p1" ("test" "print") [pid-num-arr]) ; behaviour = echo
|
||||||
|
(fold pid-num-arr pid-num
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(null)
|
||||||
|
(seq
|
||||||
|
(call pid-num.$.[0] ("op" "noop") []) ; ok = null
|
||||||
|
(ap pid-num.$.[1] $result)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(canon pid-num.$.[0] $result #mon_res)
|
||||||
|
(call pid-num.$.[0] ("test" "print") [#mon_res]) ; behaviour = echo
|
||||||
|
)
|
||||||
|
(next pid-num)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(canon "p1" $result #mon_res)
|
||||||
|
(call "p1" ("test" "print") [#mon_res]) ; behaviour = echo
|
||||||
|
)
|
||||||
|
(xor
|
||||||
|
(match #mon_res.length 18
|
||||||
|
(call "p1" ("test" "print") [#mon_res.length]) ; behaviour = echo
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(call "p1" ("test" "print") ["not enought length"]) ; behaviour = echo
|
||||||
|
(next pid-num-arr)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(call "p1" ("op" "noop") ["final p1"]) ; behaviour = echo
|
||||||
|
(seq
|
||||||
|
(canon "client" $result #end_result)
|
||||||
|
(call "p1" ("return" "") [#end_result]) ; behaviour = echo
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
"#;
|
||||||
|
|
||||||
|
let engine = TestExecutor::new(
|
||||||
|
TestRunParameters::from_init_peer_id("client"),
|
||||||
|
vec![],
|
||||||
|
vec!["relay", "p1", "p2", "p3"].into_iter().map(Into::into),
|
||||||
|
&script,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut queue = std::collections::vec_deque::VecDeque::new();
|
||||||
|
let mut p1_outcomes = Vec::<RawAVMOutcome>::new();
|
||||||
|
queue.push_back("client".to_string());
|
||||||
|
|
||||||
|
while !queue.is_empty() {
|
||||||
|
let peer = queue.pop_front().unwrap();
|
||||||
|
if let Some(outcomes) = engine.execution_iter(peer.as_str()) {
|
||||||
|
for outcome in outcomes {
|
||||||
|
assert_eq!(outcome.ret_code, 0, "{:?}", outcome);
|
||||||
|
|
||||||
|
for peer in &outcome.next_peer_pks {
|
||||||
|
if !queue.contains(peer) {
|
||||||
|
queue.push_back(peer.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if peer == "p1" {
|
||||||
|
p1_outcomes.push(outcome);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!("peer: {}, no executions", peer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let last_p1_data = p1_outcomes.last().unwrap();
|
||||||
|
let last_p1_trace = trace_from_result(last_p1_data);
|
||||||
|
let last_fold = last_p1_trace
|
||||||
|
.iter()
|
||||||
|
.filter_map(|state| match state {
|
||||||
|
ExecutedState::Fold(fold_result) => Some(fold_result),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
.last()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(last_fold.lore.len(), 6);
|
||||||
|
}
|
||||||
|
@ -115,9 +115,9 @@ fn par_early_exit() {
|
|||||||
executed_state::par(5, 1),
|
executed_state::par(5, 1),
|
||||||
executed_state::par(3, 1),
|
executed_state::par(3, 1),
|
||||||
executed_state::par(1, 1),
|
executed_state::par(1, 1),
|
||||||
executed_state::stream_string("1", 0),
|
executed_state::stream_string("1", 1),
|
||||||
executed_state::stream_string("2", 0),
|
executed_state::stream_string("2", 2),
|
||||||
executed_state::stream_string("1", 0),
|
executed_state::stream_string("1", 1),
|
||||||
executed_state::stream_string("success result from fallible_call_service", 0),
|
executed_state::stream_string("success result from fallible_call_service", 0),
|
||||||
executed_state::service_failed(1, "failed result from fallible_call_service"),
|
executed_state::service_failed(1, "failed result from fallible_call_service"),
|
||||||
executed_state::stream_string("success result from fallible_call_service", 0),
|
executed_state::stream_string("success result from fallible_call_service", 0),
|
||||||
@ -155,7 +155,7 @@ fn par_early_exit() {
|
|||||||
trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual {
|
trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual {
|
||||||
prev_value: Value::Stream {
|
prev_value: Value::Stream {
|
||||||
value: rc!(json!("1")),
|
value: rc!(json!("1")),
|
||||||
generation: 0,
|
generation: 1,
|
||||||
},
|
},
|
||||||
current_value: Value::Stream {
|
current_value: Value::Stream {
|
||||||
value: rc!(json!("non_exist_value")),
|
value: rc!(json!("non_exist_value")),
|
||||||
@ -302,44 +302,44 @@ fn fold_par_early_exit() {
|
|||||||
executed_state::scalar_string_array(vec!["c1", "c2"]),
|
executed_state::scalar_string_array(vec!["c1", "c2"]),
|
||||||
executed_state::scalar_string_array(vec!["d1", "d2"]),
|
executed_state::scalar_string_array(vec!["d1", "d2"]),
|
||||||
executed_state::stream_string("a1", 0),
|
executed_state::stream_string("a1", 0),
|
||||||
executed_state::stream_string("a2", 0),
|
executed_state::stream_string("a2", 1),
|
||||||
executed_state::stream_string("b1", 0),
|
executed_state::stream_string("b1", 0),
|
||||||
executed_state::stream_string("b2", 0),
|
executed_state::stream_string("b2", 1),
|
||||||
executed_state::stream_string("c1", 0),
|
executed_state::stream_string("c1", 0),
|
||||||
executed_state::stream_string("c2", 0),
|
executed_state::stream_string("c2", 1),
|
||||||
executed_state::stream_string("d1", 0),
|
executed_state::stream_string("d1", 0),
|
||||||
executed_state::stream_string("d2", 0),
|
executed_state::stream_string("d2", 1),
|
||||||
executed_state::par(69, 1),
|
executed_state::par(69, 1),
|
||||||
executed_state::fold(vec![
|
executed_state::fold(vec![
|
||||||
executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(82, 0)),
|
executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(48, 0)),
|
||||||
executed_state::subtrace_lore(5, subtrace_desc(48, 34), subtrace_desc(82, 0)),
|
executed_state::subtrace_lore(5, subtrace_desc(48, 34), subtrace_desc(82, 0)),
|
||||||
]),
|
]),
|
||||||
executed_state::par(33, 34),
|
executed_state::par(33, 0),
|
||||||
executed_state::fold(vec![
|
executed_state::fold(vec![
|
||||||
executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(48, 0)),
|
executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(32, 0)),
|
||||||
executed_state::subtrace_lore(7, subtrace_desc(32, 16), subtrace_desc(48, 0)),
|
executed_state::subtrace_lore(7, subtrace_desc(32, 16), subtrace_desc(48, 0)),
|
||||||
]),
|
]),
|
||||||
executed_state::par(15, 16),
|
executed_state::par(15, 0),
|
||||||
executed_state::par(13, 1),
|
executed_state::par(13, 1),
|
||||||
executed_state::fold(vec![
|
executed_state::fold(vec![
|
||||||
executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(31, 0)),
|
executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(25, 0)),
|
||||||
executed_state::subtrace_lore(9, subtrace_desc(25, 6), subtrace_desc(31, 0)),
|
executed_state::subtrace_lore(9, subtrace_desc(25, 6), subtrace_desc(31, 0)),
|
||||||
]),
|
]),
|
||||||
executed_state::par(5, 6),
|
executed_state::par(5, 0),
|
||||||
executed_state::fold(vec![
|
executed_state::fold(vec![
|
||||||
executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(25, 0)),
|
executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(23, 0)),
|
||||||
executed_state::subtrace_lore(11, subtrace_desc(23, 2), subtrace_desc(25, 0)),
|
executed_state::subtrace_lore(11, subtrace_desc(23, 2), subtrace_desc(25, 0)),
|
||||||
]),
|
]),
|
||||||
executed_state::par(1, 2),
|
executed_state::par(1, 0),
|
||||||
executed_state::scalar_string(unit_call_service_result),
|
executed_state::scalar_string(unit_call_service_result),
|
||||||
executed_state::par(1, 0),
|
executed_state::par(1, 0),
|
||||||
executed_state::scalar_string(unit_call_service_result),
|
executed_state::scalar_string(unit_call_service_result),
|
||||||
executed_state::par(5, 0),
|
executed_state::par(5, 0),
|
||||||
executed_state::fold(vec![
|
executed_state::fold(vec![
|
||||||
executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(31, 0)),
|
executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(29, 0)),
|
||||||
executed_state::subtrace_lore(11, subtrace_desc(29, 2), subtrace_desc(31, 0)),
|
executed_state::subtrace_lore(11, subtrace_desc(29, 2), subtrace_desc(31, 0)),
|
||||||
]),
|
]),
|
||||||
executed_state::par(1, 2),
|
executed_state::par(1, 0),
|
||||||
executed_state::scalar_string(unit_call_service_result),
|
executed_state::scalar_string(unit_call_service_result),
|
||||||
executed_state::par(1, 0),
|
executed_state::par(1, 0),
|
||||||
executed_state::scalar_string(unit_call_service_result),
|
executed_state::scalar_string(unit_call_service_result),
|
||||||
@ -347,7 +347,7 @@ fn fold_par_early_exit() {
|
|||||||
executed_state::par(15, 0),
|
executed_state::par(15, 0),
|
||||||
executed_state::par(13, 1),
|
executed_state::par(13, 1),
|
||||||
executed_state::fold(vec![
|
executed_state::fold(vec![
|
||||||
executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(47, 0)),
|
executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(41, 0)),
|
||||||
executed_state::subtrace_lore(9, subtrace_desc(41, 6), subtrace_desc(47, 0)),
|
executed_state::subtrace_lore(9, subtrace_desc(41, 6), subtrace_desc(47, 0)),
|
||||||
]),
|
]),
|
||||||
];
|
];
|
||||||
|
@ -130,7 +130,7 @@ fn canon_fixes_stream_correct() {
|
|||||||
executed_state::stream_number(1, 0),
|
executed_state::stream_number(1, 0),
|
||||||
executed_state::par(1, 1),
|
executed_state::par(1, 1),
|
||||||
executed_state::stream_number(2, 1),
|
executed_state::stream_number(2, 1),
|
||||||
executed_state::stream_number(3, 1),
|
executed_state::stream_number(3, 2),
|
||||||
executed_state::scalar_number(4),
|
executed_state::scalar_number(4),
|
||||||
executed_state::canon(
|
executed_state::canon(
|
||||||
json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "peer_id_3", "service_id": ""},
|
json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "peer_id_3", "service_id": ""},
|
||||||
|
@ -55,7 +55,7 @@ fn issue_302() {
|
|||||||
executed_state::par(1, 3),
|
executed_state::par(1, 3),
|
||||||
executed_state::stream_number(2, 1),
|
executed_state::stream_number(2, 1),
|
||||||
executed_state::stream_number(1, 0),
|
executed_state::stream_number(1, 0),
|
||||||
executed_state::stream_number(0, 1),
|
executed_state::stream_number(0, 2),
|
||||||
executed_state::scalar(json!([1, 2, 0])),
|
executed_state::scalar(json!([1, 2, 0])),
|
||||||
];
|
];
|
||||||
assert_eq!(actual_trace.deref(), expected_trace);
|
assert_eq!(actual_trace.deref(), expected_trace);
|
||||||
|
@ -18,6 +18,8 @@ use super::data_keeper::KeeperError;
|
|||||||
use super::merger::MergeError;
|
use super::merger::MergeError;
|
||||||
use super::state_automata::StateFSMError;
|
use super::state_automata::StateFSMError;
|
||||||
|
|
||||||
|
use air_interpreter_data::ExecutedState;
|
||||||
|
use air_interpreter_data::TracePos;
|
||||||
use thiserror::Error as ThisError;
|
use thiserror::Error as ThisError;
|
||||||
|
|
||||||
/// Errors arose out of merging previous data with a new.
|
/// Errors arose out of merging previous data with a new.
|
||||||
@ -33,3 +35,25 @@ pub enum TraceHandlerError {
|
|||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
StateFSMError(#[from] StateFSMError),
|
StateFSMError(#[from] StateFSMError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(ThisError, Debug)]
|
||||||
|
#[allow(clippy::enum_variant_names)]
|
||||||
|
pub enum GenerationCompatificationError {
|
||||||
|
#[error("trying to change generation of an invalid trace position {0}")]
|
||||||
|
TracePosPointsToNowhere(TracePos),
|
||||||
|
|
||||||
|
#[error(
|
||||||
|
"trying to change generation of a state {state} on {position} position, the state doesn't contain generation"
|
||||||
|
)]
|
||||||
|
TracePosPointsToInvalidState { position: TracePos, state: ExecutedState },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GenerationCompatificationError {
|
||||||
|
pub fn points_to_nowhere(position: TracePos) -> Self {
|
||||||
|
GenerationCompatificationError::TracePosPointsToNowhere(position)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn points_to_invalid_state(position: TracePos, state: ExecutedState) -> Self {
|
||||||
|
GenerationCompatificationError::TracePosPointsToInvalidState { position, state }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -55,6 +55,34 @@ impl TraceHandler {
|
|||||||
|
|
||||||
(prev_len, current_len)
|
(prev_len, current_len)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn update_generation(
|
||||||
|
&mut self,
|
||||||
|
trace_pos: TracePos,
|
||||||
|
generation: u32,
|
||||||
|
) -> Result<(), GenerationCompatificationError> {
|
||||||
|
let state = self
|
||||||
|
.data_keeper
|
||||||
|
.result_trace
|
||||||
|
.get_mut(trace_pos)
|
||||||
|
.ok_or_else(|| GenerationCompatificationError::points_to_nowhere(trace_pos))?;
|
||||||
|
|
||||||
|
match state {
|
||||||
|
ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation],
|
||||||
|
ExecutedState::Call(CallResult::Executed(Value::Stream {
|
||||||
|
generation: call_generation,
|
||||||
|
..
|
||||||
|
})) => *call_generation = generation,
|
||||||
|
state => {
|
||||||
|
return Err(GenerationCompatificationError::points_to_invalid_state(
|
||||||
|
trace_pos,
|
||||||
|
state.clone(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TraceHandler {
|
impl TraceHandler {
|
||||||
|
@ -32,6 +32,7 @@ mod handler;
|
|||||||
pub mod merger;
|
pub mod merger;
|
||||||
mod state_automata;
|
mod state_automata;
|
||||||
|
|
||||||
|
pub use errors::GenerationCompatificationError;
|
||||||
pub use errors::TraceHandlerError;
|
pub use errors::TraceHandlerError;
|
||||||
pub use handler::TraceHandler;
|
pub use handler::TraceHandler;
|
||||||
pub use state_automata::SubgraphType;
|
pub use state_automata::SubgraphType;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user