1
0
mirror of https://github.com/fluencelabs/marine.git synced 2025-03-15 14:00:50 +00:00

Use Mutex, simplify everything

This commit is contained in:
folex 2020-12-25 15:16:08 +03:00
parent 8280b7ec2f
commit e08aca0ba4
5 changed files with 75 additions and 62 deletions

52
Cargo.lock generated

@ -46,6 +46,7 @@ dependencies = [
"fluence-faas",
"log",
"maplit",
"parking_lot 0.11.1",
"serde",
"serde_json",
"stepper-interface",
@ -1203,6 +1204,15 @@ dependencies = [
"safe-transmute",
]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "inventory"
version = "0.1.10"
@ -1338,6 +1348,15 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "lock_api"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.11"
@ -1639,8 +1658,19 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
dependencies = [
"lock_api",
"parking_lot_core",
"lock_api 0.3.4",
"parking_lot_core 0.7.2",
]
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api 0.4.2",
"parking_lot_core 0.8.2",
]
[[package]]
@ -1657,6 +1687,20 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "parking_lot_core"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -2848,7 +2892,7 @@ dependencies = [
"libc",
"nix 0.15.0",
"page_size",
"parking_lot",
"parking_lot 0.10.2",
"rustc_version",
"serde",
"serde-bench",
@ -2877,7 +2921,7 @@ dependencies = [
"libc",
"nix 0.15.0",
"page_size",
"parking_lot",
"parking_lot 0.10.2",
"rustc_version",
"serde",
"serde-bench",

@ -18,6 +18,7 @@ maplit = "1.0.2"
serde_json = "1.0.60"
serde = "1.0.118"
log = "0.4.11"
parking_lot = "0.11.1"
[features]
# enable raw AquamarineVM API intended for testing

@ -25,10 +25,9 @@ use fluence_faas::IValue;
use stepper_interface::StepperOutcome;
use std::path::PathBuf;
use std::cell::{RefCell};
use std::rc::Rc;
use std::ops::{Deref, DerefMut};
use std::borrow::Cow;
use std::sync::Arc;
use parking_lot::{Mutex};
const CALL_SERVICE_NAME: &str = "call_service";
const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID";
@ -49,9 +48,9 @@ impl DerefMut for SendSafeFaaS {
}
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct ParticleParams {
pub init_peer_id: String,
pub init_user_id: String,
pub particle_id: String,
}
@ -60,7 +59,8 @@ pub struct AquamarineVM {
particle_data_store: PathBuf,
/// file name of the AIR interpreter .wasm
wasm_filename: String,
current_particle: Rc<RefCell<ParticleParams>>,
/// information about the particle that is being executed at the moment
current_particle: Arc<Mutex<ParticleParams>>,
}
impl AquamarineVM {
@ -68,20 +68,15 @@ impl AquamarineVM {
pub fn new(config: AquamarineVMConfig) -> Result<Self> {
use AquamarineVMError::InvalidDataStorePath;
let current_particle: Rc<RefCell<ParticleParams>> = <_>::default();
let current_particle: Arc<Mutex<ParticleParams>> = <_>::default();
let call_service = config.call_service;
let params = current_particle.clone();
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
let params = params.deref().try_borrow();
match params {
Ok(params) => call_service(params.deref(), ivalues),
Err(err) => {
let err = AquamarineVMError::BorrowParticleParams(err);
log::error!("UNEXPECTED: {}", err);
// TODO: return error to the stepper?
None
}
}
let params = {
let lock = params.lock();
lock.deref().clone()
};
call_service(params, ivalues)
});
let import_descriptor = HostImportDescriptor {
host_exported_func: call_service_closure,
@ -120,32 +115,28 @@ impl AquamarineVM {
init_user_id: impl Into<String>,
aqua: impl Into<String>,
data: impl Into<Vec<u8>>,
particle_id: Cow<'_, str>,
particle_id: impl Into<String>,
) -> Result<StepperOutcome> {
use AquamarineVMError::PersistDataError;
match self.current_particle.try_borrow_mut() {
Ok(mut params) => params.particle_id = particle_id.to_string(),
Err(err) => {
let err = AquamarineVMError::BorrowMutParticleParams(err);
log::error!("UNEXPECTED: {}", err);
return Err(err);
}
}
let particle_id = particle_id.into();
let init_user_id = init_user_id.into();
let prev_data_path = self.particle_data_store.join(particle_id.deref());
let prev_data_path = self.particle_data_store.join(&particle_id);
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
let prev_data = std::fs::read_to_string(&prev_data_path).unwrap_or_default();
let prev_data = into_ibytes_array(prev_data.into_bytes());
let data = into_ibytes_array(data.into());
let args = vec![
IValue::String(init_user_id.into()),
IValue::String(init_user_id.clone()),
IValue::String(aqua.into()),
IValue::Array(prev_data),
IValue::Array(data),
];
self.update_current_particle(particle_id, init_user_id);
let result =
self.faas
.call_with_ivalues(&self.wasm_filename, "invoke", &args, <_>::default())?;
@ -159,6 +150,12 @@ impl AquamarineVM {
Ok(outcome)
}
pub fn update_current_particle(&self, particle_id: String, init_user_id: String) {
let mut params = self.current_particle.lock();
params.particle_id = particle_id;
params.init_user_id = init_user_id;
}
}
/// Splits given path into its directory and file stem

@ -19,7 +19,7 @@ use std::path::PathBuf;
use crate::aquamarine_stepper_vm::ParticleParams;
use crate::IValue;
type CallServiceClosure = Box<dyn Fn(&ParticleParams, Vec<IValue>) -> Option<IValue> + 'static>;
type CallServiceClosure = Box<dyn Fn(ParticleParams, Vec<IValue>) -> Option<IValue> + 'static>;
/// Describes behaviour of the Aquamarine VM stepper.
pub struct AquamarineVMConfig {

@ -19,7 +19,6 @@ use fluence_faas::FaaSError;
use std::io::Error as IOError;
use std::error::Error;
use std::path::PathBuf;
use std::cell::{BorrowError, BorrowMutError};
#[derive(Debug)]
pub enum AquamarineVMError {
@ -41,18 +40,6 @@ pub enum AquamarineVMError {
io_error: Option<IOError>,
reason: &'static str,
},
/// [[BorrowError]] and [[BorrowMutError] are very serious errors, and shouldn't happen. If happened,
/// that means RefCell invariants are broken we have borrowed [[AquamarineVm::current_particle]] twice.
/// Running in several threads? I don't know how else that could be possible.
/// But if that happened, chances are that we must avoid using RefCell,
/// and switch to [[parking_lot::Mutex]] or [[crossbeam_utils::AtomicCell]]
/// try_borrow failed on [[AquamarineVm::current_particle]]
BorrowParticleParams(BorrowError),
/// try_borrow_mut failed on [[AquamarineVm::current_particle]]
BorrowMutParticleParams(BorrowMutError),
}
impl Error for AquamarineVMError {}
@ -82,22 +69,6 @@ impl std::fmt::Display for AquamarineVMError {
"path to AIR interpreter .wasm ({:?}) is invalid: {}; IO Error: {:?}",
invalid_path, reason, io_error
),
AquamarineVMError::BorrowMutParticleParams(err) => {
write!(
f,
"RefCell::try_borrow_mut failed for AquamarineVm::current_particle: {}",
err
)
}
AquamarineVMError::BorrowParticleParams(err) => {
write!(
f,
"RefCell::try_borrow failed for AquamarineVm::current_particle: {}",
err
)
}
}
}
}