mirror of
https://github.com/fluencelabs/aquavm
synced 2025-03-14 20:20:49 +00:00
feature(execution-engine): Canon data with CID (#419)
* Use CID values for tetraplets and `canon` vectors. * Rename `cid_store` to `value_store` It is consistent with the new `tetraplet_store` and `canon_store` fields. * Make canon data more typeful The `CanonResult` doesn't take a JSON value anymore that is further deserialized elsewhere, but is a struct that has all data deserialized. * Typeful `CID` type The `CID` type has a phantom type paramter defining its value's type. * Group cid stores and trackers Group cid stores into `CidInfo` struct, and trackers into `ExecutionCidState` struct.
This commit is contained in:
parent
f73e246a2e
commit
8f587b7803
@ -1,3 +1,12 @@
|
||||
## Version 0.35.0 (2022-12-27)
|
||||
|
||||
[PR 419](https://github.com/fluencelabs/aquavm/pull/419):
|
||||
- Rename data's `cid_store` field to `value_store`.
|
||||
- Canon data is stored with CIDs. Values, tetraplets and canon elements
|
||||
are stored as CIDs resolved with data's `value_store`, `tetraplet_store`
|
||||
and `canon_store` fields respectively.
|
||||
- Group stores in the data into `cid_info: CidInfo` field.
|
||||
|
||||
## Version 0.34.0 (2022-12-26)
|
||||
|
||||
[PR 414](https://github.com/fluencelabs/aquavm/pull/414):
|
||||
|
11
Cargo.lock
generated
11
Cargo.lock
generated
@ -13,7 +13,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "air"
|
||||
version = "0.34.0"
|
||||
version = "0.35.0"
|
||||
dependencies = [
|
||||
"air-execution-info-collector",
|
||||
"air-interpreter-cid",
|
||||
@ -85,7 +85,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "air-interpreter"
|
||||
version = "0.34.0"
|
||||
version = "0.35.0"
|
||||
dependencies = [
|
||||
"air",
|
||||
"air-interpreter-interface",
|
||||
@ -101,7 +101,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "air-interpreter-cid"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"cid",
|
||||
"multihash",
|
||||
@ -111,13 +111,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "air-interpreter-data"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
dependencies = [
|
||||
"air-interpreter-cid",
|
||||
"air-interpreter-interface",
|
||||
"air-parser",
|
||||
"air-utils",
|
||||
"once_cell",
|
||||
"polyplets",
|
||||
"semver 1.0.16",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@ -203,6 +204,7 @@ dependencies = [
|
||||
"object-pool",
|
||||
"once_cell",
|
||||
"semver 1.0.16",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
@ -249,6 +251,7 @@ dependencies = [
|
||||
"air-parser",
|
||||
"bimap",
|
||||
"log",
|
||||
"polyplets",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "air-interpreter"
|
||||
version = "0.34.0"
|
||||
version = "0.35.0"
|
||||
description = "Crate-wrapper for air"
|
||||
authors = ["Fluence Labs"]
|
||||
edition = "2018"
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "air"
|
||||
version = "0.34.0"
|
||||
version = "0.35.0"
|
||||
description = "Interpreter of AIR scripts intended to coordinate request flow in the Fluence network"
|
||||
authors = ["Fluence Labs"]
|
||||
edition = "2018"
|
||||
@ -17,7 +17,7 @@ doctest = false
|
||||
[dependencies]
|
||||
air-parser = { path = "../crates/air-lib/air-parser" }
|
||||
air-execution-info-collector = { path = "../crates/air-lib/execution-info-collector" }
|
||||
air-interpreter-cid = { version = "0.1.0", path = "../crates/air-lib/interpreter-cid" }
|
||||
air-interpreter-cid = { version = "0.2.0", path = "../crates/air-lib/interpreter-cid" }
|
||||
air-interpreter-data = { path = "../crates/air-lib/interpreter-data" }
|
||||
air-interpreter-interface = { path = "../crates/air-lib/interpreter-interface", default-features = false }
|
||||
air-log-targets = { path = "../crates/air-lib/log-targets" }
|
||||
|
@ -35,6 +35,10 @@ pub struct CanonStream {
|
||||
}
|
||||
|
||||
impl CanonStream {
|
||||
pub(crate) fn new(values: Vec<ValueAggregate>, tetraplet: Rc<SecurityTetraplet>) -> Self {
|
||||
Self { values, tetraplet }
|
||||
}
|
||||
|
||||
pub(crate) fn from_stream(stream: &Stream, peer_pk: String) -> Self {
|
||||
// it's always possible to iter over all generations of a stream
|
||||
let values = stream.iter(Generation::Last).unwrap().cloned().collect::<Vec<_>>();
|
||||
|
@ -18,7 +18,6 @@ use crate::JValue;
|
||||
use crate::ToErrorCode;
|
||||
|
||||
use air_interpreter_cid::CidCalculationError;
|
||||
use air_interpreter_cid::CID;
|
||||
use air_interpreter_data::TracePos;
|
||||
use air_interpreter_data::ValueRef;
|
||||
use air_trace_handler::merger::MergerApResult;
|
||||
@ -29,8 +28,6 @@ use strum_macros::EnumDiscriminants;
|
||||
use strum_macros::EnumIter;
|
||||
use thiserror::Error as ThisError;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
/// Uncatchable errors arisen during AIR script execution. Uncatchable here means that these errors
|
||||
/// couldn't be handled by a xor instruction and their error_code couldn't be used in a match
|
||||
/// instruction. They are similar to JVM runtime errors and some of them could be caught only
|
||||
@ -98,8 +95,10 @@ pub enum UncatchableError {
|
||||
#[error("failed to calculate value's CID")]
|
||||
CidError(#[from] CidCalculationError),
|
||||
|
||||
#[error("value for CID {0:?} not found")]
|
||||
ValueForCidNotFound(Rc<CID>),
|
||||
/// We consider now that every CID should present in the data;
|
||||
/// and not having any CID is considered a non-catching error.
|
||||
#[error("{0} for CID {1:?} not found")]
|
||||
ValueForCidNotFound(&'static str, String),
|
||||
}
|
||||
|
||||
impl ToErrorCode for UncatchableError {
|
||||
|
@ -18,15 +18,21 @@ use super::LastError;
|
||||
use super::LastErrorDescriptor;
|
||||
use super::Scalars;
|
||||
use super::Streams;
|
||||
use crate::execution_step::RcSecurityTetraplet;
|
||||
use crate::execution_step::ValueAggregate;
|
||||
use crate::JValue;
|
||||
use crate::UncatchableError;
|
||||
|
||||
use air_execution_info_collector::InstructionTracker;
|
||||
use air_interpreter_cid::CID;
|
||||
use air_interpreter_data::CidStore;
|
||||
use air_interpreter_data::CanonCidAggregate;
|
||||
use air_interpreter_data::CidInfo;
|
||||
use air_interpreter_data::CidTracker;
|
||||
use air_interpreter_data::GlobalStreamGens;
|
||||
use air_interpreter_data::RestrictedStreamGens;
|
||||
use air_interpreter_data::TracePos;
|
||||
use air_interpreter_interface::*;
|
||||
use polyplets::SecurityTetraplet;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
@ -69,8 +75,8 @@ pub(crate) struct ExecutionCtx<'i> {
|
||||
/// Tracks all functions that should be called from services.
|
||||
pub(crate) call_requests: CallRequests,
|
||||
|
||||
/// Merged CID-to-value dictionaries
|
||||
pub(crate) cid_tracker: CidTracker,
|
||||
/// CID-to-something trackers.
|
||||
pub(crate) cid_state: ExecutionCidState,
|
||||
}
|
||||
|
||||
impl<'i> ExecutionCtx<'i> {
|
||||
@ -88,7 +94,7 @@ impl<'i> ExecutionCtx<'i> {
|
||||
current_ingredients.restricted_streams,
|
||||
);
|
||||
|
||||
let cid_tracker = CidTracker::from_cid_stores(prev_ingredients.cid_store, current_ingredients.cid_store);
|
||||
let cid_state = ExecutionCidState::from_cid_info(prev_ingredients.cid_info, current_ingredients.cid_info);
|
||||
|
||||
Self {
|
||||
run_parameters,
|
||||
@ -96,7 +102,7 @@ impl<'i> ExecutionCtx<'i> {
|
||||
last_call_request_id: prev_ingredients.last_call_request_id,
|
||||
call_results,
|
||||
streams,
|
||||
cid_tracker,
|
||||
cid_state,
|
||||
..<_>::default()
|
||||
}
|
||||
}
|
||||
@ -109,10 +115,6 @@ impl<'i> ExecutionCtx<'i> {
|
||||
self.last_call_request_id += 1;
|
||||
self.last_call_request_id
|
||||
}
|
||||
|
||||
pub(crate) fn get_value_by_cid(&self, cid: &CID) -> Option<Rc<JValue>> {
|
||||
self.cid_tracker.get(cid)
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecutionCtx<'_> {
|
||||
@ -139,7 +141,71 @@ pub(crate) struct ExecCtxIngredients {
|
||||
pub(crate) global_streams: GlobalStreamGens,
|
||||
pub(crate) last_call_request_id: u32,
|
||||
pub(crate) restricted_streams: RestrictedStreamGens,
|
||||
pub(crate) cid_store: CidStore<JValue>,
|
||||
pub(crate) cid_info: CidInfo,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ExecutionCidState {
|
||||
pub(crate) value_tracker: CidTracker<JValue>,
|
||||
pub(crate) tetraplet_tracker: CidTracker<SecurityTetraplet>,
|
||||
pub(crate) canon_tracker: CidTracker<CanonCidAggregate>,
|
||||
}
|
||||
|
||||
impl ExecutionCidState {
|
||||
fn from_cid_info(prev_cid_info: CidInfo, current_cid_info: CidInfo) -> Self {
|
||||
Self {
|
||||
value_tracker: CidTracker::from_cid_stores(prev_cid_info.value_store, current_cid_info.value_store),
|
||||
tetraplet_tracker: CidTracker::from_cid_stores(
|
||||
prev_cid_info.tetraplet_store,
|
||||
current_cid_info.tetraplet_store,
|
||||
),
|
||||
canon_tracker: CidTracker::from_cid_stores(prev_cid_info.canon_store, current_cid_info.canon_store),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_value_by_cid(&self, cid: &CID<JValue>) -> Result<Rc<JValue>, UncatchableError> {
|
||||
self.value_tracker
|
||||
.get(cid)
|
||||
.ok_or_else(|| UncatchableError::ValueForCidNotFound("value", cid.clone().into()))
|
||||
}
|
||||
|
||||
pub(crate) fn get_tetraplet_by_cid(
|
||||
&self,
|
||||
cid: &CID<SecurityTetraplet>,
|
||||
) -> Result<RcSecurityTetraplet, UncatchableError> {
|
||||
self.tetraplet_tracker
|
||||
.get(cid)
|
||||
.ok_or_else(|| UncatchableError::ValueForCidNotFound("tetraplet", cid.clone().into()))
|
||||
}
|
||||
|
||||
pub(crate) fn get_canon_value_by_cid(
|
||||
&self,
|
||||
cid: &CID<CanonCidAggregate>,
|
||||
) -> Result<ValueAggregate, UncatchableError> {
|
||||
let canon_aggregate = self
|
||||
.canon_tracker
|
||||
.get(cid)
|
||||
.ok_or_else(|| UncatchableError::ValueForCidNotFound("canon aggregate", cid.clone().into()))?;
|
||||
let result = self.get_value_by_cid(&canon_aggregate.value)?;
|
||||
let tetraplet = self.get_tetraplet_by_cid(&canon_aggregate.tetraplet)?;
|
||||
|
||||
let fake_trace_pos = TracePos::default();
|
||||
Ok(ValueAggregate {
|
||||
result,
|
||||
tetraplet,
|
||||
trace_pos: fake_trace_pos,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ExecutionCidState> for CidInfo {
|
||||
fn from(value: ExecutionCidState) -> Self {
|
||||
Self {
|
||||
value_store: value.value_tracker.into(),
|
||||
tetraplet_store: value.tetraplet_tracker.into(),
|
||||
canon_store: value.canon_tracker.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use serde::Deserialize;
|
||||
|
@ -33,7 +33,8 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
) -> ExecutionResult<CallResult> {
|
||||
let cid = exec_ctx
|
||||
.cid_tracker
|
||||
.cid_state
|
||||
.value_tracker
|
||||
.record_value(executed_result.result.clone())
|
||||
.map_err(UncatchableError::from)?;
|
||||
|
||||
@ -69,17 +70,13 @@ pub(crate) fn populate_context_from_data<'i>(
|
||||
) -> ExecutionResult<ValueRef> {
|
||||
match (output, value) {
|
||||
(CallOutputValue::Scalar(scalar), ValueRef::Scalar(cid)) => {
|
||||
let value = exec_ctx
|
||||
.get_value_by_cid(&cid)
|
||||
.ok_or_else(|| UncatchableError::ValueForCidNotFound(cid.clone()))?;
|
||||
let value = exec_ctx.cid_state.get_value_by_cid(&cid)?;
|
||||
let result = ValueAggregate::new(value, tetraplet, trace_pos);
|
||||
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
|
||||
Ok(ValueRef::Scalar(cid))
|
||||
}
|
||||
(CallOutputValue::Stream(stream), ValueRef::Stream { cid, generation }) => {
|
||||
let value = exec_ctx
|
||||
.get_value_by_cid(&cid)
|
||||
.ok_or_else(|| UncatchableError::ValueForCidNotFound(cid.clone()))?;
|
||||
let value = exec_ctx.cid_state.get_value_by_cid(&cid)?;
|
||||
let result = ValueAggregate::new(value, tetraplet, trace_pos);
|
||||
let value_descriptor = StreamValueDescriptor::new(
|
||||
result,
|
||||
|
@ -21,15 +21,17 @@ use crate::execution_step::boxed_value::CanonStream;
|
||||
use crate::execution_step::Stream;
|
||||
use crate::log_instruction;
|
||||
use crate::trace_to_exec_err;
|
||||
use crate::ExecutionError;
|
||||
use crate::JValue;
|
||||
use crate::UncatchableError;
|
||||
|
||||
use air_interpreter_cid::CID;
|
||||
use air_interpreter_data::CanonCidAggregate;
|
||||
use air_interpreter_data::CanonResult;
|
||||
use air_parser::ast;
|
||||
use air_trace_handler::merger::MergerCanonResult;
|
||||
use polyplets::SecurityTetraplet;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::rc::Rc;
|
||||
|
||||
impl<'i> super::ExecutableInstruction<'i> for ast::Canon<'i> {
|
||||
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
|
||||
@ -38,8 +40,8 @@ impl<'i> super::ExecutableInstruction<'i> for ast::Canon<'i> {
|
||||
let canon_result = trace_to_exec_err!(trace_ctx.meet_canon_start(), self)?;
|
||||
|
||||
match canon_result {
|
||||
MergerCanonResult::CanonResult { canonicalized_element } => {
|
||||
handle_seen_canon(self, canonicalized_element, exec_ctx, trace_ctx)
|
||||
MergerCanonResult::CanonResult { tetraplet, values } => {
|
||||
handle_seen_canon(self, tetraplet, values, exec_ctx, trace_ctx)
|
||||
}
|
||||
MergerCanonResult::Empty => handle_unseen_canon(self, exec_ctx, trace_ctx),
|
||||
}
|
||||
@ -48,19 +50,23 @@ impl<'i> super::ExecutableInstruction<'i> for ast::Canon<'i> {
|
||||
|
||||
fn handle_seen_canon(
|
||||
ast_canon: &ast::Canon<'_>,
|
||||
se_canon_stream: JValue,
|
||||
tetraplet_cid: Rc<CID<SecurityTetraplet>>,
|
||||
value_cids: Vec<Rc<CID<CanonCidAggregate>>>,
|
||||
exec_ctx: &mut ExecutionCtx<'_>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
let canon_stream = serde_json::from_value(se_canon_stream.clone()).map_err(|de_error| {
|
||||
ExecutionError::Uncatchable(UncatchableError::InvalidCanonStreamInData {
|
||||
canonicalized_stream: se_canon_stream.clone(),
|
||||
de_error,
|
||||
})
|
||||
})?;
|
||||
let tetraplet = exec_ctx.cid_state.get_tetraplet_by_cid(&tetraplet_cid)?;
|
||||
let values = value_cids
|
||||
.iter()
|
||||
.map(|canon_value_cid| exec_ctx.cid_state.get_canon_value_by_cid(canon_value_cid))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let canon_stream = CanonStream::new(values, tetraplet);
|
||||
|
||||
let canon_stream_with_se = StreamWithSerializedView {
|
||||
canon_stream,
|
||||
se_canon_stream,
|
||||
tetraplet_cid,
|
||||
value_cids,
|
||||
};
|
||||
|
||||
epilog(ast_canon.canon_stream.name, canon_stream_with_se, exec_ctx, trace_ctx)
|
||||
@ -97,35 +103,56 @@ fn epilog(
|
||||
) -> ExecutionResult<()> {
|
||||
let StreamWithSerializedView {
|
||||
canon_stream,
|
||||
se_canon_stream,
|
||||
tetraplet_cid,
|
||||
value_cids,
|
||||
} = stream_with_positions;
|
||||
|
||||
exec_ctx
|
||||
.scalars
|
||||
.set_canon_value(canon_stream_name, canon_stream)
|
||||
.map(|_| ())?;
|
||||
trace_ctx.meet_canon_end(CanonResult::new(se_canon_stream));
|
||||
trace_ctx.meet_canon_end(CanonResult::new(tetraplet_cid, value_cids));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct StreamWithSerializedView {
|
||||
canon_stream: CanonStream,
|
||||
se_canon_stream: JValue,
|
||||
tetraplet_cid: Rc<CID<SecurityTetraplet>>,
|
||||
value_cids: Vec<Rc<CID<CanonCidAggregate>>>,
|
||||
}
|
||||
|
||||
fn create_canon_stream_from_name(
|
||||
ast_canon: &ast::Canon<'_>,
|
||||
peer_id: String,
|
||||
exec_ctx: &ExecutionCtx<'_>,
|
||||
exec_ctx: &mut ExecutionCtx<'_>,
|
||||
) -> ExecutionResult<StreamWithSerializedView> {
|
||||
let stream = get_stream_or_default(ast_canon, exec_ctx);
|
||||
|
||||
let canon_stream = CanonStream::from_stream(stream.as_ref(), peer_id);
|
||||
let se_canon_stream = serde_json::to_value(&canon_stream).expect("default serialized shouldn't fail");
|
||||
|
||||
let value_cids = canon_stream
|
||||
.iter()
|
||||
.map(|val| -> Result<_, UncatchableError> {
|
||||
let canon_value_aggregate = CanonCidAggregate {
|
||||
value: exec_ctx.cid_state.value_tracker.record_value(val.result.clone())?,
|
||||
tetraplet: exec_ctx
|
||||
.cid_state
|
||||
.tetraplet_tracker
|
||||
.record_value(val.tetraplet.clone())?,
|
||||
};
|
||||
Ok(exec_ctx.cid_state.canon_tracker.record_value(canon_value_aggregate)?)
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
let tetraplet_cid = exec_ctx
|
||||
.cid_state
|
||||
.tetraplet_tracker
|
||||
.record_value(canon_stream.tetraplet().clone())
|
||||
.map_err(UncatchableError::from)?;
|
||||
|
||||
let result = StreamWithSerializedView {
|
||||
canon_stream,
|
||||
se_canon_stream,
|
||||
tetraplet_cid,
|
||||
value_cids,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
|
@ -93,7 +93,7 @@ fn populate_outcome_from_contexts(
|
||||
trace_handler.into_result_trace(),
|
||||
global_streams,
|
||||
restricted_streams,
|
||||
exec_ctx.cid_tracker,
|
||||
exec_ctx.cid_state.into(),
|
||||
exec_ctx.last_call_request_id,
|
||||
semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("cargo version is valid"),
|
||||
);
|
||||
|
@ -52,14 +52,14 @@ pub(crate) fn prepare<'i>(
|
||||
global_streams: prev_data.global_streams,
|
||||
last_call_request_id: prev_data.last_call_request_id,
|
||||
restricted_streams: prev_data.restricted_streams,
|
||||
cid_store: prev_data.cid_store,
|
||||
cid_info: prev_data.cid_info,
|
||||
};
|
||||
|
||||
let current_ingredients = ExecCtxIngredients {
|
||||
global_streams: current_data.global_streams,
|
||||
last_call_request_id: current_data.last_call_request_id,
|
||||
restricted_streams: current_data.restricted_streams,
|
||||
cid_store: current_data.cid_store,
|
||||
cid_info: current_data.cid_info,
|
||||
};
|
||||
|
||||
let exec_ctx = make_exec_ctx(prev_ingredients, current_ingredients, call_results, run_parameters)?;
|
||||
|
351
air/tests/test_module/features/cid/canon.rs
Normal file
351
air/tests/test_module/features/cid/canon.rs
Normal file
@ -0,0 +1,351 @@
|
||||
/*
|
||||
* Copyright 2022 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use air_interpreter_cid::CID;
|
||||
use air_interpreter_data::{CidStore, CidTracker};
|
||||
use air_test_framework::AirScriptExecutor;
|
||||
use air_test_utils::prelude::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn test_canon_ok() {
|
||||
let init_peer_id = "init_peer_id";
|
||||
|
||||
let script = format!(
|
||||
r#"(seq
|
||||
(seq
|
||||
(ap 42 $stream)
|
||||
(call "{init_peer_id}" ("serv" "func") [] $stream)) ; ok = "to canon"
|
||||
(canon "{init_peer_id}" $stream #canon)
|
||||
)"#
|
||||
);
|
||||
|
||||
let executor = AirScriptExecutor::simple(TestRunParameters::from_init_peer_id(init_peer_id), &script).unwrap();
|
||||
let result = executor.execute_one(init_peer_id).unwrap();
|
||||
let data = data_from_result(&result);
|
||||
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
let expected_trace = vec![
|
||||
ap(0),
|
||||
stream_tracked("to canon", 1, &mut value_tracker),
|
||||
canon_tracked(
|
||||
json!({
|
||||
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": init_peer_id, "service_id": ""},
|
||||
"values": [{
|
||||
"result": 42,
|
||||
"tetraplet": {
|
||||
"function_name": "",
|
||||
"json_path": "",
|
||||
"peer_pk": init_peer_id,
|
||||
"service_id": "",
|
||||
},
|
||||
}, {
|
||||
"result": "to canon",
|
||||
"tetraplet": {
|
||||
"function_name": "func",
|
||||
"json_path": "",
|
||||
"peer_pk": init_peer_id,
|
||||
"service_id": "serv..0",
|
||||
},
|
||||
}]
|
||||
}),
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(&*data.trace, expected_trace);
|
||||
assert_eq!(data.cid_info.value_store, value_tracker.into());
|
||||
assert_eq!(data.cid_info.tetraplet_store, tetraplet_tracker.into());
|
||||
assert_eq!(data.cid_info.canon_store, canon_tracker.into());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_canon_ok_multi() {
|
||||
let init_peer_id = "init_peer_id";
|
||||
let other_peer_id = "other_peer_id";
|
||||
|
||||
let script = format!(
|
||||
r#"(seq
|
||||
(seq
|
||||
(call "{init_peer_id}" ("serv" "func") [] $stream) ; ok = "to canon"
|
||||
(call "{other_peer_id}" ("other_serv" "other_func") [] $stream) ; ok = "other"
|
||||
)
|
||||
(canon "{init_peer_id}" $stream #canon)
|
||||
)"#
|
||||
);
|
||||
|
||||
let executor = AirScriptExecutor::simple(TestRunParameters::from_init_peer_id(init_peer_id), &script).unwrap();
|
||||
let _result1 = executor.execute_one(init_peer_id).unwrap();
|
||||
let _result2 = executor.execute_one(other_peer_id).unwrap();
|
||||
let result3 = executor.execute_one(init_peer_id).unwrap();
|
||||
let data = data_from_result(&result3);
|
||||
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
let expected_trace = vec![
|
||||
stream_tracked("to canon", 0, &mut value_tracker),
|
||||
stream_tracked("other", 1, &mut value_tracker),
|
||||
canon_tracked(
|
||||
json!({
|
||||
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": init_peer_id, "service_id": ""},
|
||||
"values": [{
|
||||
"result": "to canon",
|
||||
"tetraplet": {
|
||||
"function_name": "func",
|
||||
"json_path": "",
|
||||
"peer_pk": init_peer_id,
|
||||
"service_id": "serv..0",
|
||||
},
|
||||
}, {
|
||||
"result": "other",
|
||||
"tetraplet": {
|
||||
"function_name": "other_func",
|
||||
"json_path": "",
|
||||
"peer_pk": other_peer_id,
|
||||
"service_id": "other_serv..1",
|
||||
},
|
||||
}]
|
||||
}),
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(&*data.trace, expected_trace);
|
||||
assert_eq!(data.cid_info.value_store.len(), 2);
|
||||
assert_eq!(data.cid_info.value_store, value_tracker.into());
|
||||
assert_eq!(data.cid_info.tetraplet_store, tetraplet_tracker.into());
|
||||
assert_eq!(data.cid_info.canon_store, canon_tracker.into());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_canon_value_not_found() {
|
||||
let init_peer_id = "vm_peer_id";
|
||||
let mut vm = create_avm(echo_call_service(), init_peer_id);
|
||||
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
let air_script = format!(
|
||||
r#"
|
||||
(seq
|
||||
(ap 42 $stream)
|
||||
(canon "{init_peer_id}" $stream #canon))"#
|
||||
);
|
||||
let trace = vec![
|
||||
ap(0),
|
||||
canon_tracked(
|
||||
json!({
|
||||
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": init_peer_id, "service_id": ""},
|
||||
"values": [{
|
||||
"result": 42,
|
||||
"tetraplet": {
|
||||
"function_name": "",
|
||||
"json_path": "",
|
||||
"peer_pk": init_peer_id,
|
||||
"service_id": "",
|
||||
},
|
||||
}]
|
||||
}),
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
),
|
||||
];
|
||||
|
||||
let missing_cid = "bagaaieraondvznakk2hi3kfaixhnceatpykz7cikytniqo3lc7ogkgz2qbeq";
|
||||
let value_store: CidStore<_> = value_tracker.into();
|
||||
assert!(value_store.get(&CID::<_>::new(missing_cid)).is_some());
|
||||
|
||||
let cur_data = raw_data_from_trace_with_canon(trace, CidTracker::<_>::new(), tetraplet_tracker, canon_tracker);
|
||||
let result = call_vm!(vm, <_>::default(), air_script, vec![], cur_data);
|
||||
|
||||
assert_eq!(result.ret_code, 20012);
|
||||
assert_eq!(
|
||||
result.error_message,
|
||||
format!("value for CID \"{missing_cid}\" not found")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_canon_root_tetraplet_not_found() {
|
||||
let init_peer_id = "vm_peer_id";
|
||||
let other_peer_id = "other_peer_id";
|
||||
let mut vm = create_avm(echo_call_service(), init_peer_id);
|
||||
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
let air_script = format!(
|
||||
r#"
|
||||
(seq
|
||||
(ap 42 $stream)
|
||||
(canon "{other_peer_id}" $stream #canon))"#
|
||||
);
|
||||
let trace = vec![
|
||||
ap(0),
|
||||
canon_tracked(
|
||||
json!({
|
||||
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": init_peer_id, "service_id": ""},
|
||||
"values": [{
|
||||
"result": 42,
|
||||
"tetraplet": {
|
||||
"function_name": "",
|
||||
"json_path": "",
|
||||
"peer_pk": other_peer_id,
|
||||
"service_id": "",
|
||||
},
|
||||
}]
|
||||
}),
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
),
|
||||
];
|
||||
|
||||
let missing_cid = "bagaaiera2bwoxisr5k7qlbzhxi2jmdqlgqybqgxcfwt3v652nqdo5fyc665q";
|
||||
let tetraplet_store: CidStore<_> = tetraplet_tracker.into();
|
||||
assert!(tetraplet_store.get(&CID::<_>::new(missing_cid)).is_some());
|
||||
|
||||
let mut fake_tetraplet_tracker = CidTracker::<_>::new();
|
||||
fake_tetraplet_tracker
|
||||
.record_value(SecurityTetraplet::literal_tetraplet(other_peer_id))
|
||||
.unwrap();
|
||||
|
||||
let cur_data = raw_data_from_trace_with_canon(trace, value_tracker, fake_tetraplet_tracker, canon_tracker);
|
||||
let result = call_vm!(vm, <_>::default(), air_script, vec![], cur_data);
|
||||
|
||||
assert_eq!(result.ret_code, 20012);
|
||||
assert_eq!(
|
||||
result.error_message,
|
||||
format!("tetraplet for CID \"{missing_cid}\" not found")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_canon_tetraplet_not_found() {
|
||||
let init_peer_id = "vm_peer_id";
|
||||
let mut vm = create_avm(echo_call_service(), init_peer_id);
|
||||
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
let air_script = format!(
|
||||
r#"
|
||||
(seq
|
||||
(call "peer_1" ("serv" "func") [] $stream)
|
||||
(canon "{init_peer_id}" $stream #canon))"#
|
||||
);
|
||||
let trace = vec![
|
||||
stream_tracked(42, 0, &mut value_tracker),
|
||||
canon_tracked(
|
||||
json!({
|
||||
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": init_peer_id, "service_id": ""},
|
||||
"values": [{
|
||||
"result": 42,
|
||||
"tetraplet": {
|
||||
"function_name": "func",
|
||||
"json_path": "",
|
||||
"peer_pk": "peer_1",
|
||||
"service_id": "serv",
|
||||
},
|
||||
}]
|
||||
}),
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
),
|
||||
];
|
||||
|
||||
let missing_cid = "bagaaieracu6twiik6az3cosyzlplrscon3ek6rnu3lkjnflibphqkw6kcdiq";
|
||||
let tetraplet_store: CidStore<_> = tetraplet_tracker.into();
|
||||
assert!(tetraplet_store.get(&CID::<_>::new(missing_cid)).is_some());
|
||||
|
||||
let mut fake_tetraplet_tracker = CidTracker::<_>::new();
|
||||
fake_tetraplet_tracker
|
||||
.record_value(SecurityTetraplet::literal_tetraplet(init_peer_id))
|
||||
.unwrap();
|
||||
|
||||
let cur_data = raw_data_from_trace_with_canon(trace, value_tracker, fake_tetraplet_tracker, canon_tracker);
|
||||
let result = call_vm!(vm, <_>::default(), air_script, vec![], cur_data);
|
||||
|
||||
assert_eq!(result.ret_code, 20012);
|
||||
assert_eq!(
|
||||
result.error_message,
|
||||
format!("tetraplet for CID \"{missing_cid}\" not found"),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_canon_agg_not_found() {
|
||||
let init_peer_id = "vm_peer_id";
|
||||
let mut vm = create_avm(echo_call_service(), init_peer_id);
|
||||
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
let air_script = format!(
|
||||
r#"
|
||||
(seq
|
||||
(ap 42 $stream)
|
||||
(canon "other_peer_id" $stream #canon))"#
|
||||
);
|
||||
let trace = vec![
|
||||
ap(0),
|
||||
canon_tracked(
|
||||
json!({
|
||||
"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "other_peer_id", "service_id": ""},
|
||||
"values": [{
|
||||
"result": 42,
|
||||
"tetraplet": {
|
||||
"function_name": "",
|
||||
"json_path": "",
|
||||
"peer_pk": init_peer_id,
|
||||
"service_id": "",
|
||||
},
|
||||
}]
|
||||
}),
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
),
|
||||
];
|
||||
|
||||
let missing_cid = "bagaaierapp2oi35ib4iveexfswax6jcf2zhj3e2ergzjyavm6m7stlzh23ta";
|
||||
let canon_store: CidStore<_> = canon_tracker.into();
|
||||
assert!(canon_store.get(&CID::<_>::new(missing_cid)).is_some());
|
||||
|
||||
let cur_data = raw_data_from_trace_with_canon(trace, value_tracker, tetraplet_tracker, <_>::default());
|
||||
let result = call_vm!(vm, <_>::default(), air_script, vec![], cur_data);
|
||||
|
||||
assert_eq!(result.ret_code, 20012);
|
||||
assert_eq!(
|
||||
result.error_message,
|
||||
format!("canon aggregate for CID \"{missing_cid}\" not found")
|
||||
);
|
||||
}
|
@ -14,6 +14,8 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
mod canon;
|
||||
|
||||
use air_interpreter_data::CidTracker;
|
||||
use air_test_framework::AirScriptExecutor;
|
||||
use air_test_utils::prelude::*;
|
||||
@ -36,7 +38,7 @@ fn test_missing_cid() {
|
||||
assert_eq!(result.ret_code, 20012);
|
||||
assert_eq!(
|
||||
result.error_message,
|
||||
"value for CID CID(\"bagaaieraondvznakk2hi3kfaixhnceatpykz7cikytniqo3lc7ogkgz2qbeq\") not found"
|
||||
"value for CID \"bagaaieraondvznakk2hi3kfaixhnceatpykz7cikytniqo3lc7ogkgz2qbeq\" not found"
|
||||
);
|
||||
}
|
||||
|
||||
@ -85,7 +87,7 @@ fn test_scalar_cid() {
|
||||
|
||||
assert_eq!(result.ret_code, 0);
|
||||
assert_eq!(data.trace, expected_trace);
|
||||
assert_eq!(data.cid_store, tracker.into());
|
||||
assert_eq!(data.cid_info.value_store, tracker.into());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -117,5 +119,5 @@ fn test_stream_cid() {
|
||||
|
||||
assert_eq!(result.ret_code, 0);
|
||||
assert_eq!(data.trace, expected_trace);
|
||||
assert_eq!(data.cid_store, tracker.into());
|
||||
assert_eq!(data.cid_info.value_store, tracker.into());
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ fn fold_merge() {
|
||||
ValueRef::Stream { cid, .. } => cid,
|
||||
};
|
||||
|
||||
let value = data.cid_store.get(cid).unwrap().clone();
|
||||
let value = data.cid_info.value_store.get(cid).unwrap().clone();
|
||||
if let JValue::String(ref var_name) = &*value {
|
||||
let current_count: usize = calls_count.get(var_name).copied().unwrap_or_default();
|
||||
calls_count.insert(var_name.to_owned(), current_count + 1);
|
||||
|
@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "air-interpreter-cid"
|
||||
description = "AIR interpreter CID util module"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
authors = ["Fluence Labs"]
|
||||
edition = "2018"
|
||||
license = "Apache-2.0"
|
||||
|
@ -29,13 +29,16 @@
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
|
||||
#[serde(transparent)]
|
||||
pub struct CID(String);
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
impl CID {
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct CID<T: ?Sized>(String, #[serde(skip)] PhantomData<*const T>);
|
||||
|
||||
impl<T: ?Sized> CID<T> {
|
||||
pub fn new(cid: impl Into<String>) -> Self {
|
||||
CID(cid.into())
|
||||
Self(cid.into(), PhantomData)
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> String {
|
||||
@ -43,8 +46,29 @@ impl CID {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CID> for String {
|
||||
fn from(value: CID) -> Self {
|
||||
impl<T: ?Sized> fmt::Debug for CID<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_tuple("CID").field(&self.0).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Val> PartialEq for CID<Val> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0 == other.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<Val> Eq for CID<Val> {}
|
||||
|
||||
impl<Val> std::hash::Hash for CID<Val> {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
self.0.hash(state);
|
||||
self.1.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> From<CID<T>> for String {
|
||||
fn from(value: CID<T>) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
@ -52,7 +76,7 @@ impl From<CID> for String {
|
||||
// TODO we might refactor this to `SerializationFormat` trait
|
||||
// that both transform data to binary/text form (be it JSON, CBOR or something else)
|
||||
// and produces CID too
|
||||
pub fn json_data_cid(data: &[u8]) -> CID {
|
||||
pub fn json_data_cid<Val: ?Sized>(data: &[u8]) -> CID<Val> {
|
||||
use cid::Cid;
|
||||
use multihash::{Code, MultihashDigest};
|
||||
|
||||
@ -62,13 +86,11 @@ pub fn json_data_cid(data: &[u8]) -> CID {
|
||||
const JSON_CODEC: u64 = 0x0200;
|
||||
|
||||
let cid = Cid::new_v1(JSON_CODEC, digest);
|
||||
CID(cid.to_string())
|
||||
CID::new(cid.to_string())
|
||||
}
|
||||
|
||||
pub struct CidCalculationError(serde_json::Error);
|
||||
|
||||
use std::fmt;
|
||||
|
||||
impl fmt::Debug for CidCalculationError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.0, f)
|
||||
@ -94,7 +116,7 @@ impl std::error::Error for CidCalculationError {
|
||||
}
|
||||
|
||||
/// Calculate a CID of JSON-serialized value.
|
||||
pub fn value_to_json_cid<Val: Serialize>(value: &Val) -> Result<CID, CidCalculationError> {
|
||||
pub fn value_to_json_cid<Val: Serialize>(value: &Val) -> Result<CID<Val>, CidCalculationError> {
|
||||
let data = serde_json::to_vec(value)?;
|
||||
Ok(json_data_cid(&data))
|
||||
}
|
||||
|
@ -1,5 +1,14 @@
|
||||
## Version 0.6.0
|
||||
[PR 419](https://github.com/fluencelabs/aquavm/pull/419):
|
||||
- Rename data's `cid_store` field to `value_store`.
|
||||
- Canon data is stored with CIDs. Values, tetraplets and canon elements
|
||||
are stored as CIDs resolved with data's `value_store`, `tetraplet_store`
|
||||
and `canon_store` fields respectively.
|
||||
- Group stores in the data into `cid_info: CidInfo` field.
|
||||
|
||||
## Version 0.5.0
|
||||
|
||||
[PR 401](https://github.com/fluencelabs/aquavm/pull/401):
|
||||
- Call result values are stored as CIDs in the data trace. These CIDs refer
|
||||
to a new `cid_store` data's field that maps a CID string to a value.
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "air-interpreter-data"
|
||||
description = "Data format of the AIR interpreter"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
authors = ["Fluence Labs"]
|
||||
edition = "2018"
|
||||
license = "Apache-2.0"
|
||||
@ -18,7 +18,8 @@ air-utils = { path = "../utils" }
|
||||
air-parser = { path = "../air-parser" }
|
||||
# TODO version?
|
||||
air-interpreter-interface = { path = "../interpreter-interface" }
|
||||
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
|
||||
air-interpreter-cid = { version = "0.2.0", path = "../interpreter-cid" }
|
||||
polyplets = { path = "../polyplets" }
|
||||
|
||||
serde = {version = "1.0.152", features = ["derive", "rc"]}
|
||||
serde_json = "1.0.91"
|
||||
|
@ -27,16 +27,24 @@ use std::{collections::HashMap, rc::Rc};
|
||||
/// Stores CID to Value corresponance.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(transparent)]
|
||||
pub struct CidStore<Val>(HashMap<Rc<CID>, Rc<Val>>);
|
||||
pub struct CidStore<Val>(HashMap<Rc<CID<Val>>, Rc<Val>>);
|
||||
|
||||
impl<Val> CidStore<Val> {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn get(&self, cid: &CID) -> Option<Rc<Val>> {
|
||||
pub fn get(&self, cid: &CID<Val>) -> Option<Rc<Val>> {
|
||||
self.0.get(cid).cloned()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Val> Default for CidStore<Val> {
|
||||
@ -47,7 +55,7 @@ impl<Val> Default for CidStore<Val> {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CidTracker<Val = JValue> {
|
||||
cids: HashMap<Rc<CID>, Rc<Val>>,
|
||||
cids: HashMap<Rc<CID<Val>>, Rc<Val>>,
|
||||
}
|
||||
|
||||
impl<Val> CidTracker<Val> {
|
||||
@ -64,7 +72,7 @@ impl<Val> CidTracker<Val> {
|
||||
Self { cids }
|
||||
}
|
||||
|
||||
pub fn get(&self, cid: &CID) -> Option<Rc<Val>> {
|
||||
pub fn get(&self, cid: &CID<Val>) -> Option<Rc<Val>> {
|
||||
self.cids.get(cid).cloned()
|
||||
}
|
||||
}
|
||||
@ -73,9 +81,9 @@ impl<Val: Serialize> CidTracker<Val> {
|
||||
pub fn record_value(
|
||||
&mut self,
|
||||
value: impl Into<Rc<Val>>,
|
||||
) -> Result<Rc<CID>, CidCalculationError> {
|
||||
) -> Result<Rc<CID<Val>>, CidCalculationError> {
|
||||
let value = value.into();
|
||||
let cid = Rc::new(value_to_json_cid(&value)?);
|
||||
let cid = Rc::new(value_to_json_cid(&*value)?);
|
||||
self.cids.insert(cid.clone(), value);
|
||||
Ok(cid)
|
||||
}
|
||||
@ -96,9 +104,9 @@ impl<Val> From<CidTracker<Val>> for CidStore<Val> {
|
||||
}
|
||||
|
||||
impl<Val> IntoIterator for CidStore<Val> {
|
||||
type Item = (Rc<CID>, Rc<Val>);
|
||||
type Item = (Rc<CID<Val>>, Rc<Val>);
|
||||
|
||||
type IntoIter = std::collections::hash_map::IntoIter<Rc<CID>, Rc<Val>>;
|
||||
type IntoIter = std::collections::hash_map::IntoIter<Rc<CID<Val>>, Rc<Val>>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.0.into_iter()
|
||||
|
@ -21,6 +21,7 @@ use crate::JValue;
|
||||
use crate::TracePos;
|
||||
|
||||
use air_interpreter_cid::CID;
|
||||
use polyplets::SecurityTetraplet;
|
||||
use se_de::par_serializer;
|
||||
use se_de::sender_serializer;
|
||||
use serde::Deserialize;
|
||||
@ -60,8 +61,11 @@ pub enum CallResult {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ValueRef {
|
||||
Scalar(Rc<CID>),
|
||||
Stream { cid: Rc<CID>, generation: u32 },
|
||||
Scalar(Rc<CID<JValue>>),
|
||||
Stream {
|
||||
cid: Rc<CID<JValue>>,
|
||||
generation: u32,
|
||||
},
|
||||
}
|
||||
|
||||
/// Let's consider an example of trace that could be produces by the following fold:
|
||||
@ -127,10 +131,18 @@ pub struct ApResult {
|
||||
}
|
||||
|
||||
/// Contains ids of element that were on a stream at the moment of an appropriate canon call.
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct CanonResult {
|
||||
pub canonicalized_element: JValue,
|
||||
pub tetraplet: Rc<CID<SecurityTetraplet>>,
|
||||
pub values: Vec<Rc<CID<CanonCidAggregate>>>,
|
||||
}
|
||||
|
||||
/// The type Canon trace CID refers to.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct CanonCidAggregate {
|
||||
pub value: Rc<CID<serde_json::Value>>,
|
||||
pub tetraplet: Rc<CID<SecurityTetraplet>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use crate::JValue;
|
||||
|
||||
impl ParResult {
|
||||
pub fn new(left_size: u32, right_size: u32) -> Self {
|
||||
@ -41,13 +42,13 @@ impl CallResult {
|
||||
CallResult::RequestSentBy(Sender::PeerIdWithCallId { peer_id, call_id })
|
||||
}
|
||||
|
||||
pub fn executed_scalar(cid: Rc<CID>) -> CallResult {
|
||||
pub fn executed_scalar(cid: Rc<CID<JValue>>) -> CallResult {
|
||||
let value = ValueRef::Scalar(cid);
|
||||
|
||||
CallResult::Executed(value)
|
||||
}
|
||||
|
||||
pub fn executed_stream(cid: Rc<CID>, generation: u32) -> CallResult {
|
||||
pub fn executed_stream(cid: Rc<CID<JValue>>, generation: u32) -> CallResult {
|
||||
let value = ValueRef::Stream { cid, generation };
|
||||
|
||||
CallResult::Executed(value)
|
||||
@ -87,10 +88,11 @@ impl ApResult {
|
||||
}
|
||||
|
||||
impl CanonResult {
|
||||
pub fn new(canonicalized_element: JValue) -> Self {
|
||||
Self {
|
||||
canonicalized_element,
|
||||
}
|
||||
pub fn new(
|
||||
tetraplet: Rc<CID<SecurityTetraplet>>,
|
||||
values: Vec<Rc<CID<CanonCidAggregate>>>,
|
||||
) -> Self {
|
||||
Self { tetraplet, values }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,10 +17,12 @@
|
||||
use super::GlobalStreamGens;
|
||||
use super::RestrictedStreamGens;
|
||||
use crate::cid_store::CidStore;
|
||||
use crate::CanonCidAggregate;
|
||||
use crate::ExecutionTrace;
|
||||
use crate::JValue;
|
||||
|
||||
use air_utils::measure;
|
||||
use polyplets::SecurityTetraplet;
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@ -57,8 +59,8 @@ pub struct InterpreterData {
|
||||
/// Version of interpreter produced this data.
|
||||
pub interpreter_version: semver::Version,
|
||||
|
||||
/// Map CID to values
|
||||
pub cid_store: CidStore<JValue>,
|
||||
/// CID-to-somethings mappings.
|
||||
pub cid_info: CidInfo,
|
||||
}
|
||||
|
||||
impl InterpreterData {
|
||||
@ -70,20 +72,19 @@ impl InterpreterData {
|
||||
last_call_request_id: 0,
|
||||
restricted_streams: RestrictedStreamGens::new(),
|
||||
interpreter_version,
|
||||
cid_store: <_>::default(),
|
||||
cid_info: <_>::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn from_execution_result(
|
||||
trace: ExecutionTrace,
|
||||
streams: GlobalStreamGens,
|
||||
restricted_streams: RestrictedStreamGens,
|
||||
cid_store: impl Into<CidStore<JValue>>,
|
||||
cid_info: CidInfo,
|
||||
last_call_request_id: u32,
|
||||
interpreter_version: semver::Version,
|
||||
) -> Self {
|
||||
let cid_store = cid_store.into();
|
||||
|
||||
Self {
|
||||
trace,
|
||||
global_streams: streams,
|
||||
@ -91,7 +92,7 @@ impl InterpreterData {
|
||||
last_call_request_id,
|
||||
restricted_streams,
|
||||
interpreter_version,
|
||||
cid_store,
|
||||
cid_info,
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,6 +115,18 @@ impl InterpreterData {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
pub struct CidInfo {
|
||||
/// Map CID to value
|
||||
pub value_store: CidStore<JValue>,
|
||||
|
||||
/// Map CID to a tetraplet
|
||||
pub tetraplet_store: CidStore<SecurityTetraplet>,
|
||||
|
||||
/// Map CID to a canon value
|
||||
pub canon_store: CidStore<CanonCidAggregate>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -15,7 +15,7 @@ path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
air = { path = "../../../air" }
|
||||
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
|
||||
air-interpreter-cid = { version = "0.2.0", path = "../interpreter-cid" }
|
||||
air-interpreter-data = { path = "../interpreter-data" }
|
||||
air-interpreter-interface = { path = "../interpreter-interface" }
|
||||
avm-interface = { path = "../../../avm/interface" }
|
||||
@ -27,6 +27,7 @@ object-pool = "0.5.4"
|
||||
once_cell = "1.17.0"
|
||||
semver = "1.0.16"
|
||||
serde_json = "1.0.91"
|
||||
serde = { version = "1.0.151", features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
maplit = "1.0.2"
|
||||
|
@ -29,7 +29,11 @@ use crate::FoldSubTraceLore;
|
||||
use crate::SubTraceDesc;
|
||||
|
||||
use air_interpreter_cid::value_to_json_cid;
|
||||
use air_interpreter_data::CanonCidAggregate;
|
||||
use air_interpreter_data::CidTracker;
|
||||
use avm_server::SecurityTetraplet;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
@ -158,7 +162,62 @@ pub fn ap(generation: u32) -> ExecutedState {
|
||||
ExecutedState::Ap(ap_result)
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ValueAggregateAlike {
|
||||
pub result: Rc<JValue>,
|
||||
pub tetraplet: Rc<SecurityTetraplet>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct CanonResultAlike {
|
||||
pub tetraplet: Rc<SecurityTetraplet>,
|
||||
pub values: Vec<ValueAggregateAlike>,
|
||||
}
|
||||
|
||||
/// This function takes a JSON DSL-like struct for compatibility and test writer
|
||||
/// convenience.
|
||||
pub fn canon(canonicalized_element: JValue) -> ExecutedState {
|
||||
let canon_result = CanonResult::new(canonicalized_element);
|
||||
let mut value_tracker = CidTracker::<JValue>::new();
|
||||
let mut tetraplet_tracker = CidTracker::<SecurityTetraplet>::new();
|
||||
let mut canon_tracker = CidTracker::<CanonCidAggregate>::new();
|
||||
|
||||
canon_tracked(
|
||||
canonicalized_element,
|
||||
&mut value_tracker,
|
||||
&mut tetraplet_tracker,
|
||||
&mut canon_tracker,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn canon_tracked(
|
||||
canonicalized_element: JValue,
|
||||
value_tracker: &mut CidTracker<JValue>,
|
||||
tetraplet_tracker: &mut CidTracker<SecurityTetraplet>,
|
||||
canon_tracker: &mut CidTracker<CanonCidAggregate>,
|
||||
) -> ExecutedState {
|
||||
let canon_input = serde_json::from_value::<CanonResultAlike>(canonicalized_element)
|
||||
.expect("Malformed canon input");
|
||||
let tetraplet_cid = tetraplet_tracker
|
||||
.record_value(canon_input.tetraplet.clone())
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"{:?}: failed to compute CID of {:?}",
|
||||
e, canon_input.tetraplet
|
||||
)
|
||||
});
|
||||
let value_cids = canon_input
|
||||
.values
|
||||
.iter()
|
||||
.map(|value| {
|
||||
let value_cid = value_tracker.record_value(value.result.clone())?;
|
||||
let tetraplet_cid = tetraplet_tracker.record_value(value.tetraplet.clone())?;
|
||||
canon_tracker.record_value(CanonCidAggregate {
|
||||
value: value_cid,
|
||||
tetraplet: tetraplet_cid,
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap_or_else(|e| panic!("{:?}: failed to compute CID of {:?}", e, canon_input.values));
|
||||
let canon_result = CanonResult::new(tetraplet_cid, value_cids);
|
||||
ExecutedState::Canon(canon_result)
|
||||
}
|
||||
|
@ -36,6 +36,9 @@ mod native_test_runner;
|
||||
mod wasm_test_runner;
|
||||
|
||||
pub use air::interpreter_data::*;
|
||||
use air_interpreter_data::CanonCidAggregate;
|
||||
use air_interpreter_data::CidStore;
|
||||
use air_interpreter_data::CidTracker;
|
||||
pub use avm_interface::raw_outcome::*;
|
||||
pub use avm_server::*;
|
||||
|
||||
@ -90,12 +93,37 @@ pub fn data_from_result(result: &RawAVMOutcome) -> InterpreterData {
|
||||
serde_json::from_slice(&result.data).expect("default serializer shouldn't fail")
|
||||
}
|
||||
|
||||
pub fn raw_data_from_trace(trace: impl Into<ExecutionTrace>, cid_tracker: CidTracker) -> Vec<u8> {
|
||||
pub fn raw_data_from_trace(trace: impl Into<ExecutionTrace>, value_tracker: CidTracker) -> Vec<u8> {
|
||||
let data = InterpreterData::from_execution_result(
|
||||
trace.into(),
|
||||
<_>::default(),
|
||||
<_>::default(),
|
||||
cid_tracker,
|
||||
CidInfo {
|
||||
value_store: value_tracker.into(),
|
||||
tetraplet_store: CidStore::<_>::default(),
|
||||
canon_store: CidStore::<_>::default(),
|
||||
},
|
||||
0,
|
||||
semver::Version::new(1, 1, 1),
|
||||
);
|
||||
serde_json::to_vec(&data).expect("default serializer shouldn't fail")
|
||||
}
|
||||
|
||||
pub fn raw_data_from_trace_with_canon(
|
||||
trace: impl Into<ExecutionTrace>,
|
||||
value_tracker: CidTracker,
|
||||
tetraplet_tracker: CidTracker<SecurityTetraplet>,
|
||||
canon_tracker: CidTracker<CanonCidAggregate>,
|
||||
) -> Vec<u8> {
|
||||
let data = InterpreterData::from_execution_result(
|
||||
trace.into(),
|
||||
<_>::default(),
|
||||
<_>::default(),
|
||||
CidInfo {
|
||||
value_store: value_tracker.into(),
|
||||
tetraplet_store: tetraplet_tracker.into(),
|
||||
canon_store: canon_tracker.into(),
|
||||
},
|
||||
0,
|
||||
semver::Version::new(1, 1, 1),
|
||||
);
|
||||
|
@ -14,11 +14,12 @@ name = "air_trace_handler"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
air-interpreter-cid = { version = "0.1.0", path = "../interpreter-cid" }
|
||||
air-interpreter-cid = { version = "0.2.0", path = "../interpreter-cid" }
|
||||
air-interpreter-data = { path = "../interpreter-data" }
|
||||
air-interpreter-interface = { path = "../interpreter-interface" }
|
||||
air-log-targets = { path = "../log-targets" }
|
||||
air-parser = { path = "../air-parser" }
|
||||
polyplets = { path = "../polyplets" }
|
||||
|
||||
bimap = "0.6.2"
|
||||
serde_json = "1.0.91"
|
||||
|
@ -17,6 +17,7 @@
|
||||
use super::*;
|
||||
|
||||
use air_interpreter_cid::CID;
|
||||
use serde_json::Value as JValue;
|
||||
|
||||
pub(super) fn merge_executed(prev_value: ValueRef, current_value: ValueRef) -> MergeResult<CallResult> {
|
||||
match (&prev_value, ¤t_value) {
|
||||
@ -44,8 +45,8 @@ fn are_scalars_equal(prev_value: &ValueRef, current_value: &ValueRef) -> MergeRe
|
||||
}
|
||||
|
||||
fn are_streams_equal(
|
||||
prev_result_value: &CID,
|
||||
current_result_value: &CID,
|
||||
prev_result_value: &CID<JValue>,
|
||||
current_result_value: &CID<JValue>,
|
||||
prev_value: &ValueRef,
|
||||
current_value: &ValueRef,
|
||||
) -> MergeResult<()> {
|
||||
|
@ -17,7 +17,10 @@
|
||||
use super::*;
|
||||
use crate::merger::errors::CanonResultError;
|
||||
|
||||
use serde_json::Value as JValue;
|
||||
use air_interpreter_cid::CID;
|
||||
use polyplets::SecurityTetraplet;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
const EXPECTED_STATE_NAME: &str = "canon";
|
||||
|
||||
@ -28,7 +31,10 @@ pub enum MergerCanonResult {
|
||||
|
||||
/// There was a state in at least one of the contexts. If there were two states in
|
||||
/// both contexts, they were successfully merged.
|
||||
CanonResult { canonicalized_element: JValue },
|
||||
CanonResult {
|
||||
tetraplet: Rc<CID<SecurityTetraplet>>,
|
||||
values: Vec<Rc<CID<CanonCidAggregate>>>,
|
||||
},
|
||||
}
|
||||
|
||||
pub(crate) fn try_merge_next_state_as_canon(data_keeper: &mut DataKeeper) -> MergeResult<MergerCanonResult> {
|
||||
@ -60,7 +66,8 @@ fn prepare_both_canon_result(
|
||||
|
||||
fn prepare_single_canon_result(canon_result: CanonResult) -> MergeResult<MergerCanonResult> {
|
||||
Ok(MergerCanonResult::CanonResult {
|
||||
canonicalized_element: canon_result.canonicalized_element,
|
||||
tetraplet: canon_result.tetraplet,
|
||||
values: canon_result.values,
|
||||
})
|
||||
}
|
||||
|
||||
@ -68,7 +75,7 @@ fn check_canon_results(
|
||||
prev_canon_result: &CanonResult,
|
||||
current_canon_result: &CanonResult,
|
||||
) -> Result<(), CanonResultError> {
|
||||
if prev_canon_result.canonicalized_element != current_canon_result.canonicalized_element {
|
||||
if prev_canon_result != current_canon_result {
|
||||
return Err(CanonResultError::incompatible_state(
|
||||
prev_canon_result.clone(),
|
||||
current_canon_result.clone(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user