Store ParticleParams in AquamarineVM, pass them to call_service

This commit is contained in:
folex 2020-12-25 14:34:19 +03:00
parent f022a2dec4
commit e3f66d0ecd
15 changed files with 116 additions and 39 deletions

59
Cargo.lock generated
View File

@ -44,6 +44,7 @@ name = "aquamarine-vm"
version = "0.1.6"
dependencies = [
"fluence-faas",
"log",
"maplit",
"serde",
"serde_json",
@ -722,15 +723,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "fluence"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-main 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
name = "fluence"
version = "0.2.13"
@ -741,6 +733,15 @@ dependencies = [
"fluence-sdk-main 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fluence"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"fluence-sdk-main 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
name = "fluence-app-service"
version = "0.1.18"
@ -779,14 +780,6 @@ dependencies = [
"wasmer-wasi-fl",
]
[[package]]
name = "fluence-sdk-macro"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-wit 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
name = "fluence-sdk-macro"
version = "0.2.13"
@ -797,13 +790,11 @@ dependencies = [
]
[[package]]
name = "fluence-sdk-main"
name = "fluence-sdk-macro"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"log",
"serde",
"fluence-sdk-wit 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
]
[[package]]
@ -818,9 +809,20 @@ dependencies = [
]
[[package]]
name = "fluence-sdk-wit"
name = "fluence-sdk-main"
version = "0.2.13"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"fluence-sdk-macro 0.2.13 (git+https://github.com/fluencelabs/rust-sdk)",
"log",
"serde",
]
[[package]]
name = "fluence-sdk-wit"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da5f51cddeae52ff5b91d1a5d8be90e54629f4887f89f8d7501b829b374fe6a"
dependencies = [
"proc-macro2",
"quote",
@ -833,8 +835,7 @@ dependencies = [
[[package]]
name = "fluence-sdk-wit"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da5f51cddeae52ff5b91d1a5d8be90e54629f4887f89f8d7501b829b374fe6a"
source = "git+https://github.com/fluencelabs/rust-sdk#71591f412cb65879d74e8c38838e827ab74d8802"
dependencies = [
"proc-macro2",
"quote",
@ -1576,9 +1577,9 @@ checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
[[package]]
name = "openssl"
version = "0.10.31"
version = "0.10.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187"
checksum = "038d43985d1ddca7a9900630d8cd031b56e4794eecc2e9ea39dd17aa04399a70"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
@ -1596,9 +1597,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
[[package]]
name = "openssl-sys"
version = "0.9.59"
version = "0.9.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe"
checksum = "921fc71883267538946025deffb622905ecad223c28efbfdef9bb59a0175f3e6"
dependencies = [
"autocfg",
"cc",
@ -2205,7 +2206,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stepper-interface"
version = "0.1.0"
source = "git+https://github.com/fluencelabs/aquamarine#724117547205d8ccc742d325b542af8f3df801b8"
source = "git+https://github.com/fluencelabs/aquamarine?branch=master#5cb4cc0fb0c149a4bd3160dd0bac9c3d5ac3db7d"
dependencies = [
"fluence 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"serde",

View File

@ -15,8 +15,9 @@ fluence-faas = { path = "../fluence-faas", version = "0.1.18" }
stepper-interface = { git = "https://github.com/fluencelabs/aquamarine", branch = "master" }
maplit = "1.0.2"
serde_json = "1.0.57"
serde = "1.0.116"
serde_json = "1.0.60"
serde = "1.0.118"
log = "0.4.11"
[features]
# enable raw AquamarineVM API intended for testing

View File

@ -14,29 +14,39 @@
* limitations under the License.
*/
use crate::Result;
use crate::{Result, IType};
use crate::AquamarineVMError;
use crate::config::AquamarineVMConfig;
use fluence_faas::FaaSConfig;
use fluence_faas::{FaaSConfig, HostExportedFunc};
use fluence_faas::FluenceFaaS;
use fluence_faas::HostImportDescriptor;
use fluence_faas::IValue;
use stepper_interface::StepperOutcome;
use std::path::PathBuf;
use std::path::Path;
use std::cell::{RefCell};
use std::rc::Rc;
use std::ops::Deref;
use std::borrow::Cow;
const CALL_SERVICE_NAME: &str = "call_service";
const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID";
unsafe impl Send for AquamarineVM {}
#[derive(Debug, Default)]
pub struct ParticleParams {
pub init_peer_id: String,
pub particle_id: String,
}
pub struct AquamarineVM {
faas: FluenceFaaS,
particle_data_store: PathBuf,
/// file name of the AIR interpreter .wasm
wasm_filename: String,
current_particle: Rc<RefCell<ParticleParams>>,
}
impl AquamarineVM {
@ -44,12 +54,34 @@ impl AquamarineVM {
pub fn new(config: AquamarineVMConfig) -> Result<Self> {
use AquamarineVMError::InvalidDataStorePath;
let current_particle: Rc<RefCell<ParticleParams>> = <_>::default();
let call_service = config.call_service;
let params = current_particle.clone();
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
let params = params.deref().try_borrow();
match params {
Ok(params) => call_service(params.deref(), ivalues),
Err(err) => {
let err = AquamarineVMError::BorrowParticleParams(err);
log::error!("UNEXPECTED: {}", err);
// TODO: return error to the stepper?
None
}
}
});
let import_descriptor = HostImportDescriptor {
host_exported_func: call_service_closure,
argument_types: vec![IType::String, IType::String, IType::String, IType::String],
output_type: Some(IType::Record(0)),
error_handler: None,
};
let (wasm_dir, wasm_filename) = split_dirname(config.aquamarine_wasm_path)?;
let faas_config = make_faas_config(
wasm_dir,
&wasm_filename,
config.call_service,
import_descriptor,
config.current_peer_id,
config.logging_mask,
);
@ -63,6 +95,7 @@ impl AquamarineVM {
faas,
particle_data_store,
wasm_filename,
current_particle,
};
Ok(aqua_vm)
@ -73,11 +106,20 @@ impl AquamarineVM {
init_user_id: impl Into<String>,
aqua: impl Into<String>,
data: impl Into<Vec<u8>>,
particle_id: impl AsRef<Path>,
particle_id: Cow<'_, str>,
) -> Result<StepperOutcome> {
use AquamarineVMError::PersistDataError;
let prev_data_path = self.particle_data_store.join(particle_id);
match self.current_particle.try_borrow_mut() {
Ok(mut params) => params.particle_id = particle_id.to_string(),
Err(err) => {
let err = AquamarineVMError::BorrowMutParticleParams(err);
log::error!("UNEXPECTED: {}", err);
return Err(err);
}
}
let prev_data_path = self.particle_data_store.join(particle_id.deref());
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
let prev_data = std::fs::read_to_string(&prev_data_path).unwrap_or_default();

View File

@ -14,16 +14,20 @@
* limitations under the License.
*/
use fluence_faas::HostImportDescriptor;
use std::path::PathBuf;
use crate::aquamarine_stepper_vm::ParticleParams;
use crate::IValue;
type CallServiceClosure = Box<dyn Fn(&ParticleParams, Vec<IValue>) -> Option<IValue> + 'static>;
/// Describes behaviour of the Aquamarine VM stepper.
pub struct AquamarineVMConfig {
/// Path to a aquamarine stepper Wasm file.
pub aquamarine_wasm_path: PathBuf,
/// Descriptor of a closure that will be invoked on call_service call from Aquamarine stepper.
pub call_service: HostImportDescriptor,
pub call_service: CallServiceClosure,
/// Current peer id.
pub current_peer_id: String,

View File

@ -19,6 +19,7 @@ use fluence_faas::FaaSError;
use std::io::Error as IOError;
use std::error::Error;
use std::path::PathBuf;
use std::cell::{BorrowError, BorrowMutError};
#[derive(Debug)]
pub enum AquamarineVMError {
@ -40,6 +41,18 @@ pub enum AquamarineVMError {
io_error: Option<IOError>,
reason: &'static str,
},
/// [[BorrowError]] and [[BorrowMutError] are very serious errors, and shouldn't happen. If happened,
/// that means RefCell invariants are broken we have borrowed [[AquamarineVm::current_particle]] twice.
/// Running in several threads? I don't know how else that could be possible.
/// But if that happened, chances are that we must avoid using RefCell,
/// and switch to [[parking_lot::Mutex]] or [[crossbeam_utils::AtomicCell]]
/// try_borrow failed on [[AquamarineVm::current_particle]]
BorrowParticleParams(BorrowError),
/// try_borrow_mut failed on [[AquamarineVm::current_particle]]
BorrowMutParticleParams(BorrowMutError),
}
impl Error for AquamarineVMError {}
@ -69,6 +82,22 @@ impl std::fmt::Display for AquamarineVMError {
"path to AIR interpreter .wasm ({:?}) is invalid: {}; IO Error: {:?}",
invalid_path, reason, io_error
),
AquamarineVMError::BorrowMutParticleParams(err) => {
write!(
f,
"RefCell::try_borrow_mut failed for AquamarineVm::current_particle: {}",
err
)
}
AquamarineVMError::BorrowParticleParams(err) => {
write!(
f,
"RefCell::try_borrow failed for AquamarineVm::current_particle: {}",
err
)
}
}
}
}