Make fold by streams truly deterministic (#228)

This commit is contained in:
Mike Voronov 2022-03-10 01:32:07 +03:00 committed by GitHub
parent a64a4fc0a6
commit 914ad74516
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 157 additions and 106 deletions

View File

@ -45,48 +45,68 @@ impl<'i> super::ExecutableInstruction<'i> for Ap<'i> {
// https://github.com/fluencelabs/aquavm/issues/216
let result = apply_to_arg(&self.argument, exec_ctx, trace_ctx, should_touch_trace)?;
let merger_ap_result = if should_touch_trace {
let merger_ap_result = trace_to_exec_err!(trace_ctx.meet_ap_start(), self)?;
try_match_trace_to_instr(&merger_ap_result, self)?;
merger_ap_result
} else {
MergerApResult::Empty
};
save_result(&self.result, &merger_ap_result, result, exec_ctx)?;
if should_touch_trace {
// if generations are empty, then this ap instruction operates only with scalars and data
// shouldn't be updated
let final_ap_result = to_ap_result(&merger_ap_result, self, exec_ctx);
trace_ctx.meet_ap_end(final_ap_result);
}
let merger_ap_result = to_merger_ap_result(should_touch_trace, self, trace_ctx)?;
let maybe_generation = update_context(&self.result, &merger_ap_result, result, exec_ctx)?;
maybe_update_trace(should_touch_trace, &merger_ap_result, maybe_generation, trace_ctx);
Ok(())
}
}
fn save_result<'ctx>(
ap_result_type: &ast::Variable<'ctx>,
merger_ap_result: &MergerApResult,
result: ValueAggregate,
exec_ctx: &mut ExecutionCtx<'ctx>,
) -> ExecutionResult<()> {
use ast::Variable::*;
match ap_result_type {
Scalar(scalar) => exec_ctx.scalars.set_value(scalar.name, result).map(|_| ()),
Stream(stream) => {
let generation = ap_result_to_generation(merger_ap_result);
exec_ctx
.streams
.add_stream_value(result, generation, stream.name, stream.position)
.map(|_| ())
}
}
}
/// This function is intended to check whether a Ap instruction should produce
/// a new state in data.
fn should_touch_trace(ap: &Ap<'_>) -> bool {
matches!(ap.result, ast::Variable::Stream(_))
}
fn to_merger_ap_result(
should_touch_trace: bool,
instr: &Ap<'_>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<MergerApResult> {
let merger_ap_result = if should_touch_trace {
let merger_ap_result = trace_to_exec_err!(trace_ctx.meet_ap_start(), instr)?;
try_match_trace_to_instr(&merger_ap_result, instr)?;
merger_ap_result
} else {
MergerApResult::Empty
};
Ok(merger_ap_result)
}
fn update_context<'ctx>(
ap_result_type: &ast::Variable<'ctx>,
merger_ap_result: &MergerApResult,
result: ValueAggregate,
exec_ctx: &mut ExecutionCtx<'ctx>,
) -> ExecutionResult<Option<u32>> {
use ast::Variable::*;
match ap_result_type {
Scalar(scalar) => exec_ctx.scalars.set_value(scalar.name, result).map(|_| None),
Stream(stream) => {
let generation = ap_result_to_generation(merger_ap_result);
exec_ctx
.streams
.add_stream_value(result, generation, stream.name, stream.position)
.map(|generation| Some(generation))
}
}
}
fn maybe_update_trace(
should_touch_trace: bool,
merger_ap_result: &MergerApResult,
maybe_generation: Option<u32>,
trace_ctx: &mut TraceHandler,
) {
if !should_touch_trace {
// if generations are empty, then this ap instruction operates only with scalars and data
// shouldn't be updated
return;
}
let final_ap_result = to_ap_result(&merger_ap_result, maybe_generation);
trace_ctx.meet_ap_end(final_ap_result);
}

View File

@ -14,7 +14,6 @@
* limitations under the License.
*/
use super::ExecutionCtx;
use super::ExecutionResult;
use crate::execution_step::Generation;
@ -54,14 +53,14 @@ fn match_position_variable(
}
}
pub(super) fn to_ap_result(merger_ap_result: &MergerApResult, instr: &Ap<'_>, exec_ctx: &ExecutionCtx<'_>) -> ApResult {
pub(super) fn to_ap_result(merger_ap_result: &MergerApResult, maybe_generation: Option<u32>) -> ApResult {
if let MergerApResult::ApResult { res_generation } = merger_ap_result {
let res_generation = option_to_vec(*res_generation);
return ApResult::new(res_generation);
}
let res_generation = variable_to_generations(&instr.result, exec_ctx);
let res_generation = option_to_vec(maybe_generation);
ApResult::new(res_generation)
}
@ -71,22 +70,3 @@ fn option_to_vec(value: Option<u32>) -> Vec<u32> {
None => vec![],
}
}
fn variable_to_generations(variable: &ast::Variable<'_>, exec_ctx: &ExecutionCtx<'_>) -> Vec<u32> {
use ast::Variable::*;
match variable {
Scalar(_) => vec![],
Stream(stream) => {
// unwrap here is safe because this function will be called only
// when this stream's been created
let stream = exec_ctx.streams.get(stream.name, stream.position).unwrap();
let generation = match stream.generations_count() {
0 => 0,
n => n - 1,
};
vec![generation as u32]
}
}
}

View File

@ -39,22 +39,10 @@ pub(crate) fn set_local_result<'i>(
Ok(CallResult::executed_scalar(result_value))
}
CallOutputValue::Variable(Variable::Stream(stream)) => {
// TODO: refactor this generation handling
let generation = match exec_ctx.streams.get(stream.name, stream.position) {
Some(stream) => {
let generation = match stream.generations_count() {
0 => 0,
n => n - 1,
};
Generation::Nth(generation as u32)
}
None => Generation::Last,
};
let generation =
exec_ctx
.streams
.add_stream_value(executed_result, generation, stream.name, stream.position)?;
.add_stream_value(executed_result, Generation::Last, stream.name, stream.position)?;
Ok(CallResult::executed_stream(result_value, generation))
}
CallOutputValue::None => Ok(CallResult::executed_scalar(result_value)),

View File

@ -33,7 +33,7 @@ impl StreamCursor {
pub(super) fn construct_iterables(&mut self, stream: &Stream) -> Vec<IterableValue> {
let iterables =
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
self.last_seen_generation = stream.non_empty_generations_count() as u32;
self.last_seen_generation = stream.generations_count() as u32;
iterables
}

View File

@ -57,17 +57,7 @@ impl Stream {
}
pub(crate) fn generations_count(&self) -> usize {
let generations_count = self.0.len();
// the last generation could be empty due to the logic of from_generations_count ctor
if generations_count > 0 && self.0[generations_count - 1].is_empty() {
generations_count - 1
} else {
generations_count
}
}
pub(crate) fn non_empty_generations_count(&self) -> usize {
self.0.iter().filter(|gen| !gen.is_empty()).count()
}

View File

@ -59,22 +59,20 @@ fn ap_with_string_literal() {
let vm_1_peer_id = "vm_1_peer_id";
let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id);
let script = format!(
r#"
let some_string = "some_string";
let script = f!(r#"
(seq
(ap "some_string" $stream)
(call "{}" ("" "") [$stream])
(ap "{some_string}" $stream)
(call "{vm_1_peer_id}" ("" "") [$stream])
)
"#,
vm_1_peer_id
);
"#);
let result = checked_call_vm!(vm_1, "", script, "", "");
let actual_trace = trace_from_result(&result);
let expected_state = vec![
executed_state::ap(Some(0)),
executed_state::scalar(json!(["some_string"])),
executed_state::scalar(json!([some_string])),
];
assert_eq!(actual_trace, expected_state);

View File

@ -46,7 +46,7 @@ fn lfold() {
assert_eq!(actual_trace[0], expected_state);
for i in 1..=5 {
let expected_state = executed_state::stream_string(format!("{}", i), 0);
let expected_state = executed_state::stream_string(format!("{}", i), i as u32 - 1);
assert_eq!(actual_trace[i], expected_state);
}
}
@ -80,7 +80,7 @@ fn rfold() {
assert_eq!(actual_trace[0], expected_state);
for i in 1..=5 {
let expected_state = executed_state::stream_string(format!("{}", 6 - i), 0);
let expected_state = executed_state::stream_string(format!("{}", 6 - i), i as u32 - 1);
assert_eq!(actual_trace[i], expected_state);
}
}
@ -124,8 +124,9 @@ fn inner_fold() {
for i in 1..=5 {
for j in 1..=5 {
let expected_state = executed_state::stream_string(i.to_string(), 0);
assert_eq!(actual_trace[1 + 5 * (i - 1) + j], expected_state);
let state_id = 1 + 5 * (i - 1) + j;
let expected_state = executed_state::stream_string(i.to_string(), state_id as u32 - 2);
assert_eq!(actual_trace[state_id], expected_state);
}
}
}
@ -287,7 +288,7 @@ fn lambda() {
assert_eq!(actual_trace[0], expected_state);
for i in 1..=5 {
let expected_state = executed_state::stream_string(format!("{}", i), 0);
let expected_state = executed_state::stream_string(format!("{}", i), i as u32 - 1);
assert_eq!(actual_trace[i], expected_state);
}
}

View File

@ -73,7 +73,7 @@ fn data_merge() {
stream_string(vm_1_id, 0),
par(1, 0),
request_sent_by(vm_1_id),
stream_string(vm_1_id, 0),
stream_string(vm_1_id, 1),
request_sent_by(vm_1_id),
];
@ -105,12 +105,12 @@ fn data_merge() {
par(1, 2),
stream_string(vm_1_id, 0),
par(1, 0),
stream_string(vm_2_id, 1),
stream_string(vm_2_id, 2),
par(1, 2),
stream_string(vm_1_id, 0),
par(1, 0),
stream_string(vm_2_id, 1),
stream_string(vm_1_id, 0),
stream_string(vm_1_id, 1),
request_sent_by(vm_1_id),
];
@ -124,12 +124,12 @@ fn data_merge() {
par(1, 2),
stream_string(vm_1_id, 0),
par(1, 0),
stream_string(vm_2_id, 1),
stream_string(vm_2_id, 2),
par(1, 2),
stream_string(vm_1_id, 0),
par(1, 0),
stream_string(vm_2_id, 1),
stream_string(vm_1_id, 0),
stream_string(vm_1_id, 1),
scalar_string(vm_2_id),
];

View File

@ -59,12 +59,12 @@ fn recursive_stream_with_early_exit() {
let actual_trace = trace_from_result(&result);
let expected_state = vec![
executed_state::stream_number(1, 0),
executed_state::stream_number(1, 0),
executed_state::fold(vec![executed_state::subtrace_lore(
0,
SubTraceDesc::new(3, 1),
SubTraceDesc::new(4, 0),
)]),
executed_state::stream_number(1, 1),
executed_state::fold(vec![
executed_state::subtrace_lore(0, SubTraceDesc::new(3, 1), SubTraceDesc::new(4, 0)),
executed_state::subtrace_lore(1, SubTraceDesc::new(4, 1), SubTraceDesc::new(5, 0)),
]),
executed_state::scalar_string("stop"),
executed_state::scalar_string("stop"),
];
@ -125,7 +125,7 @@ fn recursive_stream_many_iterations() {
let actual_trace = trace_from_result(&result);
let actual_fold = &actual_trace[2];
let expected_fold = executed_state::fold(vec![
executed_state::subtrace_lore(0, SubTraceDesc::new(3, 2), SubTraceDesc::new(7, 0)),
executed_state::subtrace_lore(0, SubTraceDesc::new(3, 2), SubTraceDesc::new(5, 0)),
executed_state::subtrace_lore(1, SubTraceDesc::new(5, 2), SubTraceDesc::new(7, 0)),
executed_state::subtrace_lore(4, SubTraceDesc::new(7, 2), SubTraceDesc::new(9, 0)),
executed_state::subtrace_lore(6, SubTraceDesc::new(9, 2), SubTraceDesc::new(11, 0)),
@ -374,3 +374,77 @@ fn recursive_stream_inner_fold() {
assert_eq!(actual_fold_lores_count, stop_request_id);
}
#[test]
fn recursive_stream_fold_with_n_service_call() {
let vm_peer_id = "vm_peer_id_1";
let request_id = std::cell::Cell::new(0);
let stop_request_id = 10;
let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
let uncelled_request_id = request_id.get();
let result = if uncelled_request_id >= stop_request_id {
CallServiceResult::ok(json!("no"))
} else {
CallServiceResult::ok(json!("yes"))
};
request_id.set(uncelled_request_id + 1);
result
});
let mut vm = create_avm(give_n_results_and_then_stop, vm_peer_id);
let script = f!(r#"
(xor
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(new $loop
(new $result
(seq
(seq
(ap "yes" $loop)
(fold $loop l
(seq
(seq
(xor
(match l "yes"
(xor
(call %init_peer_id% ("yesno" "get") [] $loop)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
)
(null)
)
(ap "success" $result)
)
(next l)
)
)
)
(call %init_peer_id% ("op" "identity") [$result] result-fix)
)
)
)
)
(xor
(call %init_peer_id% ("callbackSrv" "response") [result-fix])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
"#);
let result = checked_call_vm!(vm, vm_peer_id, &script, "", "");
let actual_trace = trace_from_result(&result);
let actual_fold_state = match &actual_trace[2] {
ExecutedState::Fold(fold_result) => fold_result,
_ => panic!("2nd state should be fold"),
};
let expected_fold_lores = stop_request_id + 1;
assert_eq!(actual_fold_state.lore.len(), expected_fold_lores);
}