mirror of
https://github.com/fluencelabs/aquavm
synced 2025-03-15 12:30:50 +00:00
Cleanup fold iterable while exception (#185)
This commit is contained in:
parent
c5c9aefa37
commit
fbbe28c5b2
@ -31,10 +31,10 @@ impl<'i> ExecutableInstruction<'i> for FoldScalar<'i> {
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
log_instruction!(fold, exec_ctx, trace_ctx);
|
||||
|
||||
exec_ctx.scalars.meet_fold_start();
|
||||
|
||||
let scalar_iterable = joinable!(construct_scalar_iterable_value(&self.iterable, exec_ctx), exec_ctx)?;
|
||||
let fold_result = match scalar_iterable {
|
||||
|
||||
match scalar_iterable {
|
||||
// just exit on empty iterable
|
||||
FoldIterableScalar::Empty => Ok(()),
|
||||
FoldIterableScalar::Scalar(iterable) => fold(
|
||||
iterable,
|
||||
@ -44,11 +44,7 @@ impl<'i> ExecutableInstruction<'i> for FoldScalar<'i> {
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
),
|
||||
};
|
||||
|
||||
exec_ctx.scalars.meet_fold_end();
|
||||
|
||||
fold_result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,11 +57,13 @@ pub(super) fn fold<'i>(
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
let fold_state = FoldState::from_iterable(iterable, iterable_type, instruction.clone());
|
||||
exec_ctx.scalars.meet_fold_start();
|
||||
exec_ctx.scalars.set_iterable_value(iterator, fold_state)?;
|
||||
|
||||
instruction.execute(exec_ctx, trace_ctx)?;
|
||||
let result = instruction.execute(exec_ctx, trace_ctx);
|
||||
|
||||
exec_ctx.scalars.remove_iterable_value(iterator);
|
||||
exec_ctx.scalars.meet_fold_end();
|
||||
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ use air_parser::ast::FoldStream;
|
||||
impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
log_instruction!(fold, exec_ctx, trace_ctx);
|
||||
exec_ctx.tracker.meet_fold_stream();
|
||||
|
||||
let stream_iterable = joinable!(construct_stream_iterable_value(&self.iterable, exec_ctx), exec_ctx)?;
|
||||
let iterables = match stream_iterable {
|
||||
@ -38,37 +39,49 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
||||
};
|
||||
|
||||
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
|
||||
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id))?;
|
||||
exec_ctx.scalars.meet_fold_start();
|
||||
|
||||
for iterable in iterables {
|
||||
let value = match iterable.peek() {
|
||||
Some(value) => value,
|
||||
// it's ok, because some generation level of a stream on some point inside execution
|
||||
// flow could contain zero values
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let value_pos = value.pos();
|
||||
trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos))?;
|
||||
fold(
|
||||
iterable,
|
||||
IterableType::Stream(fold_id),
|
||||
self.iterator.name,
|
||||
self.instruction.clone(),
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
)?;
|
||||
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id))?;
|
||||
|
||||
if !exec_ctx.subtree_complete {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let result = execute_iterations(iterables, self, fold_id, exec_ctx, trace_ctx);
|
||||
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id))?;
|
||||
exec_ctx.scalars.meet_fold_end();
|
||||
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_iterations<'i>(
|
||||
iterables: Vec<IterableValue>,
|
||||
fold_stream: &FoldStream<'i>,
|
||||
fold_id: u32,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
for iterable in iterables {
|
||||
let value = match iterable.peek() {
|
||||
Some(value) => value,
|
||||
// it's ok, because some generation level of a stream on some point inside execution
|
||||
// flow could contain zero values
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let value_pos = value.pos();
|
||||
trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos))?;
|
||||
let result = fold(
|
||||
iterable,
|
||||
IterableType::Stream(fold_id),
|
||||
fold_stream.iterator.name,
|
||||
fold_stream.instruction.clone(),
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
);
|
||||
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id))?;
|
||||
|
||||
result?;
|
||||
if !exec_ctx.subtree_complete {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -42,6 +42,8 @@ use crate::execution_step::TraceHandler;
|
||||
|
||||
use air_parser::ast::Instruction;
|
||||
|
||||
// TODO: move all error set logic from macros into the execution context
|
||||
|
||||
/// Executes instruction and updates last error if needed.
|
||||
macro_rules! execute {
|
||||
($self:expr, $instr:expr, $exec_ctx:ident, $trace_ctx:ident) => {
|
||||
@ -62,31 +64,6 @@ macro_rules! execute {
|
||||
};
|
||||
}
|
||||
|
||||
/// Executes fold instruction, updates last error if needed, and call error_exit of TraceHandler.
|
||||
macro_rules! execute_fold {
|
||||
($self:expr, $instr:expr, $exec_ctx:ident, $trace_ctx:ident) => {{
|
||||
$exec_ctx.tracker.meet_fold_stream();
|
||||
let fold_id = $exec_ctx.tracker.fold.seen_stream_count;
|
||||
|
||||
match $instr.execute($exec_ctx, $trace_ctx) {
|
||||
Err(e) => {
|
||||
$trace_ctx.fold_end_with_error(fold_id);
|
||||
|
||||
if !$exec_ctx.last_error_could_be_set {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let instruction = format!("{}", $self);
|
||||
let last_error =
|
||||
LastErrorDescriptor::new(e.clone(), instruction, $exec_ctx.current_peer_id.to_string(), None);
|
||||
$exec_ctx.last_error = Some(last_error);
|
||||
Err(e)
|
||||
}
|
||||
v => v,
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
/// Executes match/mismatch instructions and updates last error if error type wasn't
|
||||
/// MatchWithoutXorError or MismatchWithoutXorError.
|
||||
macro_rules! execute_match_mismatch {
|
||||
@ -126,7 +103,7 @@ impl<'i> ExecutableInstruction<'i> for Instruction<'i> {
|
||||
|
||||
Instruction::Ap(ap) => execute!(self, ap, exec_ctx, trace_ctx),
|
||||
Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx),
|
||||
Instruction::FoldStream(fold) => execute_fold!(self, fold, exec_ctx, trace_ctx),
|
||||
Instruction::FoldStream(fold) => execute!(self, fold, exec_ctx, trace_ctx),
|
||||
Instruction::New(new) => execute!(self, new, exec_ctx, trace_ctx),
|
||||
Instruction::Next(next) => execute!(self, next, exec_ctx, trace_ctx),
|
||||
Instruction::Null(null) => execute!(self, null, exec_ctx, trace_ctx),
|
||||
|
@ -69,6 +69,6 @@
|
||||
)
|
||||
(call "{4}" ("" "") [%last_error%])
|
||||
)
|
||||
(call "{5}" ("" "") [])
|
||||
(call "{5}" ("" "") ["last_peer"])
|
||||
)
|
||||
)
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
use air_test_utils::prelude::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn par_early_exit() {
|
||||
@ -170,8 +171,8 @@ fn fold_early_exit() {
|
||||
let mut stream_setter = create_avm(echo_call_service(), stream_setter_id);
|
||||
let mut fold_executor = create_avm(unit_call_service(), fold_executor_id);
|
||||
let mut error_trigger = create_avm(fallible_call_service("error"), error_trigger_id);
|
||||
let mut last_error_receiver = create_avm(unit_call_service(), last_error_receiver_id);
|
||||
let mut last_peer_checker = create_avm(unit_call_service(), last_peer_checker_id);
|
||||
let mut last_error_receiver = create_avm(echo_call_service(), last_error_receiver_id);
|
||||
let mut last_peer_checker = create_avm(echo_call_service(), last_peer_checker_id);
|
||||
|
||||
let script = format!(
|
||||
include_str!("scripts/fold_early_exit.clj"),
|
||||
@ -234,8 +235,10 @@ fn fold_early_exit() {
|
||||
executed_state::scalar_string(unit_call_service_result),
|
||||
executed_state::scalar_string(unit_call_service_result),
|
||||
executed_state::service_failed(1, "failed result from fallible_call_service"),
|
||||
executed_state::scalar_string(unit_call_service_result),
|
||||
executed_state::scalar_string(unit_call_service_result),
|
||||
executed_state::scalar(
|
||||
json!({"instruction" : r#"call "error_trigger_id" ("error" "") [] "#, "msg": r#"Local service error, ret_code is 1, error message is '"failed result from fallible_call_service"'"#, "peer_id": "error_trigger_id"}),
|
||||
),
|
||||
executed_state::scalar_string("last_peer"),
|
||||
];
|
||||
|
||||
assert_eq!(actual_trace, expected_trace);
|
||||
|
@ -154,13 +154,4 @@ impl TraceHandler {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fold_end_with_error(&mut self, fold_id: u32) {
|
||||
let fold_fsm = match self.fsm_keeper.extract_fold(fold_id) {
|
||||
Ok(fold_fsm) => fold_fsm,
|
||||
// just passing here is ok, because error could be produced while fold initialization
|
||||
Err(_) => return,
|
||||
};
|
||||
fold_fsm.fold_end_with_error(&mut self.data_keeper);
|
||||
}
|
||||
}
|
||||
|
@ -140,11 +140,6 @@ impl FoldFSM {
|
||||
self.state_inserter.insert(data_keeper, state);
|
||||
self.state_handler.set_final_states(data_keeper);
|
||||
}
|
||||
|
||||
pub(crate) fn fold_end_with_error(mut self, data_keeper: &mut DataKeeper) {
|
||||
self.meet_generation_end(data_keeper);
|
||||
self.meet_fold_end(data_keeper);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
|
Loading…
x
Reference in New Issue
Block a user