fix(execution-engine) Restricted stream bugfix (#337)

Use proper stream generation structures for call results merged from current data.

Closes #302.

Co-authored-by: vms <michail.vms@gmail.com>
This commit is contained in:
Ivan Boldyrev 2022-09-29 23:10:24 +03:00 committed by GitHub
parent 8889291af8
commit 2e98712cb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 163 additions and 76 deletions

View File

@ -23,6 +23,7 @@ use air_interpreter_data::CallResult;
use air_interpreter_data::TracePos;
use air_interpreter_data::Value;
use air_parser::ast::CallOutputValue;
use air_trace_handler::PreparationScheme;
use air_trace_handler::TraceHandler;
/// Writes result of a local `Call` instruction to `ExecutionCtx` at `output`.
@ -50,23 +51,41 @@ pub(crate) fn set_local_result<'i>(
}
pub(crate) fn set_result_from_value<'i>(
value: Value,
value: &mut Value,
tetraplet: RcSecurityTetraplet,
trace_pos: TracePos,
scheme: PreparationScheme,
output: &CallOutputValue<'i>,
exec_ctx: &mut ExecutionCtx<'i>,
) -> ExecutionResult<()> {
match (output, value) {
(CallOutputValue::Scalar(scalar), Value::Scalar(value)) => {
let result = ValueAggregate::new(value, tetraplet, trace_pos);
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
}
(CallOutputValue::Stream(stream), Value::Stream { value, generation }) => {
let result = ValueAggregate::new(value, tetraplet, trace_pos);
let generation = Generation::Nth(generation);
let _ = exec_ctx
(
CallOutputValue::Stream(stream),
Value::Stream {
value,
generation: stream_generation,
},
) => {
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
let generation = match scheme {
PreparationScheme::Both | PreparationScheme::Previous => {
assert_ne!(*stream_generation, u32::MAX, "Should be valid");
Generation::Nth(*stream_generation)
}
PreparationScheme::Current => {
assert_eq!(*stream_generation, u32::MAX, "Shouldn't be valid");
Generation::Last
}
};
let generation = exec_ctx
.streams
.add_stream_value(result, generation, stream.name, stream.position)?;
// Update value's generation
*stream_generation = generation;
}
// it isn't needed to check there that output and value matches because
// it's been already checked in trace handler

View File

@ -24,6 +24,7 @@ use air_interpreter_data::Sender;
use air_interpreter_data::TracePos;
use air_interpreter_interface::CallServiceResult;
use air_parser::ast::CallOutputValue;
use air_trace_handler::PreparationScheme;
use air_trace_handler::TraceHandler;
use fstrings::f;
@ -40,14 +41,15 @@ pub(crate) struct StateDescriptor {
pub(super) fn handle_prev_state<'i>(
tetraplet: &RcSecurityTetraplet,
output: &CallOutputValue<'i>,
prev_result: CallResult,
mut prev_result: CallResult,
trace_pos: TracePos,
scheme: PreparationScheme,
exec_ctx: &mut ExecutionCtx<'i>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<StateDescriptor> {
use CallResult::*;
match &prev_result {
match &mut prev_result {
// this call was failed on one of the previous executions,
// here it's needed to bubble this special error up
CallServiceFailed(ret_code, err_msg) => {
@ -84,8 +86,8 @@ pub(super) fn handle_prev_state<'i>(
Ok(StateDescriptor::cant_execute_now(prev_result))
}
// this instruction's been already executed
Executed(value) => {
set_result_from_value(value.clone(), tetraplet.clone(), trace_pos, output, exec_ctx)?;
Executed(ref mut value) => {
set_result_from_value(&mut *value, tetraplet.clone(), trace_pos, scheme, output, exec_ctx)?;
trace_ctx.meet_call_end(prev_result);
Ok(StateDescriptor::executed())

View File

@ -156,8 +156,13 @@ impl<'i> ResolvedCall<'i> {
exec_ctx: &mut ExecutionCtx<'i>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<StateDescriptor> {
let (call_result, trace_pos) = match trace_to_exec_err!(trace_ctx.meet_call_start(&self.output), raw_call)? {
MergerCallResult::CallResult { value, trace_pos } => (value, trace_pos),
let prev_result = trace_ctx.meet_call_start(&self.output);
let (call_result, trace_pos, scheme) = match trace_to_exec_err!(prev_result, raw_call)? {
MergerCallResult::CallResult {
value,
trace_pos,
scheme,
} => (value, trace_pos, scheme),
MergerCallResult::Empty => return Ok(StateDescriptor::no_previous_state()),
};
@ -166,6 +171,7 @@ impl<'i> ResolvedCall<'i> {
&self.output,
call_result,
trace_pos,
scheme,
exec_ctx,
trace_ctx,
)

View File

@ -16,7 +16,10 @@
use air_test_utils::prelude::*;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::ops::Deref;
#[test]
fn merge_streams_in_two_fold() {
@ -130,7 +133,7 @@ fn merge_streams_in_two_fold() {
request_sent_by(vm_1_peer_id),
];
assert_eq!(actual_trace_3, expected_trace_3);
assert_eq!(actual_trace_3.deref(), expected_trace_3);
assert!(result_3.next_peer_pks.is_empty());
let actual_trace_4 = trace_from_result(&result_4);

View File

@ -16,6 +16,10 @@
use air_test_utils::prelude::*;
use pretty_assertions::assert_eq;
use std::ops::Deref;
#[test]
fn empty_stream() {
fn arg_type_check_closure() -> CallServiceClosure {
@ -140,7 +144,7 @@ fn stream_merging_v0() {
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
];
assert_eq!(actual_trace_2, expected_trace_2);
assert_eq!(actual_trace_2.deref(), expected_trace_2);
let executor_result_3 = checked_call_vm!(
executor,

View File

@ -18,6 +18,9 @@ use air_test_utils::prelude::*;
use fstrings::f;
use fstrings::format_args_f;
use pretty_assertions::assert_eq;
use std::ops::Deref;
#[test]
fn canon_moves_execution_flow() {
@ -124,7 +127,7 @@ fn canon_fixes_stream_correct() {
executed_state::scalar(json!([2, 3])),
executed_state::scalar(json!([2, 3])),
];
assert_eq!(vm_1_result_2_trace, expected_vm_1_result_2_trace);
assert_eq!(vm_1_result_2_trace.deref(), expected_vm_1_result_2_trace);
}
#[test]

View File

@ -15,9 +15,10 @@
*/
use air_test_utils::prelude::*;
use pretty_assertions::assert_eq;
use std::ops::Deref;
#[test]
// test for github.com/fluencelabs/aquavm/issues/222
fn issue_222() {
@ -67,15 +68,15 @@ fn issue_222() {
let expected_trace = vec![
executed_state::par(3, 3),
executed_state::par(1, 1),
executed_state::stream(json!([1]), 0),
executed_state::stream(json!([1]), 1),
executed_state::stream(json!([2]), 0),
executed_state::fold(vec![
executed_state::subtrace_lore(2, SubTraceDesc::new(5.into(), 1), SubTraceDesc::new(7.into(), 0)),
executed_state::subtrace_lore(3, SubTraceDesc::new(6.into(), 1), SubTraceDesc::new(7.into(), 0)),
executed_state::subtrace_lore(3, SubTraceDesc::new(5.into(), 1), SubTraceDesc::new(6.into(), 0)),
executed_state::subtrace_lore(2, SubTraceDesc::new(6.into(), 1), SubTraceDesc::new(7.into(), 0)),
]),
executed_state::scalar(json!([1])),
executed_state::scalar(json!([2])),
executed_state::scalar(json!([1])),
];
assert_eq!(actual_trace, expected_trace);
assert_eq!(actual_trace.deref(), expected_trace);
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_test_utils::prelude::*;
use pretty_assertions::assert_eq;
use std::ops::Deref;
#[test]
// test for github.com/fluencelabs/aquavm/issues/302
fn issue_302() {
let peer_id_1 = "peer_id_1";
let mut peer_vm_1 = create_avm(echo_call_service(), peer_id_1);
let peer_id_2 = "peer_id_2";
let mut peer_vm_2 = create_avm(echo_call_service(), peer_id_2);
let peer_id_3 = "peer_id_3";
let mut peer_vm_3 = create_avm(echo_call_service(), peer_id_3);
let script = f!(r#"
(new $stream
(par
(call "{peer_id_1}" ("" "") [2] $stream)
(seq
(call "{peer_id_2}" ("" "") [1] $stream)
(seq
(call "{peer_id_3}" ("" "") [0] $stream)
(call "{peer_id_2}" ("" "") [$stream])
)
)
)
)
"#);
let result_1 = checked_call_vm!(peer_vm_2, <_>::default(), &script, "", "");
let result_2 = checked_call_vm!(peer_vm_1, <_>::default(), &script, "", result_1.data.clone());
let result_3 = checked_call_vm!(peer_vm_3, <_>::default(), &script, "", result_2.data);
let result_4 = checked_call_vm!(peer_vm_2, <_>::default(), &script, result_1.data, result_3.data);
let actual_trace = trace_from_result(&result_4);
let expected_trace = vec![
executed_state::par(1, 3),
executed_state::stream_number(2, 1),
executed_state::stream_number(1, 0),
executed_state::stream_number(0, 1),
executed_state::scalar(json!([1, 2, 0])),
];
assert_eq!(actual_trace.deref(), expected_trace);
}

View File

@ -28,6 +28,7 @@ mod issue_222;
mod issue_241;
mod issue_295;
mod issue_300;
mod issue_302;
mod issue_304;
mod issue_306;
mod issue_331;

View File

@ -20,16 +20,12 @@ use super::KeeperResult;
use super::TraceSlider;
use crate::TracePos;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::InterpreterData;
use std::collections::HashMap;
/// Contains all necessary information about data.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct MergeCtx {
pub slider: TraceSlider,
pub streams: GlobalStreamGens,
}
impl MergeCtx {
@ -37,19 +33,13 @@ impl MergeCtx {
pub(crate) fn from_trace(trace: ExecutionTrace) -> Self {
let slider = TraceSlider::new(trace);
Self {
slider,
streams: HashMap::new(),
}
Self { slider }
}
pub(crate) fn from_data(data: InterpreterData) -> Self {
let slider = TraceSlider::new(data.trace);
Self {
slider,
streams: data.global_streams,
}
Self { slider }
}
pub(crate) fn try_get_generation(&self, position: TracePos) -> KeeperResult<u32> {
@ -72,8 +62,4 @@ impl MergeCtx {
state => Err(KeeperError::NoStreamState { state: state.clone() }),
}
}
pub(crate) fn stream_generation(&self, stream_name: &str) -> Option<u32> {
self.streams.get(stream_name).copied()
}
}

View File

@ -42,6 +42,7 @@ pub use merger::MergeError;
pub use merger::MergerApResult;
pub use merger::MergerCallResult;
pub use merger::MergerCanonResult;
pub use merger::PreparationScheme;
pub use state_automata::SubgraphType;
pub type TraceHandlerResult<T> = std::result::Result<T, TraceHandlerError>;

View File

@ -27,10 +27,13 @@ const EXPECTED_STATE_NAME: &str = "call";
pub enum MergerCallResult {
/// There is no corresponding state in a trace for this call.
Empty,
/// There was a state in at least one of the contexts. If there were two states in
/// both contexts, they were successfully merged.
CallResult { value: CallResult, trace_pos: TracePos },
CallResult {
value: CallResult,
trace_pos: TracePos,
scheme: PreparationScheme,
},
}
pub(crate) fn try_merge_next_state_as_call(
@ -48,7 +51,7 @@ pub(crate) fn try_merge_next_state_as_call(
(Some(Call(prev_call)), Some(Call(current_call))) => (prev_call, current_call),
// this special case is needed to merge stream generation in a right way
(None, Some(Call(CallResult::Executed(value)))) => {
let call_result = merge_current_executed(value, value_type, data_keeper)?;
let call_result = merge_current_executed(value, value_type)?;
return Ok(prepare_call_result(call_result, Current, data_keeper));
}
(None, Some(Call(current_call))) => return Ok(prepare_call_result(current_call, Current, data_keeper)),
@ -63,8 +66,8 @@ pub(crate) fn try_merge_next_state_as_call(
}
};
let merged_call = merge_call_result(prev_call, current_call, value_type, data_keeper)?;
let call_result = prepare_call_result(merged_call, Both, data_keeper);
let (merged_call, scheme) = merge_call_result(prev_call, current_call, value_type)?;
let call_result = prepare_call_result(merged_call, scheme, data_keeper);
try_match_value_type(&call_result, value_type)?;
Ok(call_result)
@ -74,28 +77,28 @@ fn merge_call_result(
prev_call: CallResult,
current_call: CallResult,
value_type: ValueType<'_>,
data_keeper: &DataKeeper,
) -> MergeResult<CallResult> {
) -> MergeResult<(CallResult, PreparationScheme)> {
use CallResult::*;
use PreparationScheme::*;
let merged_state = match (prev_call, current_call) {
let (merged_state, scheme) = match (prev_call, current_call) {
(prev @ CallServiceFailed(..), current @ CallServiceFailed(..)) => {
check_equal(&prev, &current)?;
prev
(prev, Previous)
}
(RequestSentBy(_), current @ CallServiceFailed(..)) => current,
(prev @ CallServiceFailed(..), RequestSentBy(_)) => prev,
(RequestSentBy(_), current @ CallServiceFailed(..)) => (current, Current),
(prev @ CallServiceFailed(..), RequestSentBy(_)) => (prev, Previous),
// senders shouldn't be checked for equality, for more info please look at
// github.com/fluencelabs/aquavm/issues/137
(prev @ RequestSentBy(_), RequestSentBy(_)) => prev,
(prev @ RequestSentBy(_), RequestSentBy(_)) => (prev, Previous),
// this special case is needed to merge stream generation in a right way
(RequestSentBy(_), Executed(value)) => merge_current_executed(value, value_type, data_keeper)?,
(prev @ Executed(..), RequestSentBy(_)) => prev,
(Executed(prev_value), Executed(current_value)) => merge_executed(prev_value, current_value)?,
(RequestSentBy(_), Executed(value)) => (merge_current_executed(value, value_type)?, Current),
(prev @ Executed(..), RequestSentBy(_)) => (prev, Previous),
(Executed(prev_value), Executed(current_value)) => (merge_executed(prev_value, current_value)?, Both),
(prev_call, current_call) => return Err(CallResultError::incompatible_calls(prev_call, current_call)),
};
Ok(merged_state)
Ok((merged_state, scheme))
}
pub(super) fn prepare_call_result(
@ -106,7 +109,11 @@ pub(super) fn prepare_call_result(
let trace_pos = data_keeper.result_trace_next_pos();
prepare_positions_mapping(scheme, data_keeper);
MergerCallResult::CallResult { value, trace_pos }
MergerCallResult::CallResult {
value,
trace_pos,
scheme,
}
}
#[derive(Debug, Copy, Clone)]

View File

@ -65,16 +65,16 @@ fn are_streams_equal(
/// Merging of value from only current data to a stream is a something special, because it's
/// needed to choose generation not from current data, but a maximum from streams on a current peer.
/// Maximum versions are tracked in data in a special field called streams.
pub(super) fn merge_current_executed(
value: Value,
value_type: ValueType<'_>,
data_keeper: &DataKeeper,
) -> MergeResult<CallResult> {
pub(super) fn merge_current_executed(value: Value, value_type: ValueType<'_>) -> MergeResult<CallResult> {
match (value, value_type) {
(scalar @ Value::Scalar(_), ValueType::Scalar) => Ok(CallResult::Executed(scalar)),
(Value::Stream { value, .. }, ValueType::Stream(stream_name)) => {
let generation = data_keeper.prev_ctx.stream_generation(stream_name).unwrap_or_default();
let stream = Value::Stream { value, generation };
(Value::Stream { value, .. }, ValueType::Stream(_)) => {
// it is checked by an assertion
let canary_generation = u32::MAX;
let stream = Value::Stream {
value,
generation: canary_generation,
};
Ok(CallResult::Executed(stream))
}
(value, value_type) => Err(CallResultError::data_not_match(value, value_type)),

View File

@ -211,10 +211,7 @@ mod tests {
let fold_result = FoldResult { lore };
let slider = TraceSlider::new(vec![]);
let ctx = MergeCtx {
slider,
streams: <_>::default(),
};
let ctx = MergeCtx { slider };
let (all_states, convoluted_lens) =
compute_lens_convolution(&fold_result, &ctx).expect("convolution should be successful");
@ -244,10 +241,7 @@ mod tests {
let fold_result = FoldResult { lore };
let slider = TraceSlider::new(vec![ExecutedState::Ap(ApResult::new(vec![0]))]);
let ctx = MergeCtx {
slider,
streams: <_>::default(),
};
let ctx = MergeCtx { slider };
let (all_states, convoluted_lens) =
compute_lens_convolution(&fold_result, &ctx).expect("convolution should be successful");
@ -295,10 +289,7 @@ mod tests {
ExecutedState::Ap(ApResult::new(vec![1])),
ExecutedState::Ap(ApResult::new(vec![2])),
]);
let ctx = MergeCtx {
slider,
streams: <_>::default(),
};
let ctx = MergeCtx { slider };
let (all_states, convoluted_lens) =
compute_lens_convolution(&fold_result, &ctx).expect("convolution should be successful");

View File

@ -27,6 +27,7 @@ pub use call_merger::MergerCallResult;
pub use canon_merger::MergerCanonResult;
pub use fold_merger::MergerFoldResult;
pub use par_merger::MergerParResult;
pub use position_mapping::PreparationScheme;
pub use errors::ApResultError;
pub use errors::CallResultError;
@ -43,7 +44,6 @@ pub(crate) use fold_merger::try_merge_next_state_as_fold;
pub(crate) use par_merger::try_merge_next_state_as_par;
use position_mapping::prepare_positions_mapping;
use position_mapping::PreparationScheme;
type MergeResult<T> = std::result::Result<T, MergeError>;

View File

@ -16,7 +16,8 @@
use super::DataKeeper;
pub(super) enum PreparationScheme {
#[derive(Debug, Copy, Clone)]
pub enum PreparationScheme {
Previous,
Current,
Both,