move some code to functions

This commit is contained in:
folex 2020-12-25 16:25:33 +03:00
parent 11d4af0dd2
commit 836693d08a

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::{Result, IType}; use crate::{Result, IType, CallServiceClosure};
use crate::AquamarineVMError; use crate::AquamarineVMError;
use crate::config::AquamarineVMConfig; use crate::config::AquamarineVMConfig;
@ -32,9 +32,12 @@ use parking_lot::{Mutex};
const CALL_SERVICE_NAME: &str = "call_service"; const CALL_SERVICE_NAME: &str = "call_service";
const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID"; const CURRENT_PEER_ID_ENV_NAME: &str = "CURRENT_PEER_ID";
/// A newtype needed to mark it as `unsafe impl Send`
struct SendSafeFaaS(FluenceFaaS);
/// Mark runtime as Send, so libp2p on the node (use-site) is happy
unsafe impl Send for SendSafeFaaS {} unsafe impl Send for SendSafeFaaS {}
struct SendSafeFaaS(FluenceFaaS);
impl Deref for SendSafeFaaS { impl Deref for SendSafeFaaS {
type Target = FluenceFaaS; type Target = FluenceFaaS;
@ -48,6 +51,7 @@ impl DerefMut for SendSafeFaaS {
} }
} }
/// Information about the particle that is being executed by the stepper at the moment
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct ParticleParameters { pub struct ParticleParameters {
pub init_user_id: String, pub init_user_id: String,
@ -69,28 +73,13 @@ impl AquamarineVM {
use AquamarineVMError::InvalidDataStorePath; use AquamarineVMError::InvalidDataStorePath;
let current_particle: Arc<Mutex<ParticleParameters>> = <_>::default(); let current_particle: Arc<Mutex<ParticleParameters>> = <_>::default();
let call_service = config.call_service; let call_service = call_service_descriptor(current_particle.clone(), config.call_service);
let params = current_particle.clone();
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
let params = {
let lock = params.lock();
lock.deref().clone()
};
call_service(params, ivalues)
});
let import_descriptor = HostImportDescriptor {
host_exported_func: call_service_closure,
argument_types: vec![IType::String, IType::String, IType::String, IType::String],
output_type: Some(IType::Record(0)),
error_handler: None,
};
let (wasm_dir, wasm_filename) = split_dirname(config.aquamarine_wasm_path)?; let (wasm_dir, wasm_filename) = split_dirname(config.aquamarine_wasm_path)?;
let faas_config = make_faas_config( let faas_config = make_faas_config(
wasm_dir, wasm_dir,
&wasm_filename, &wasm_filename,
import_descriptor, call_service,
config.current_peer_id, config.current_peer_id,
config.logging_mask, config.logging_mask,
); );
@ -124,17 +113,13 @@ impl AquamarineVM {
let prev_data_path = self.particle_data_store.join(&particle_id); let prev_data_path = self.particle_data_store.join(&particle_id);
// TODO: check for errors related to invalid file content (such as invalid UTF8 string) // TODO: check for errors related to invalid file content (such as invalid UTF8 string)
let prev_data = std::fs::read_to_string(&prev_data_path).unwrap_or_default(); let prev_data = std::fs::read_to_string(&prev_data_path)
.unwrap_or_default()
.into_bytes();
let prev_data = into_ibytes_array(prev_data.into_bytes()); let args = prepare_args(prev_data, data, init_user_id.clone(), aqua);
let data = into_ibytes_array(data.into());
let args = vec![
IValue::String(init_user_id.clone()),
IValue::String(aqua.into()),
IValue::Array(prev_data),
IValue::Array(data),
];
// Update ParticleParams with the new values so subsequent calls to `call_service` can use them
self.update_current_particle(particle_id, init_user_id); self.update_current_particle(particle_id, init_user_id);
let result = let result =
@ -151,13 +136,50 @@ impl AquamarineVM {
Ok(outcome) Ok(outcome)
} }
pub fn update_current_particle(&self, particle_id: String, init_user_id: String) { fn update_current_particle(&self, particle_id: String, init_user_id: String) {
let mut params = self.current_particle.lock(); let mut params = self.current_particle.lock();
params.particle_id = particle_id; params.particle_id = particle_id;
params.init_user_id = init_user_id; params.init_user_id = init_user_id;
} }
} }
fn prepare_args(
prev_data: Vec<u8>,
data: impl Into<Vec<u8>>,
init_user_id: String,
aqua: impl Into<String>,
) -> Vec<IValue> {
let prev_data = into_ibytes_array(prev_data);
let data = into_ibytes_array(data.into());
vec![
IValue::String(init_user_id),
IValue::String(aqua.into()),
IValue::Array(prev_data),
IValue::Array(data),
]
}
fn call_service_descriptor(
params: Arc<Mutex<ParticleParameters>>,
call_service: CallServiceClosure,
) -> HostImportDescriptor {
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
let params = {
let lock = params.lock();
lock.deref().clone()
};
call_service(params, ivalues)
});
HostImportDescriptor {
host_exported_func: call_service_closure,
argument_types: vec![IType::String, IType::String, IType::String, IType::String],
output_type: Some(IType::Record(0)),
error_handler: None,
}
}
/// Splits given path into its directory and file stem /// Splits given path into its directory and file stem
/// ///
/// # Example /// # Example