fix(execution-engine): fix invalid iteration over stream (#362)

This PR is mostly a revertion of #357, that is needed to make stream work correctly in fold itrerations.

Closes #363.
This commit is contained in:
Mike Voronov 2022-10-11 01:41:22 +03:00 committed by GitHub
parent eafdec5d86
commit bf8aee7f15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 319 additions and 227 deletions

View File

@ -1,3 +1,14 @@
## Version 0.31.0 (2022-10-11)
[PR 362](https://github.com/fluencelabs/aquavm/pull/362):
Partially revert [PR 340](https://github.com/fluencelabs/aquavm/pull/340) due to fixes in [PR 358](https://github.com/fluencelabs/aquavm/pull/358)
[PR 360](https://github.com/fluencelabs/aquavm/pull/360):
Allow using stream without insertion
[PR 358](https://github.com/fluencelabs/aquavm/pull/358):
Implement a temporary fix for fold with canon
## Version 0.30.0 (2022-09-28)
[PR 340](https://github.com/fluencelabs/aquavm/pull/340):

6
Cargo.lock generated
View File

@ -13,7 +13,7 @@ dependencies = [
[[package]]
name = "air"
version = "0.30.0"
version = "0.31.0"
dependencies = [
"air-execution-info-collector",
"air-interpreter-data",
@ -75,7 +75,7 @@ dependencies = [
[[package]]
name = "air-interpreter"
version = "0.30.0"
version = "0.31.0"
dependencies = [
"air",
"air-interpreter-interface",
@ -91,7 +91,7 @@ dependencies = [
[[package]]
name = "air-interpreter-data"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"air-parser",
"air-utils",

View File

@ -1,6 +1,6 @@
[package]
name = "air-interpreter"
version = "0.30.0"
version = "0.31.0"
description = "Crate-wrapper for air"
authors = ["Fluence Labs"]
edition = "2018"

View File

@ -1,6 +1,6 @@
[package]
name = "air"
version = "0.30.0"
version = "0.31.0"
description = "Interpreter of AIR scripts intended to coordinate request flow in the Fluence network"
authors = ["Fluence Labs"]
edition = "2018"

View File

@ -32,7 +32,7 @@ impl<'i> super::ExecutableInstruction<'i> for New<'i> {
// any error. It's highly important to distinguish between global and restricted streams
// at the end of execution to make a correct data.
let instruction_result = self.instruction.execute(exec_ctx, trace_ctx);
let epilog_result = epilog(self, exec_ctx, trace_ctx);
let epilog_result = epilog(self, exec_ctx);
match (instruction_result, epilog_result) {
(Ok(()), Ok(())) => Ok(()),
@ -62,13 +62,11 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
exec_ctx.tracker.meet_new(position);
}
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) -> ExecutionResult<()> {
let position = new.span.left;
match &new.argument {
NewArgument::Stream(stream) => {
exec_ctx
.streams
.meet_scope_end(stream.name.to_string(), position, trace_ctx)?;
exec_ctx.streams.meet_scope_end(stream.name.to_string(), position)?;
Ok(())
}
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name),

View File

@ -17,13 +17,10 @@
use super::ExecutionResult;
use super::ValueAggregate;
use crate::execution_step::CatchableError;
use crate::ExecutionError;
use crate::JValue;
use crate::UncatchableError;
use air_interpreter_data::TracePos;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
use std::collections::HashMap;
use std::fmt::Formatter;
@ -38,25 +35,18 @@ pub struct Stream {
/// obtained values from a current_data that were not present in prev_data becomes a new generation.
values: Vec<Vec<ValueAggregate>>,
/// Count of values from previous data.
previous_gens_count: usize,
/// This map is intended to support canonicalized stream creation, such streams has
/// corresponding value positions in a data and this field are used to create such streams.
values_by_pos: HashMap<TracePos, StreamValueLocation>,
}
impl Stream {
pub(crate) fn from_generations_count(previous_count: usize, current_count: usize) -> Self {
pub(crate) fn from_generations_count(previous_count: usize) -> Self {
let last_generation_count = 1;
// TODO: bubble up an overflow error instead of expect
let overall_count = previous_count
.checked_add(current_count)
.and_then(|value| value.checked_add(last_generation_count))
.expect("it shouldn't overflow");
let overall_gens_count = previous_count + last_generation_count;
Self {
values: vec![vec![]; overall_count],
previous_gens_count: previous_count,
values: vec![vec![]; overall_gens_count],
values_by_pos: HashMap::new(),
}
}
@ -69,7 +59,6 @@ impl Stream {
};
Self {
values: vec![vec![value]],
previous_gens_count: 0,
values_by_pos,
}
}
@ -85,7 +74,7 @@ impl Stream {
let generation = match (generation, source) {
(Generation::Last, _) => self.values.len() - 1,
(Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize,
(Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen as usize,
(_, ValueSource::CurrentData) => self.values.len() - 1,
};
if generation >= self.values.len() {
@ -206,26 +195,6 @@ impl Stream {
Some(iter)
}
/// Removes empty generations updating data and returns final generation count.
pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult<usize> {
self.remove_empty_generations();
for (generation, values) in self.values.iter().enumerate() {
for value in values.iter() {
trace_ctx
.update_generation(value.trace_pos, generation as u32)
.map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?;
}
}
Ok(self.values.len())
}
/// Removes empty generations from current values.
fn remove_empty_generations(&mut self) {
self.values.retain(|values| !values.is_empty());
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@ -327,7 +296,7 @@ mod test {
fn test_slice_iter() {
let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into());
let mut stream = Stream::from_generations_count(2, 0);
let mut stream = Stream::from_generations_count(2);
stream
.add_value(value_1, Generation::Nth(0), ValueSource::PreviousData)
@ -351,7 +320,7 @@ mod test {
#[test]
fn test_slice_on_empty_stream() {
let stream = Stream::from_generations_count(2, 0);
let stream = Stream::from_generations_count(2);
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1));
assert!(slice.is_none());
@ -370,7 +339,7 @@ mod test {
fn generation_from_current_data() {
let value_1 = ValueAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into());
let value_2 = ValueAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into());
let mut stream = Stream::from_generations_count(5, 5);
let mut stream = Stream::from_generations_count(5);
stream
.add_value(value_1.clone(), Generation::Nth(2), ValueSource::CurrentData)

View File

@ -20,7 +20,6 @@ use crate::ToErrorCode;
use air_interpreter_data::TracePos;
use air_interpreter_data::Value;
use air_trace_handler::merger::MergerApResult;
use air_trace_handler::GenerationCompatificationError;
use air_trace_handler::TraceHandlerError;
use strum::IntoEnumIterator;
use strum_macros::EnumDiscriminants;
@ -42,10 +41,6 @@ pub enum UncatchableError {
instruction: String,
},
/// These errors are related to internal bug in the interpreter when result trace is corrupted.
#[error(transparent)]
GenerationCompatificationError(#[from] GenerationCompatificationError),
/// Fold state wasn't found for such iterator name.
#[error("fold state not found for this iterable '{0}'")]
FoldStateNotFound(String),

View File

@ -66,19 +66,9 @@ pub(crate) struct ExecutionCtx<'i> {
}
impl<'i> ExecutionCtx<'i> {
pub(crate) fn new(
prev_data: &InterpreterData,
current_data: &InterpreterData,
call_results: CallResults,
run_parameters: RunParameters,
) -> Self {
pub(crate) fn new(prev_data: &InterpreterData, call_results: CallResults, run_parameters: RunParameters) -> Self {
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
let streams = Streams::from_data(
&prev_data.global_streams,
&current_data.global_streams,
prev_data.restricted_streams.clone(),
current_data.restricted_streams.clone(),
);
let streams = Streams::from_data(&prev_data.global_streams, prev_data.restricted_streams.clone());
Self {
run_parameters,

View File

@ -20,7 +20,6 @@ mod utils;
use crate::execution_step::ExecutionResult;
use crate::execution_step::Stream;
use crate::ExecutionError;
use stream_descriptor::*;
pub(crate) use stream_value_descriptor::StreamValueDescriptor;
@ -28,7 +27,6 @@ use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_parser::ast::Span;
use air_parser::AirPos;
use air_trace_handler::TraceHandler;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
@ -44,10 +42,6 @@ pub(crate) struct Streams {
/// should have at the scope start.
previous_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations from current data that a restricted stream
/// should have at the scope start.
current_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations that each private stream had at the scope end.
/// Then it's placed into data
new_restricted_stream_gens: RestrictedStreamGens,
@ -56,16 +50,13 @@ pub(crate) struct Streams {
impl Streams {
pub(crate) fn from_data(
previous_global_streams: &GlobalStreamGens,
current_global_streams: &GlobalStreamGens,
previous_restricted_stream_gens: RestrictedStreamGens,
current_restricted_stream_gens: RestrictedStreamGens,
) -> Self {
let streams = utils::merge_global_streams(previous_global_streams, current_global_streams);
let streams = utils::prepare_global_streams(previous_global_streams);
Self {
streams,
previous_restricted_stream_gens,
current_restricted_stream_gens,
new_restricted_stream_gens: <_>::default(),
}
}
@ -112,10 +103,9 @@ impl Streams {
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
let name = name.into();
let (prev_gens_count, current_gens_count) =
self.stream_generation_from_data(&name, span.left, iteration as usize);
let prev_gens_count = self.stream_generation_from_data(&name, span.left, iteration as usize);
let new_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
let new_stream = Stream::from_generations_count(prev_gens_count as usize);
let new_descriptor = StreamDescriptor::restricted(new_stream, span);
match self.streams.entry(name) {
Occupied(mut entry) => {
@ -127,12 +117,7 @@ impl Streams {
}
}
pub(crate) fn meet_scope_end(
&mut self,
name: String,
position: AirPos,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
pub(crate) fn meet_scope_end(&mut self, name: String, position: AirPos) -> ExecutionResult<()> {
// unwraps are safe here because met_scope_end must be called after met_scope_start
let stream_descriptors = self.streams.get_mut(&name).unwrap();
// delete a stream after exit from a scope
@ -141,57 +126,37 @@ impl Streams {
// streams should contain only non-empty stream embodiments
self.streams.remove(&name);
}
let gens_count = last_descriptor.stream.compactify(trace_ctx)?;
self.collect_stream_generation(name, position, gens_count as u32);
self.collect_stream_generation(name, position, last_descriptor.stream.generations_count() as u32);
Ok(())
}
/// This method must be called at the end of execution, because it contains logic to collect
/// all global streams depending on their presence in a streams field.
pub(crate) fn into_streams_data(
self,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> {
pub(crate) fn into_streams_data(self) -> (GlobalStreamGens, RestrictedStreamGens) {
// since it's called at the end of execution, streams contains only global ones,
// because all private's been deleted after exiting a scope
let global_streams = self
.streams
.into_iter()
.map(|(name, mut descriptors)| -> Result<_, ExecutionError> {
.map(|(name, mut descriptors)| {
// unwrap is safe here because of invariant that streams contains non-empty vectors,
// moreover it must contain only one value, because this method is called at the end
// of the execution
let stream = descriptors.pop().unwrap().stream;
let gens_count = stream.compactify(trace_ctx)?;
Ok((name, gens_count as u32))
(name, stream.generations_count() as u32)
})
.collect::<Result<GlobalStreamGens, _>>()?;
.collect::<GlobalStreamGens>();
Ok((global_streams, self.new_restricted_stream_gens))
(global_streams, self.new_restricted_stream_gens)
}
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> (u32, u32) {
let previous_generation =
Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
let current_generation =
Self::restricted_stream_generation(&self.current_restricted_stream_gens, name, position, iteration)
.unwrap_or_default();
(previous_generation, current_generation)
}
fn restricted_stream_generation(
restricted_stream_gens: &RestrictedStreamGens,
name: &str,
position: AirPos,
iteration: usize,
) -> Option<u32> {
restricted_stream_gens
fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> u32 {
self.previous_restricted_stream_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()
.unwrap_or_default()
}
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) {

View File

@ -21,29 +21,15 @@ use air_interpreter_data::GlobalStreamGens;
use std::collections::HashMap;
pub(super) fn merge_global_streams(
pub(super) fn prepare_global_streams(
previous_global_streams: &GlobalStreamGens,
current_global_streams: &GlobalStreamGens,
) -> HashMap<String, Vec<StreamDescriptor>> {
let mut global_streams = previous_global_streams
previous_global_streams
.iter()
.map(|(stream_name, &prev_gens_count)| {
let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default();
let global_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize);
let global_stream = Stream::from_generations_count(prev_gens_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
(stream_name.to_string(), vec![descriptor])
})
.collect::<HashMap<_, _>>();
for (stream_name, &current_gens_count) in current_global_streams {
if previous_global_streams.contains_key(stream_name) {
continue;
}
let global_stream = Stream::from_generations_count(0, current_gens_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
global_streams.insert(stream_name.clone(), vec![descriptor]);
}
global_streams
.collect::<HashMap<_, _>>()
}

View File

@ -17,7 +17,6 @@
use super::FarewellError;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::TraceHandler;
use crate::ExecutionError;
use crate::InterpreterOutcome;
use crate::ToErrorCode;
use crate::INTERPRETER_SUCCESS;
@ -82,18 +81,11 @@ pub(crate) fn from_execution_error(
#[tracing::instrument(skip(exec_ctx, trace_handler), level = "info")]
fn populate_outcome_from_contexts(
exec_ctx: ExecutionCtx<'_>,
mut trace_handler: TraceHandler,
trace_handler: TraceHandler,
ret_code: i64,
error_message: String,
) -> InterpreterOutcome {
let maybe_gens = exec_ctx
.streams
.into_streams_data(&mut trace_handler)
.map_err(execution_error_into_outcome);
let (global_streams, restricted_streams) = match maybe_gens {
Ok(gens) => gens,
Err(outcome) => return outcome,
};
let (global_streams, restricted_streams) = exec_ctx.streams.into_streams_data();
let data = InterpreterData::from_execution_result(
trace_handler.into_result_trace(),
@ -116,12 +108,6 @@ fn populate_outcome_from_contexts(
InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests)
}
// this method is called only if there is an internal error in the interpreter and
// new execution trace was corrupted
fn execution_error_into_outcome(error: ExecutionError) -> InterpreterOutcome {
InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![])
}
/// Deduplicate values in a supplied vector.
fn dedup<T: Eq + Hash + Debug>(mut vec: Vec<T>) -> Vec<T> {
use std::collections::HashSet;

View File

@ -45,7 +45,7 @@ pub(crate) fn prepare<'i>(
let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?;
let exec_ctx = make_exec_ctx(&prev_data, &current_data, call_results, run_parameters)?;
let exec_ctx = make_exec_ctx(&prev_data, call_results, run_parameters)?;
let trace_handler = TraceHandler::from_data(prev_data, current_data);
let result = PreparationDescriptor {
@ -66,13 +66,12 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult<InterpreterData> {
#[tracing::instrument(skip_all)]
fn make_exec_ctx(
prev_data: &InterpreterData,
current_data: &InterpreterData,
call_results: &[u8],
run_parameters: RunParameters,
) -> PreparationResult<ExecutionCtx<'static>> {
let call_results = serde_json::from_slice(call_results)
.map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?;
let ctx = ExecutionCtx::new(prev_data, current_data, call_results, run_parameters);
let ctx = ExecutionCtx::new(prev_data, call_results, run_parameters);
Ok(ctx)
}

View File

@ -290,8 +290,8 @@ fn fold_merge() {
.get("$stream_2")
.expect("$stream_2 should be present in data");
assert_eq!(*stream_1_generations, 8);
assert_eq!(*stream_2_generations, 6);
assert_eq!(*stream_1_generations, 4);
assert_eq!(*stream_2_generations, 3);
let mut fold_states_count = 0;
let mut calls_count = HashMap::new();

View File

@ -0,0 +1,127 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_test_framework::TestExecutor;
use air_test_utils::prelude::*;
#[test]
fn merging_fold_iterations_extensively() {
let script = r#"
(seq
(seq
(call "client" ("get" "data") [] permutations) ; ok = [["p1",[[["p1",1],["p2",2],["p3",3]],[["p1",4],["p3",5],["p2",6]]]],["p2",[[["p2",7],["p1",8],["p3",9]],[["p2",10],["p3",11],["p1",12]]]],["p3",[[["p3",13],["p1",14],["p2",15]],[["p3",16],["p2",17],["p1",18]]]]]
(seq
(fold permutations pair
(seq
(fold pair.$.[1] peer_ids
(seq
(seq
(call pair.$.[0] ("op" "noop") []) ; ok = null
(ap peer_ids $inner)
)
(next peer_ids)
)
)
(next pair)
)
)
(seq
(canon "relay" $inner #inner)
(fold $inner ns
(par
(fold ns pair
(seq
(seq
(call pair.$.[0] ("op" "noop") []) ; ok = null
(ap pair.$.[1] $result)
)
(next pair)
)
)
(next ns)
)
)
)
)
)
(seq
(new $monotonic_stream
(seq
(fold $result elem
(seq
(ap elem $monotonic_stream)
(seq
(canon "relay" $monotonic_stream #canon_stream)
(xor
(match #canon_stream.length 18
(null)
)
(next elem)
)
)
)
)
(canon "relay" $result #joined_result)
)
)
(call "client" ("return" "") [#inner #joined_result]) ; ok = null
)
)
"#;
let engine = TestExecutor::new(
TestRunParameters::from_init_peer_id("client"),
vec![],
vec!["relay", "p1", "p2", "p3"].into_iter().map(Into::into),
&script,
)
.unwrap();
let mut queue = std::collections::vec_deque::VecDeque::new();
let mut relay_outcomes = Vec::<RawAVMOutcome>::new();
queue.push_back("client".to_string());
while !queue.is_empty() {
let peer = queue.pop_front().unwrap();
if let Some(outcomes) = engine.execution_iter(peer.as_str()) {
for outcome in outcomes {
assert_eq!(outcome.ret_code, 0, "{:?}", outcome);
for peer in &outcome.next_peer_pks {
queue.push_back(peer.clone());
}
if peer == "relay" {
relay_outcomes.push(outcome);
}
}
} else {
println!("peer: {}, no executions", peer);
}
}
let last_relay_data = relay_outcomes.last().unwrap();
let last_relay_trace = trace_from_result(last_relay_data);
let last_fold = last_relay_trace
.iter()
.filter_map(|state| match state {
ExecutedState::Fold(fold_result) => Some(fold_result),
_ => None,
})
.last()
.unwrap();
assert_eq!(last_fold.lore.len(), 18);
}

View File

@ -15,6 +15,7 @@
*/
mod ap_with_fold;
mod merging;
mod recursive_streams;
mod streams;
mod streams_early_exit;

View File

@ -115,9 +115,9 @@ fn par_early_exit() {
executed_state::par(5, 1),
executed_state::par(3, 1),
executed_state::par(1, 1),
executed_state::stream_string("1", 1),
executed_state::stream_string("2", 2),
executed_state::stream_string("1", 1),
executed_state::stream_string("1", 0),
executed_state::stream_string("2", 0),
executed_state::stream_string("1", 0),
executed_state::stream_string("success result from fallible_call_service", 0),
executed_state::service_failed(1, "failed result from fallible_call_service"),
executed_state::stream_string("success result from fallible_call_service", 0),
@ -155,7 +155,7 @@ fn par_early_exit() {
trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual {
prev_value: Value::Stream {
value: rc!(json!("1")),
generation: 1,
generation: 0,
},
current_value: Value::Stream {
value: rc!(json!("non_exist_value")),
@ -302,44 +302,44 @@ fn fold_par_early_exit() {
executed_state::scalar_string_array(vec!["c1", "c2"]),
executed_state::scalar_string_array(vec!["d1", "d2"]),
executed_state::stream_string("a1", 0),
executed_state::stream_string("a2", 1),
executed_state::stream_string("a2", 0),
executed_state::stream_string("b1", 0),
executed_state::stream_string("b2", 1),
executed_state::stream_string("b2", 0),
executed_state::stream_string("c1", 0),
executed_state::stream_string("c2", 1),
executed_state::stream_string("c2", 0),
executed_state::stream_string("d1", 0),
executed_state::stream_string("d2", 1),
executed_state::stream_string("d2", 0),
executed_state::par(69, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(48, 0)),
executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(82, 0)),
executed_state::subtrace_lore(5, subtrace_desc(48, 34), subtrace_desc(82, 0)),
]),
executed_state::par(33, 0),
executed_state::par(33, 34),
executed_state::fold(vec![
executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(32, 0)),
executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(48, 0)),
executed_state::subtrace_lore(7, subtrace_desc(32, 16), subtrace_desc(48, 0)),
]),
executed_state::par(15, 0),
executed_state::par(15, 16),
executed_state::par(13, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(25, 0)),
executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(31, 0)),
executed_state::subtrace_lore(9, subtrace_desc(25, 6), subtrace_desc(31, 0)),
]),
executed_state::par(5, 0),
executed_state::par(5, 6),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(23, 0)),
executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(25, 0)),
executed_state::subtrace_lore(11, subtrace_desc(23, 2), subtrace_desc(25, 0)),
]),
executed_state::par(1, 0),
executed_state::par(1, 2),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(5, 0),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(29, 0)),
executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(31, 0)),
executed_state::subtrace_lore(11, subtrace_desc(29, 2), subtrace_desc(31, 0)),
]),
executed_state::par(1, 0),
executed_state::par(1, 2),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
@ -347,7 +347,7 @@ fn fold_par_early_exit() {
executed_state::par(15, 0),
executed_state::par(13, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(41, 0)),
executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(47, 0)),
executed_state::subtrace_lore(9, subtrace_desc(41, 6), subtrace_desc(47, 0)),
]),
];

View File

@ -130,7 +130,7 @@ fn canon_fixes_stream_correct() {
executed_state::stream_number(1, 0),
executed_state::par(1, 1),
executed_state::stream_number(2, 1),
executed_state::stream_number(3, 2),
executed_state::stream_number(3, 1),
executed_state::scalar_number(4),
executed_state::canon(
json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "peer_id_3", "service_id": ""},

View File

@ -55,7 +55,7 @@ fn issue_302() {
executed_state::par(1, 3),
executed_state::stream_number(2, 1),
executed_state::stream_number(1, 0),
executed_state::stream_number(0, 2),
executed_state::stream_number(0, 1),
executed_state::scalar(json!([1, 2, 0])),
];
assert_eq!(actual_trace.deref(), expected_trace);

View File

@ -0,0 +1,112 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_test_utils::prelude::*;
//https://github.com/fluencelabs/aquavm/issues/363
#[test]
fn issue_363() {
let client_peer_id = "client";
let mut client_vm = create_avm(
set_variable_call_service(json!([
["p1", [[["p1", 1], ["p2", 3]], [["p1", 4], ["p2", 5]]]],
["p2", [[["p2", 13], ["p1", 14]], [["p2", 16], ["p1", 18]]]]
])),
client_peer_id,
);
let p1_peer_id = "p1";
let mut p1_vm = create_avm(set_variable_call_service(json!("p1")), p1_peer_id);
let p2_peer_id = "p2";
let mut p2_vm = create_avm(set_variable_call_service(json!("p2")), p2_peer_id);
let script = r#"
(seq
(seq
(call "client" ("get" "data") [] permutations)
(seq
(fold permutations pair
(seq
(fold pair.$.[1] peer_ids
(seq
(seq
(call pair.$.[0] ("op" "noop") [])
(ap peer_ids $inner)
)
(next peer_ids)
)
)
(next pair)
)
)
(seq
(null)
(fold $inner ns
(par
(fold ns pair
(seq
(seq
(call pair.$.[0] ("op" "noop") [])
(ap pair.$.[1] $result)
)
(next pair)
)
)
(next ns)
)
)
)
)
) (null)
)
"#;
let client_result = checked_call_vm!(client_vm, <_>::default(), script, "", "");
let p1_result_1 = checked_call_vm!(p1_vm, <_>::default(), script, "", client_result.data.clone());
let p2_result_1 = checked_call_vm!(p2_vm, <_>::default(), script, "", p1_result_1.data.clone());
let p2_trace_1 = trace_from_result(&p2_result_1);
let fold_position = TracePos::from(9);
let fold_p2 = p2_trace_1.get(fold_position).unwrap();
if let ExecutedState::Fold(fold) = fold_p2 {
assert_eq!(fold.lore.len(), 4);
assert_eq!(fold.lore[0].subtraces_desc[0].subtrace_len, 2);
assert_eq!(fold.lore[1].subtraces_desc[0].subtrace_len, 2);
assert_eq!(fold.lore[2].subtraces_desc[0].subtrace_len, 4);
assert_eq!(fold.lore[3].subtraces_desc[0].subtrace_len, 4);
} else {
panic!("expected fold at pos 9")
}
let p1_result_2 = checked_call_vm!(
p1_vm,
<_>::default(),
script,
p1_result_1.data.clone(),
p2_result_1.data.clone()
);
let p1_trace_2 = trace_from_result(&p1_result_2);
let fold_p1 = p1_trace_2.get(fold_position).unwrap();
if let ExecutedState::Fold(fold) = fold_p1 {
assert_eq!(fold.lore.len(), 4);
assert_eq!(fold.lore[0].subtraces_desc[0].subtrace_len, 4);
assert_eq!(fold.lore[1].subtraces_desc[0].subtrace_len, 4);
assert_eq!(fold.lore[2].subtraces_desc[0].subtrace_len, 5);
assert_eq!(fold.lore[3].subtraces_desc[0].subtrace_len, 5);
} else {
panic!("expected fold at pos 9")
}
}

View File

@ -35,3 +35,4 @@ mod issue_331;
mod issue_346;
mod issue_348;
mod issue_356;
mod issue_363;

View File

@ -1,3 +1,8 @@
## Version 0.4.0
[PR 356](https://github.com/fluencelabs/aquavm/pull/358):
- temporary fix of a bug with fold and canon compatibility
## Version 0.3.0
[PR 292](https://github.com/fluencelabs/aquavm/pull/292):

View File

@ -1,7 +1,7 @@
[package]
name = "air-interpreter-data"
description = "Data format of the AIR interpreter"
version = "0.3.0"
version = "0.4.0"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"

View File

@ -18,8 +18,6 @@ use super::data_keeper::KeeperError;
use super::merger::MergeError;
use super::state_automata::StateFSMError;
use air_interpreter_data::ExecutedState;
use air_interpreter_data::TracePos;
use thiserror::Error as ThisError;
/// Errors arose out of merging previous data with a new.
@ -35,25 +33,3 @@ pub enum TraceHandlerError {
#[error(transparent)]
StateFSMError(#[from] StateFSMError),
}
#[derive(ThisError, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum GenerationCompatificationError {
#[error("trying to change generation of an invalid trace position {0}")]
TracePosPointsToNowhere(TracePos),
#[error(
"trying to change generation of a state {state} on {position} position, the state doesn't contain generation"
)]
TracePosPointsToInvalidState { position: TracePos, state: ExecutedState },
}
impl GenerationCompatificationError {
pub fn points_to_nowhere(position: TracePos) -> Self {
GenerationCompatificationError::TracePosPointsToNowhere(position)
}
pub fn points_to_invalid_state(position: TracePos, state: ExecutedState) -> Self {
GenerationCompatificationError::TracePosPointsToInvalidState { position, state }
}
}

View File

@ -55,34 +55,6 @@ impl TraceHandler {
(prev_len, current_len)
}
pub fn update_generation(
&mut self,
trace_pos: TracePos,
generation: u32,
) -> Result<(), GenerationCompatificationError> {
let state = self
.data_keeper
.result_trace
.get_mut(trace_pos)
.ok_or_else(|| GenerationCompatificationError::points_to_nowhere(trace_pos))?;
match state {
ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation],
ExecutedState::Call(CallResult::Executed(Value::Stream {
generation: call_generation,
..
})) => *call_generation = generation,
state => {
return Err(GenerationCompatificationError::points_to_invalid_state(
trace_pos,
state.clone(),
))
}
}
Ok(())
}
}
impl TraceHandler {

View File

@ -32,7 +32,6 @@ mod handler;
pub mod merger;
mod state_automata;
pub use errors::GenerationCompatificationError;
pub use errors::TraceHandlerError;
pub use handler::TraceHandler;
pub use state_automata::SubgraphType;