feat(aquavm): allow to set last instruction of fold (#349)

This commit is contained in:
Mike Voronov 2022-10-01 23:41:30 +03:00 committed by GitHub
parent dd0c458339
commit ccbd3262a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1229 additions and 582 deletions

View File

@ -25,6 +25,7 @@ pub(crate) struct FoldState<'i> {
// true of iterator exhausted and reverse execution started
pub(crate) back_iteration_started: bool,
pub(crate) instr_head: Rc<Instruction<'i>>,
pub(crate) last_instr_head: Option<Rc<Instruction<'i>>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@ -38,12 +39,14 @@ impl<'i> FoldState<'i> {
iterable: IterableValue,
iterable_type: IterableType,
instr_head: Rc<Instruction<'i>>,
last_instr_head: Option<Rc<Instruction<'i>>>,
) -> Self {
Self {
iterable,
iterable_type,
back_iteration_started: false,
instr_head,
last_instr_head,
}
}
}

View File

@ -52,6 +52,7 @@ impl<'i> ExecutableInstruction<'i> for FoldScalar<'i> {
IterableType::Scalar,
self.iterator.name,
self.instruction.clone(),
self.last_instruction.clone(),
exec_ctx,
trace_ctx,
),
@ -64,10 +65,11 @@ pub(super) fn fold<'i>(
iterable_type: IterableType,
iterator: &'i str,
instruction: Rc<Instruction<'i>>,
last_instruction: Option<Rc<Instruction<'i>>>,
exec_ctx: &mut ExecutionCtx<'i>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
let fold_state = FoldState::from_iterable(iterable, iterable_type, instruction.clone());
let fold_state = FoldState::from_iterable(iterable, iterable_type, instruction.clone(), last_instruction);
exec_ctx.scalars.meet_fold_start();
exec_ctx.scalars.set_iterable_value(iterator, fold_state)?;

View File

@ -102,6 +102,7 @@ fn execute_iterations<'i>(
IterableType::Stream(fold_id),
fold_stream.iterator.name,
fold_stream.instruction.clone(),
fold_stream.last_instruction.clone(),
exec_ctx,
trace_ctx,
);

View File

@ -33,13 +33,25 @@ impl<'i> super::ExecutableInstruction<'i> for Next<'i> {
let fold_state = exec_ctx.scalars.get_iterable_mut(iterator_name)?;
maybe_meet_iteration_end(self, fold_state, trace_ctx)?;
// TODO: refactor a body of this if to reduce LOCs count and improve readability
if !fold_state.iterable.next() {
maybe_meet_back_iterator(self, fold_state, trace_ctx)?;
if !fold_state.back_iteration_started && matches!(fold_state.iterable_type, IterableType::Stream(_)) {
// this set the last iteration of a next to not executed for fold over streams
// for more info see https://github.com/fluencelabs/aquavm/issues/333
exec_ctx.subgraph_complete = false;
fold_state.back_iteration_started = true;
let fold_state = exec_ctx.scalars.get_iterable(iterator_name)?;
// execute last instruction if any
if let Some(last_instr) = &fold_state.last_instr_head {
let last_instr = last_instr.clone();
exec_ctx.subgraph_complete = true; // it's needed because of determine_subgraph_complete in par
last_instr.execute(exec_ctx, trace_ctx)?;
} else {
// if no last instruction, execute never as a fallback for fold over stream (it'll be removed in future)
let fold_state = exec_ctx.scalars.get_iterable_mut(iterator_name)?;
if !fold_state.back_iteration_started && matches!(fold_state.iterable_type, IterableType::Stream(_)) {
// this set the last iteration of a next to not executed for fold over streams
// for more info see https://github.com/fluencelabs/aquavm/issues/333
exec_ctx.subgraph_complete = false;
fold_state.back_iteration_started = true;
}
}
// just do nothing to exit

View File

@ -136,6 +136,12 @@ impl<'i> Scalars<'i> {
self.non_iterable_variables.get_value(name)
}
pub(crate) fn get_iterable(&mut self, name: &str) -> ExecutionResult<&FoldState<'i>> {
self.iterable_variables
.get(name)
.ok_or_else(|| UncatchableError::FoldStateNotFound(name.to_string()).into())
}
pub(crate) fn get_iterable_mut(&mut self, name: &str) -> ExecutionResult<&mut FoldState<'i>> {
self.iterable_variables
.get_mut(name)

View File

@ -445,7 +445,7 @@ fn fold_waits_on_empty_stream() {
}
#[test]
fn fold_seq_next_never_completes() {
fn fold_stream_seq_next_never_completes() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id);
@ -475,6 +475,199 @@ fn fold_seq_next_never_completes() {
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_stream_seq_next_never_completes_with_never() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id);
let script = f!(r#"
(seq
(call "{vm_peer_id}" ("" "") [] $stream)
(seq
(fold $stream iterator
(seq
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
(next iterator)
)
(never)
)
(call "{vm_peer_id}" ("" "") [])
)
)
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![
executed_state::stream_number(1, 0),
executed_state::fold(vec![subtrace_lore(
0,
SubTraceDesc::new(2.into(), 1),
SubTraceDesc::new(3.into(), 0),
)]),
executed_state::stream_number(1, 0),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_stream_seq_next_completes_with_null() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id);
let script = f!(r#"
(seq
(call "{vm_peer_id}" ("" "") [] $stream)
(seq
(fold $stream iterator
(seq
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
(next iterator)
)
(null)
)
(call "{vm_peer_id}" ("" "") [])
)
)
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![
executed_state::stream_number(1, 0),
executed_state::fold(vec![subtrace_lore(
0,
SubTraceDesc::new(2.into(), 1),
SubTraceDesc::new(3.into(), 0),
)]),
executed_state::stream_number(1, 0),
executed_state::scalar_number(1),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_scalar_seq_next_completes_with_null() {
let vm_peer_id = "vm_peer_id";
let service_result = json!([1, 2]);
let mut vm = create_avm(set_variable_call_service(service_result.clone()), vm_peer_id);
let script = f!(r#"
(seq
(call "{vm_peer_id}" ("" "") [] iterable)
(seq
(fold iterable iterator
(par
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
(next iterator)
)
(null)
)
(seq
(canon "{vm_peer_id}" $new_stream #canon_stream)
(call "{vm_peer_id}" ("" "") [#canon_stream])
)
)
)
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![
executed_state::scalar(service_result.clone()),
executed_state::par(1, 2),
executed_state::stream(service_result.clone(), 0),
executed_state::par(1, 0),
executed_state::stream(service_result.clone(), 0),
executed_state::canon(vec![]),
executed_state::scalar(service_result),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_scalar_seq_next_not_completes_with_never() {
let vm_peer_id = "vm_peer_id";
let service_result = json!([1, 2]);
let mut vm = create_avm(set_variable_call_service(service_result.clone()), vm_peer_id);
let script = f!(r#"
(seq
(call "{vm_peer_id}" ("" "") [] iterable)
(seq
(fold iterable iterator
(par
(call "unknwon_peer_id" ("" "") [iterator] $new_stream)
(next iterator)
)
(never)
)
(seq
(canon "{vm_peer_id}" $new_stream #canon_stream)
(call "{vm_peer_id}" ("" "") [#canon_stream])
)
)
)
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![
executed_state::scalar(service_result.clone()),
executed_state::par(1, 2),
executed_state::request_sent_by(vm_peer_id),
executed_state::par(1, 0),
executed_state::request_sent_by(vm_peer_id),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_stream_seq_next_saves_call_result() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(echo_call_service(), vm_peer_id);
let script = f!(r#"
(seq
(seq
(ap 1 $stream)
(ap 2 $stream)
)
(seq
(fold $stream iterator
(seq
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
(next iterator)
)
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
)
(call "{vm_peer_id}" ("" "") [0])
)
)
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::fold(vec![
subtrace_lore(0, SubTraceDesc::new(3.into(), 1), SubTraceDesc::new(6.into(), 0)),
subtrace_lore(1, SubTraceDesc::new(4.into(), 1), SubTraceDesc::new(5.into(), 1)),
]),
executed_state::stream_number(1, 0),
executed_state::stream_number(2, 1),
executed_state::stream_number(2, 2),
executed_state::scalar_number(0),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_par_next_completes() {
let vm_1_peer_id = "vm_1_peer_id";

View File

@ -119,6 +119,8 @@ pub struct FoldScalar<'i> {
#[serde(borrow)]
pub iterator: Scalar<'i>,
pub instruction: Rc<Instruction<'i>>,
// option is needed to provide a graceful period of adoption
pub last_instruction: Option<Rc<Instruction<'i>>>,
pub span: Span,
}
@ -129,6 +131,8 @@ pub struct FoldStream<'i> {
#[serde(borrow)]
pub iterator: Scalar<'i>,
pub instruction: Rc<Instruction<'i>>,
// option is needed to provide a graceful period of adoption
pub last_instruction: Option<Rc<Instruction<'i>>>,
pub span: Span,
}

View File

@ -110,12 +110,14 @@ impl<'i> FoldScalar<'i> {
iterable: FoldScalarIterable<'i>,
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Self {
Self {
iterable,
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
}
}
@ -126,12 +128,14 @@ impl<'i> FoldStream<'i> {
iterable: Stream<'i>,
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Self {
Self {
iterable,
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
}
}

View File

@ -62,21 +62,21 @@ Instr: Box<Instruction<'input>> = {
Box::new(Instruction::Fail(fail_body))
},
<left: @L> "(" fold <iterable:FoldScalarIterable> <iterator:Scalar> <i:Instr> ")" <right: @R> => {
<left: @L> "(" fold <iterable:FoldScalarIterable> <iterator:Scalar> <instruction:Instr> <last_instruction:Instr?>")" <right: @R> => {
let iterator = Scalar::new(iterator.0, iterator.1);
let span = Span::new(left, right);
let fold = FoldScalar::new(iterable, iterator, *i, span);
let fold = FoldScalar::new(iterable, iterator, *instruction, last_instruction.map(|v| *v), span);
validator.met_fold_scalar(&fold, span);
Box::new(Instruction::FoldScalar(fold))
},
<left: @L> "(" fold <stream:Stream> <iterator:Scalar> <i:Instr> ")" <right: @R> => {
<left: @L> "(" fold <stream:Stream> <iterator:Scalar> <instruction:Instr> <last_instruction:Instr?> ")" <right: @R> => {
let iterable = Stream::new(stream.0, stream.1);
let iterator = Scalar::new(iterator.0, iterator.1);
let span = Span::new(left, right);
let fold = FoldStream::new(iterable, iterator, *i, span);
let fold = FoldStream::new(iterable, iterator, *instruction, last_instruction.map(|v| *v), span);
validator.meet_fold_stream(&fold, span);

File diff suppressed because it is too large Load Diff

View File

@ -92,12 +92,14 @@ pub(super) fn fold_scalar_variable<'i>(
scalar: ScalarWithLambda<'i>,
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Instruction<'i> {
Instruction::FoldScalar(FoldScalar {
iterable: FoldScalarIterable::Scalar(scalar),
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
})
}
@ -106,12 +108,14 @@ pub(super) fn fold_scalar_canon_stream<'i>(
canon_stream: CanonStream<'i>,
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Instruction<'i> {
Instruction::FoldScalar(FoldScalar {
iterable: FoldScalarIterable::CanonStream(canon_stream),
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
})
}
@ -119,12 +123,14 @@ pub(super) fn fold_scalar_canon_stream<'i>(
pub(super) fn fold_scalar_empty_array<'i>(
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Instruction<'i> {
Instruction::FoldScalar(FoldScalar {
iterable: FoldScalarIterable::EmptyArray,
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
})
}
@ -133,12 +139,14 @@ pub(super) fn fold_stream<'i>(
iterable: Stream<'i>,
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Instruction<'i> {
Instruction::FoldStream(FoldStream {
iterable,
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
})
}

View File

@ -275,11 +275,31 @@ fn parse_fold() {
ScalarWithLambda::new("iterable", None, 15.into()),
Scalar::new("i", 24.into()),
null(),
None,
Span::new(9.into(), 54.into()),
);
assert_eq!(instruction, expected);
}
#[test]
fn fold_with_scalar_and_last_instruction() {
let source_code = r#"
(fold iterable i
(null)
(null)
)
"#;
let instruction = parse(&source_code);
let expected = fold_scalar_variable(
ScalarWithLambda::new("iterable", None, 15.into()),
Scalar::new("i", 24.into()),
null(),
Some(null()),
Span::new(9.into(), 73.into()),
);
assert_eq!(instruction, expected);
}
#[test]
fn fold_json_path() {
let source_code = r#"
@ -297,6 +317,7 @@ fn fold_json_path() {
),
Scalar::new("m", 52.into()),
null(),
None,
Span::new(27.into(), 61.into()),
);
assert_eq!(instruction, expected);
@ -314,6 +335,7 @@ fn fold_empty_array_iterable() {
let expected = fold_scalar_empty_array(
Scalar::new("m", 18.into()),
null(),
None,
Span::new(9.into(), 48.into()),
);
assert_eq!(instruction, expected);
@ -330,11 +352,32 @@ fn fold_on_stream() {
Stream::new("$stream", 15.into()),
Scalar::new("iterator", 23.into()),
null(),
None,
Span::new(9.into(), 39.into()),
);
assert_eq!(instruction, expected);
}
#[test]
fn fold_on_stream_with_last_null() {
let source_code = r#"
(fold $stream iterator
(null)
(null)
)
"#;
let instruction = parse(source_code);
let expected = fold_stream(
Stream::new("$stream", 15.into()),
Scalar::new("iterator", 23.into()),
null(),
Some(null()),
Span::new(9.into(), 79.into()),
);
assert_eq!(instruction, expected);
}
#[test]
fn fold_on_canon_stream() {
let canon_stream = "#canon_stream";
@ -348,6 +391,7 @@ fn fold_on_canon_stream() {
CanonStream::new(canon_stream, 15.into()),
Scalar::new(iterator, 29.into()),
null(),
None,
Span::new(9.into(), 45.into()),
);
assert_eq!(instruction, expected);
@ -374,6 +418,7 @@ fn comments() {
),
Scalar::new("m", 52.into()),
null(),
None,
Span::new(27.into(), 61.into()),
);
assert_eq!(instruction, expected);
@ -394,6 +439,7 @@ fn parse_fold_with_xor_par_seq() {
ScalarWithLambda::new("iterable", None, 6.into()),
Scalar::new("i", 15.into()),
instr(null(), null()),
None,
Span::new(0.into(), 58.into()),
);
assert_eq!(instruction, expected);