mirror of
https://github.com/fluencelabs/aquavm
synced 2025-03-15 20:40:50 +00:00
Introduce recursive streams (#225)
This commit is contained in:
parent
ac050abc8a
commit
cc54e4c383
@ -22,6 +22,7 @@ use crate::SecurityTetraplet;
|
|||||||
|
|
||||||
use air_parser::ast;
|
use air_parser::ast;
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
@ -34,11 +35,6 @@ pub(crate) enum FoldIterableScalar {
|
|||||||
Scalar(IterableValue),
|
Scalar(IterableValue),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) enum FoldIterableStream {
|
|
||||||
Empty,
|
|
||||||
Stream(Vec<IterableValue>),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Constructs iterable value for given scalar iterable.
|
/// Constructs iterable value for given scalar iterable.
|
||||||
pub(crate) fn construct_scalar_iterable_value<'ctx>(
|
pub(crate) fn construct_scalar_iterable_value<'ctx>(
|
||||||
iterable: &ast::ScalarWithLambda<'ctx>,
|
iterable: &ast::ScalarWithLambda<'ctx>,
|
||||||
@ -51,36 +47,26 @@ pub(crate) fn construct_scalar_iterable_value<'ctx>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Constructs iterable value for given stream iterable.
|
/// Constructs iterable value for given stream iterable.
|
||||||
pub(crate) fn construct_stream_iterable_value<'ctx>(
|
pub(crate) fn construct_stream_iterable_values(
|
||||||
stream: &ast::Stream<'_>,
|
stream: &RefCell<Stream>,
|
||||||
exec_ctx: &ExecutionCtx<'ctx>,
|
start: Generation,
|
||||||
) -> ExecutionResult<FoldIterableStream> {
|
end: Generation,
|
||||||
match exec_ctx.streams.get(stream.name, stream.position) {
|
) -> Vec<IterableValue> {
|
||||||
Some(stream) => {
|
let stream = stream.borrow();
|
||||||
let stream = stream.borrow();
|
let stream_iter = match stream.slice_iter(start, end) {
|
||||||
if stream.is_empty() {
|
Some(stream_iter) => stream_iter,
|
||||||
return Ok(FoldIterableStream::Empty);
|
None => return vec![],
|
||||||
}
|
};
|
||||||
|
|
||||||
let mut iterables = Vec::with_capacity(stream.generations_count());
|
stream_iter
|
||||||
|
.filter(|iterable| !iterable.is_empty())
|
||||||
for iterable in stream.slice_iter(Generation::Last).unwrap() {
|
.map(|iterable| {
|
||||||
if iterable.is_empty() {
|
let call_results = iterable.to_vec();
|
||||||
continue;
|
let foldable = IterableVecResolvedCall::init(call_results);
|
||||||
}
|
let foldable: IterableValue = Box::new(foldable);
|
||||||
|
foldable
|
||||||
let call_results = iterable.to_vec();
|
})
|
||||||
let foldable = IterableVecResolvedCall::init(call_results);
|
.collect::<Vec<_>>()
|
||||||
let foldable: IterableValue = Box::new(foldable);
|
|
||||||
iterables.push(foldable);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(FoldIterableStream::Stream(iterables))
|
|
||||||
}
|
|
||||||
// it's possible to met streams without variables at the moment in fold,
|
|
||||||
// they should be treated as empty.
|
|
||||||
None => Ok(FoldIterableStream::Empty),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_scalar_iterable<'ctx>(
|
fn create_scalar_iterable<'ctx>(
|
||||||
|
@ -14,39 +14,61 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
mod stream_cursor;
|
||||||
|
|
||||||
use super::fold::*;
|
use super::fold::*;
|
||||||
use super::fold_scalar::fold;
|
use super::fold_scalar::fold;
|
||||||
use super::ExecutableInstruction;
|
use super::ExecutableInstruction;
|
||||||
use super::ExecutionCtx;
|
use super::ExecutionCtx;
|
||||||
use super::ExecutionResult;
|
use super::ExecutionResult;
|
||||||
use super::TraceHandler;
|
use super::TraceHandler;
|
||||||
use crate::execution_step::Joinable;
|
use crate::execution_step::boxed_value::Stream;
|
||||||
use crate::joinable;
|
|
||||||
use crate::log_instruction;
|
use crate::log_instruction;
|
||||||
use crate::trace_to_exec_err;
|
use crate::trace_to_exec_err;
|
||||||
|
use air_parser::ast;
|
||||||
|
use stream_cursor::StreamCursor;
|
||||||
|
|
||||||
use air_parser::ast::FoldStream;
|
use air_parser::ast::FoldStream;
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
|
|
||||||
impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
||||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||||
log_instruction!(fold, exec_ctx, trace_ctx);
|
log_instruction!(fold, exec_ctx, trace_ctx);
|
||||||
exec_ctx.tracker.meet_fold_stream();
|
exec_ctx.tracker.meet_fold_stream();
|
||||||
|
|
||||||
let stream_iterable = joinable!(construct_stream_iterable_value(&self.iterable, exec_ctx), exec_ctx)?;
|
let iterable = &self.iterable;
|
||||||
let iterables = match stream_iterable {
|
let stream = match exec_ctx.streams.get(iterable.name, iterable.position) {
|
||||||
FoldIterableStream::Empty => return Ok(()),
|
Some(stream) => stream,
|
||||||
FoldIterableStream::Stream(iterables) => iterables,
|
// it's possible to met streams without variables at the moment in fold, they are treated as empty
|
||||||
|
None => return Ok(()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
|
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
|
||||||
|
|
||||||
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), self)?;
|
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), self)?;
|
||||||
|
|
||||||
let result = execute_iterations(iterables, self, fold_id, exec_ctx, trace_ctx);
|
let mut stream_cursor = StreamCursor::new();
|
||||||
|
let mut stream_iterable = stream_cursor.construct_iterables(stream);
|
||||||
|
|
||||||
|
let mut result = Ok(true);
|
||||||
|
while !stream_iterable.is_empty() {
|
||||||
|
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
|
||||||
|
// write operation to this stream to write to this new generation
|
||||||
|
add_new_generation_if_non_empty(&self.iterable, exec_ctx);
|
||||||
|
result = execute_iterations(stream_iterable, self, fold_id, exec_ctx, trace_ctx);
|
||||||
|
|
||||||
|
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
|
||||||
|
// and likely that stream could be mutably borrowed in execute_iterations
|
||||||
|
let stream = remove_new_generation_if_non_empty(&self.iterable, exec_ctx);
|
||||||
|
if should_stop_iteration(&result) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream_iterable = stream_cursor.construct_iterables(stream)
|
||||||
|
}
|
||||||
|
|
||||||
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?;
|
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?;
|
||||||
|
result.map(|_| ())
|
||||||
result
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,7 +78,7 @@ fn execute_iterations<'i>(
|
|||||||
fold_id: u32,
|
fold_id: u32,
|
||||||
exec_ctx: &mut ExecutionCtx<'i>,
|
exec_ctx: &mut ExecutionCtx<'i>,
|
||||||
trace_ctx: &mut TraceHandler,
|
trace_ctx: &mut TraceHandler,
|
||||||
) -> ExecutionResult<()> {
|
) -> ExecutionResult<bool> {
|
||||||
for iterable in iterables {
|
for iterable in iterables {
|
||||||
let value = match iterable.peek() {
|
let value = match iterable.peek() {
|
||||||
Some(value) => value,
|
Some(value) => value,
|
||||||
@ -83,5 +105,29 @@ fn execute_iterations<'i>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(exec_ctx.subtree_complete)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_stop_iteration(iteration_result: &ExecutionResult<bool>) -> bool {
|
||||||
|
match &iteration_result {
|
||||||
|
Ok(result) if !result => true,
|
||||||
|
Ok(_) => false,
|
||||||
|
Err(_) => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Safety: this function should be called iff stream is present in context
|
||||||
|
fn add_new_generation_if_non_empty(stream: &ast::Stream<'_>, exec_ctx: &mut ExecutionCtx<'_>) {
|
||||||
|
let stream = exec_ctx.streams.get(stream.name, stream.position).unwrap();
|
||||||
|
stream.borrow_mut().add_new_generation_if_non_empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Safety: this function should be called iff stream is present in context
|
||||||
|
fn remove_new_generation_if_non_empty<'ctx>(
|
||||||
|
stream: &ast::Stream<'_>,
|
||||||
|
exec_ctx: &'ctx mut ExecutionCtx<'_>,
|
||||||
|
) -> &'ctx RefCell<Stream> {
|
||||||
|
let stream = exec_ctx.streams.get(stream.name, stream.position).unwrap();
|
||||||
|
stream.borrow_mut().remove_last_generation_if_empty();
|
||||||
|
stream
|
||||||
}
|
}
|
||||||
|
42
air/src/execution_step/air/fold_stream/stream_cursor.rs
Normal file
42
air/src/execution_step/air/fold_stream/stream_cursor.rs
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Fluence Labs Limited
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use super::construct_stream_iterable_values;
|
||||||
|
use crate::execution_step::air::fold::IterableValue;
|
||||||
|
use crate::execution_step::boxed_value::Generation;
|
||||||
|
use crate::execution_step::boxed_value::Stream;
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
|
|
||||||
|
pub(super) struct StreamCursor {
|
||||||
|
last_seen_generation: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StreamCursor {
|
||||||
|
pub(super) fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
last_seen_generation: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn construct_iterables(&mut self, stream: &RefCell<Stream>) -> Vec<IterableValue> {
|
||||||
|
let iterables =
|
||||||
|
construct_stream_iterable_values(stream, Generation::Nth(self.last_seen_generation), Generation::Last);
|
||||||
|
self.last_seen_generation = stream.borrow().non_empty_generations_count() as u32;
|
||||||
|
|
||||||
|
iterables
|
||||||
|
}
|
||||||
|
}
|
@ -67,6 +67,37 @@ impl Stream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn non_empty_generations_count(&self) -> usize {
|
||||||
|
self.0.iter().filter(|gen| !gen.is_empty()).count()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a new empty generation if the latest isn't empty.
|
||||||
|
pub(crate) fn add_new_generation_if_non_empty(&mut self) -> bool {
|
||||||
|
let should_add_generation = match self.0.last() {
|
||||||
|
Some(last) => !last.is_empty(),
|
||||||
|
None => true,
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_add_generation {
|
||||||
|
self.0.push(vec![]);
|
||||||
|
}
|
||||||
|
should_add_generation
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a last generation if it's empty.
|
||||||
|
pub(crate) fn remove_last_generation_if_empty(&mut self) -> bool {
|
||||||
|
let should_remove_generation = match self.0.last() {
|
||||||
|
Some(last) => last.is_empty(),
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_remove_generation {
|
||||||
|
self.0.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
should_remove_generation
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn elements_count(&self, generation: Generation) -> Option<usize> {
|
pub(crate) fn elements_count(&self, generation: Generation) -> Option<usize> {
|
||||||
match generation {
|
match generation {
|
||||||
Generation::Nth(generation) if generation as usize > self.generations_count() => None,
|
Generation::Nth(generation) if generation as usize > self.generations_count() => None,
|
||||||
@ -106,18 +137,26 @@ impl Stream {
|
|||||||
Some(iter)
|
Some(iter)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn slice_iter(&self, generation: Generation) -> Option<StreamSliceIter<'_>> {
|
pub(crate) fn slice_iter(&self, start: Generation, end: Generation) -> Option<StreamSliceIter<'_>> {
|
||||||
let iter: Box<dyn Iterator<Item = &[ValueAggregate]>> = match generation {
|
if self.is_empty() {
|
||||||
Generation::Nth(generation) if generation as usize >= self.generations_count() => return None,
|
return None;
|
||||||
Generation::Nth(generation) => Box::new(self.0.iter().take(generation as usize + 1).map(|v| v.as_slice())),
|
}
|
||||||
Generation::Last => Box::new(self.0.iter().map(|v| v.as_slice())),
|
|
||||||
|
let generations_count = self.generations_count() as u32 - 1;
|
||||||
|
let (start, end) = match (start, end) {
|
||||||
|
(Generation::Nth(start), Generation::Nth(end)) => (start, end),
|
||||||
|
(Generation::Nth(start), Generation::Last) => (start, generations_count),
|
||||||
|
(Generation::Last, Generation::Nth(end)) => (generations_count, end),
|
||||||
|
(Generation::Last, Generation::Last) => (generations_count, generations_count),
|
||||||
};
|
};
|
||||||
|
|
||||||
let len = match generation {
|
if start > end || end > generations_count {
|
||||||
Generation::Nth(generation) => generation as usize,
|
return None;
|
||||||
Generation::Last => self.0.len(),
|
}
|
||||||
};
|
|
||||||
|
|
||||||
|
let len = (end - start) as usize + 1;
|
||||||
|
let iter: Box<dyn Iterator<Item = &[ValueAggregate]>> =
|
||||||
|
Box::new(self.0.iter().skip(start as usize).take(len).map(|v| v.as_slice()));
|
||||||
let iter = StreamSliceIter { iter, len };
|
let iter = StreamSliceIter { iter, len };
|
||||||
|
|
||||||
Some(iter)
|
Some(iter)
|
||||||
@ -163,7 +202,7 @@ impl<'result> ExactSizeIterator for StreamIter<'result> {}
|
|||||||
|
|
||||||
pub(crate) struct StreamSliceIter<'slice> {
|
pub(crate) struct StreamSliceIter<'slice> {
|
||||||
iter: Box<dyn Iterator<Item = &'slice [ValueAggregate]> + 'slice>,
|
iter: Box<dyn Iterator<Item = &'slice [ValueAggregate]> + 'slice>,
|
||||||
len: usize,
|
pub len: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'slice> Iterator for StreamSliceIter<'slice> {
|
impl<'slice> Iterator for StreamSliceIter<'slice> {
|
||||||
@ -189,7 +228,7 @@ impl fmt::Display for Stream {
|
|||||||
return write!(f, "[]");
|
return write!(f, "[]");
|
||||||
}
|
}
|
||||||
|
|
||||||
write!(f, "[ ")?;
|
writeln!(f, "[")?;
|
||||||
for (id, generation) in self.0.iter().enumerate() {
|
for (id, generation) in self.0.iter().enumerate() {
|
||||||
write!(f, " -- {}: ", id)?;
|
write!(f, " -- {}: ", id)?;
|
||||||
for value in generation.iter() {
|
for value in generation.iter() {
|
||||||
@ -201,3 +240,53 @@ impl fmt::Display for Stream {
|
|||||||
write!(f, "]")
|
write!(f, "]")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::Generation;
|
||||||
|
use super::Stream;
|
||||||
|
use super::ValueAggregate;
|
||||||
|
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_slice_iter() {
|
||||||
|
let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1);
|
||||||
|
let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1);
|
||||||
|
let mut stream = Stream::from_generations_count(2);
|
||||||
|
|
||||||
|
stream.add_value(value_1, Generation::Nth(0)).unwrap();
|
||||||
|
stream.add_value(value_2, Generation::Nth(1)).unwrap();
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1)).unwrap();
|
||||||
|
assert_eq!(slice.len, 2);
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Last).unwrap();
|
||||||
|
assert_eq!(slice.len, 2);
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(0)).unwrap();
|
||||||
|
assert_eq!(slice.len, 1);
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Last, Generation::Last).unwrap();
|
||||||
|
assert_eq!(slice.len, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_slice_on_empty_stream() {
|
||||||
|
let stream = Stream::from_generations_count(2);
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1));
|
||||||
|
assert!(slice.is_none());
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Last);
|
||||||
|
assert!(slice.is_none());
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(0));
|
||||||
|
assert!(slice.is_none());
|
||||||
|
|
||||||
|
let slice = stream.slice_iter(Generation::Last, Generation::Last);
|
||||||
|
assert!(slice.is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -26,6 +26,7 @@ mod join_behaviour;
|
|||||||
mod lambda;
|
mod lambda;
|
||||||
mod last_error;
|
mod last_error;
|
||||||
mod network_explore;
|
mod network_explore;
|
||||||
|
mod recursive_streams;
|
||||||
mod security_tetraplets;
|
mod security_tetraplets;
|
||||||
mod streams;
|
mod streams;
|
||||||
mod streams_early_exit;
|
mod streams_early_exit;
|
||||||
|
376
air/tests/test_module/integration/recursive_streams.rs
Normal file
376
air/tests/test_module/integration/recursive_streams.rs
Normal file
@ -0,0 +1,376 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2022 Fluence Labs Limited
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use air_test_utils::prelude::*;
|
||||||
|
|
||||||
|
use fstrings::f;
|
||||||
|
use fstrings::format_args_f;
|
||||||
|
|
||||||
|
use pretty_assertions::assert_eq;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recursive_stream_with_early_exit() {
|
||||||
|
let vm_peer_id = "vm_peer_id";
|
||||||
|
let variable_mappings = maplit::hashmap! {
|
||||||
|
"stream_value".to_string() => json!(1),
|
||||||
|
"stop".to_string() => json!("stop"),
|
||||||
|
};
|
||||||
|
let mut vm = create_avm(
|
||||||
|
set_variables_call_service(variable_mappings, VariableOptionSource::FunctionName),
|
||||||
|
vm_peer_id,
|
||||||
|
);
|
||||||
|
|
||||||
|
let script = f!(r#"
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id}" ("" "stream_value") [] $stream)
|
||||||
|
(call "{vm_peer_id}" ("" "stream_value") [] $stream)
|
||||||
|
)
|
||||||
|
(fold $stream iterator
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id}" ("" "stop") [] value)
|
||||||
|
(xor
|
||||||
|
(match value "stop"
|
||||||
|
(null)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(ap value $stream)
|
||||||
|
(next iterator)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)"#);
|
||||||
|
|
||||||
|
let result = checked_call_vm!(vm, "", script, "", "");
|
||||||
|
let actual_trace = trace_from_result(&result);
|
||||||
|
let expected_state = vec![
|
||||||
|
executed_state::stream_number(1, 0),
|
||||||
|
executed_state::stream_number(1, 0),
|
||||||
|
executed_state::fold(vec![executed_state::subtrace_lore(
|
||||||
|
0,
|
||||||
|
SubTraceDesc::new(3, 1),
|
||||||
|
SubTraceDesc::new(4, 0),
|
||||||
|
)]),
|
||||||
|
executed_state::scalar_string("stop"),
|
||||||
|
];
|
||||||
|
|
||||||
|
assert_eq!(actual_trace, expected_state);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recursive_stream_many_iterations() {
|
||||||
|
let vm_peer_id_1 = "vm_peer_id_1";
|
||||||
|
|
||||||
|
let request_id = std::cell::Cell::new(0);
|
||||||
|
let stop_request_id = 10;
|
||||||
|
let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
|
||||||
|
let uncelled_request_id = request_id.get();
|
||||||
|
|
||||||
|
let result = if uncelled_request_id >= stop_request_id {
|
||||||
|
CallServiceResult::ok(json!("stop"))
|
||||||
|
} else {
|
||||||
|
CallServiceResult::ok(json!("non_stop"))
|
||||||
|
};
|
||||||
|
|
||||||
|
request_id.set(uncelled_request_id + 1);
|
||||||
|
result
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);
|
||||||
|
|
||||||
|
let vm_peer_id_2 = "vm_peer_id_2";
|
||||||
|
let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
|
||||||
|
|
||||||
|
let result_value = "result_value";
|
||||||
|
let script = f!(r#"
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
|
||||||
|
)
|
||||||
|
(fold $stream iterator
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stop") [] value)
|
||||||
|
(xor
|
||||||
|
(match value "stop"
|
||||||
|
(null)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(ap value $stream)
|
||||||
|
(next iterator)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(call "{vm_peer_id_2}" ("" "") ["{result_value}"])
|
||||||
|
)"#);
|
||||||
|
|
||||||
|
let result = checked_call_vm!(vm_1, "", &script, "", "");
|
||||||
|
let actual_trace = trace_from_result(&result);
|
||||||
|
let actual_fold = &actual_trace[2];
|
||||||
|
let expected_fold = executed_state::fold(vec![
|
||||||
|
executed_state::subtrace_lore(0, SubTraceDesc::new(3, 2), SubTraceDesc::new(7, 0)),
|
||||||
|
executed_state::subtrace_lore(1, SubTraceDesc::new(5, 2), SubTraceDesc::new(7, 0)),
|
||||||
|
executed_state::subtrace_lore(4, SubTraceDesc::new(7, 2), SubTraceDesc::new(9, 0)),
|
||||||
|
executed_state::subtrace_lore(6, SubTraceDesc::new(9, 2), SubTraceDesc::new(11, 0)),
|
||||||
|
executed_state::subtrace_lore(8, SubTraceDesc::new(11, 2), SubTraceDesc::new(15, 0)),
|
||||||
|
executed_state::subtrace_lore(10, SubTraceDesc::new(13, 2), SubTraceDesc::new(15, 0)),
|
||||||
|
executed_state::subtrace_lore(12, SubTraceDesc::new(15, 2), SubTraceDesc::new(19, 0)),
|
||||||
|
executed_state::subtrace_lore(14, SubTraceDesc::new(17, 2), SubTraceDesc::new(19, 0)),
|
||||||
|
executed_state::subtrace_lore(16, SubTraceDesc::new(19, 1), SubTraceDesc::new(20, 0)),
|
||||||
|
]);
|
||||||
|
assert_eq!(actual_fold, &expected_fold);
|
||||||
|
|
||||||
|
let actual_last_state = &actual_trace[20];
|
||||||
|
let expected_last_state = executed_state::request_sent_by(vm_peer_id_1);
|
||||||
|
assert_eq!(actual_last_state, &expected_last_state);
|
||||||
|
|
||||||
|
let result = checked_call_vm!(vm_2, "", script, "", result.data);
|
||||||
|
let actual_trace = trace_from_result(&result);
|
||||||
|
let actual_last_state = &actual_trace[20];
|
||||||
|
let expected_last_state = executed_state::scalar_string(result_value);
|
||||||
|
assert_eq!(actual_last_state, &expected_last_state);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recursive_stream_join() {
|
||||||
|
let vm_peer_id_1 = "vm_peer_id_1";
|
||||||
|
|
||||||
|
let request_id = std::cell::Cell::new(0);
|
||||||
|
let stop_request_id = 5;
|
||||||
|
let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
|
||||||
|
let uncelled_request_id = request_id.get();
|
||||||
|
|
||||||
|
let result = if uncelled_request_id >= stop_request_id {
|
||||||
|
CallServiceResult::ok(json!("join"))
|
||||||
|
} else {
|
||||||
|
CallServiceResult::ok(json!("non_join"))
|
||||||
|
};
|
||||||
|
|
||||||
|
request_id.set(uncelled_request_id + 1);
|
||||||
|
result
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);
|
||||||
|
|
||||||
|
let vm_peer_id_2 = "vm_peer_id_2";
|
||||||
|
let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
|
||||||
|
let vm_peer_id_3 = "vm_peer_id_3";
|
||||||
|
let mut vm_3 = create_avm(echo_call_service(), vm_peer_id_3);
|
||||||
|
|
||||||
|
let result_value = "result_value";
|
||||||
|
let script = f!(r#"
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(par
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
|
||||||
|
(call "{vm_peer_id_3}" ("" "stream_value") [""] join_variable)
|
||||||
|
)
|
||||||
|
(fold $stream iterator
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "") [""] value)
|
||||||
|
(xor
|
||||||
|
(match value "join"
|
||||||
|
(call "{vm_peer_id_2}" ("" "") [join_variable])
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(ap value $stream)
|
||||||
|
(next iterator)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(call "{vm_peer_id_2}" ("" "") ["{result_value}"])
|
||||||
|
)"#);
|
||||||
|
|
||||||
|
let result = checked_call_vm!(vm_1, "", &script, "", "");
|
||||||
|
let result = checked_call_vm!(vm_3, "", &script, "", result.data);
|
||||||
|
let result = checked_call_vm!(vm_2, "", &script, "", result.data);
|
||||||
|
let actual_trace = trace_from_result(&result);
|
||||||
|
let expected_trace = vec![
|
||||||
|
executed_state::par(1, 1),
|
||||||
|
executed_state::stream_string("non_join", 0),
|
||||||
|
executed_state::scalar_string(""),
|
||||||
|
executed_state::fold(vec![
|
||||||
|
executed_state::subtrace_lore(1, SubTraceDesc::new(4, 2), SubTraceDesc::new(6, 0)),
|
||||||
|
executed_state::subtrace_lore(5, SubTraceDesc::new(6, 2), SubTraceDesc::new(8, 0)),
|
||||||
|
executed_state::subtrace_lore(7, SubTraceDesc::new(8, 2), SubTraceDesc::new(10, 0)),
|
||||||
|
executed_state::subtrace_lore(9, SubTraceDesc::new(10, 2), SubTraceDesc::new(12, 0)),
|
||||||
|
executed_state::subtrace_lore(11, SubTraceDesc::new(12, 2), SubTraceDesc::new(14, 0)),
|
||||||
|
]),
|
||||||
|
executed_state::scalar_string("non_join"),
|
||||||
|
executed_state::ap(Some(1)),
|
||||||
|
executed_state::scalar_string("non_join"),
|
||||||
|
executed_state::ap(Some(2)),
|
||||||
|
executed_state::scalar_string("non_join"),
|
||||||
|
executed_state::ap(Some(3)),
|
||||||
|
executed_state::scalar_string("non_join"),
|
||||||
|
executed_state::ap(Some(4)),
|
||||||
|
executed_state::scalar_string("join"),
|
||||||
|
executed_state::scalar_string(""),
|
||||||
|
executed_state::scalar_string(result_value),
|
||||||
|
];
|
||||||
|
assert_eq!(actual_trace, expected_trace);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recursive_stream_error_handling() {
|
||||||
|
let vm_peer_id_1 = "vm_peer_id_1";
|
||||||
|
|
||||||
|
let request_id = std::cell::Cell::new(0);
|
||||||
|
let stop_request_id = 5;
|
||||||
|
let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
|
||||||
|
let uncelled_request_id = request_id.get();
|
||||||
|
|
||||||
|
let result = if uncelled_request_id >= stop_request_id {
|
||||||
|
CallServiceResult::err(1, json!("error"))
|
||||||
|
} else {
|
||||||
|
CallServiceResult::ok(json!("non_stop"))
|
||||||
|
};
|
||||||
|
|
||||||
|
request_id.set(uncelled_request_id + 1);
|
||||||
|
result
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);
|
||||||
|
|
||||||
|
let vm_peer_id_2 = "vm_peer_id_2";
|
||||||
|
let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
|
||||||
|
|
||||||
|
let result_value = "result_value";
|
||||||
|
let script = f!(r#"
|
||||||
|
(xor
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
|
||||||
|
)
|
||||||
|
(fold $stream iterator
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stop") [] value)
|
||||||
|
(xor
|
||||||
|
(match value "stop"
|
||||||
|
(null)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(ap value $stream)
|
||||||
|
(next iterator)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(call "{vm_peer_id_2}" ("" "") ["{result_value}"])
|
||||||
|
)"#);
|
||||||
|
|
||||||
|
let result = checked_call_vm!(vm_1, "", &script, "", "");
|
||||||
|
let result = checked_call_vm!(vm_2, "", &script, "", result.data);
|
||||||
|
let actual_trace = trace_from_result(&result);
|
||||||
|
let actual_last_state = &actual_trace[10];
|
||||||
|
let expected_last_state = executed_state::scalar_string(result_value);
|
||||||
|
|
||||||
|
assert_eq!(actual_last_state, &expected_last_state);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn recursive_stream_inner_fold() {
|
||||||
|
let vm_peer_id_1 = "vm_peer_id_1";
|
||||||
|
|
||||||
|
let request_id = std::cell::Cell::new(0);
|
||||||
|
let stop_request_id = 10;
|
||||||
|
let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
|
||||||
|
let uncelled_request_id = request_id.get();
|
||||||
|
|
||||||
|
let result = if uncelled_request_id >= stop_request_id {
|
||||||
|
CallServiceResult::ok(json!("stop"))
|
||||||
|
} else {
|
||||||
|
CallServiceResult::ok(json!("non_stop"))
|
||||||
|
};
|
||||||
|
|
||||||
|
request_id.set(uncelled_request_id + 1);
|
||||||
|
result
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);
|
||||||
|
|
||||||
|
let vm_peer_id_2 = "vm_peer_id_2";
|
||||||
|
let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
|
||||||
|
|
||||||
|
let result_value = "result_value";
|
||||||
|
let script = f!(r#"
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream_1)
|
||||||
|
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream_2)
|
||||||
|
)
|
||||||
|
(fold $stream_1 iterator_1
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stop") [] value)
|
||||||
|
(xor
|
||||||
|
(match value "stop"
|
||||||
|
(null)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(seq
|
||||||
|
(ap value $stream_1)
|
||||||
|
(fold $stream_2 iterator_2
|
||||||
|
(seq
|
||||||
|
(call "{vm_peer_id_1}" ("" "stop") [] value)
|
||||||
|
(xor
|
||||||
|
(match value "stop"
|
||||||
|
(null)
|
||||||
|
)
|
||||||
|
(seq
|
||||||
|
(ap value $stream_2)
|
||||||
|
(next iterator_2)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(next iterator_1)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
(call "{vm_peer_id_2}" ("" "") ["{result_value}"])
|
||||||
|
)"#);
|
||||||
|
|
||||||
|
let result = checked_call_vm!(vm_1, "", &script, "", "");
|
||||||
|
let result = checked_call_vm!(vm_2, "", script, "", result.data);
|
||||||
|
let actual_trace = trace_from_result(&result);
|
||||||
|
|
||||||
|
let actual_last_state = &actual_trace[22];
|
||||||
|
let expected_last_state = executed_state::scalar_string(result_value);
|
||||||
|
assert_eq!(actual_last_state, &expected_last_state);
|
||||||
|
|
||||||
|
let external_fold = &actual_trace[2];
|
||||||
|
let internal_fold = &actual_trace[5];
|
||||||
|
let actual_fold_lores_count = match (external_fold, internal_fold) {
|
||||||
|
(ExecutedState::Fold(external_fold), ExecutedState::Fold(internal_fold)) => {
|
||||||
|
external_fold.lore.len() + internal_fold.lore.len()
|
||||||
|
}
|
||||||
|
_ => panic!("2nd and 5th states should be fold"),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(actual_fold_lores_count, stop_request_id);
|
||||||
|
}
|
@ -41,8 +41,8 @@ impl TestRunner {
|
|||||||
let mut prev_data = prev_data.into();
|
let mut prev_data = prev_data.into();
|
||||||
let mut data = data.into();
|
let mut data = data.into();
|
||||||
let init_user_id = init_user_id.into();
|
let init_user_id = init_user_id.into();
|
||||||
let mut call_results = HashMap::new();
|
|
||||||
|
|
||||||
|
let mut call_results = HashMap::new();
|
||||||
let mut next_peer_pks = HashSet::new();
|
let mut next_peer_pks = HashSet::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user