Accommodate new marine-js and avm interface (#181)

This commit is contained in:
Pavel 2022-09-12 13:32:50 +03:00 committed by GitHub
parent b68d61068a
commit 3d02c67fd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 76 additions and 169 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@fluencelabs/fluence", "name": "@fluencelabs/fluence",
"version": "0.24.1", "version": "0.25.0",
"description": "TypeScript implementation of Fluence Peer", "description": "TypeScript implementation of Fluence Peer",
"main": "./dist/index.js", "main": "./dist/index.js",
"typings": "./dist/index.d.ts", "typings": "./dist/index.d.ts",
@ -25,11 +25,11 @@
"copy-marine": "dist/tools/copyMarine.js" "copy-marine": "dist/tools/copyMarine.js"
}, },
"dependencies": { "dependencies": {
"@fluencelabs/avm": "0.27.8", "@fluencelabs/avm": "0.28.8",
"@fluencelabs/connection": "workspace:0.2.0", "@fluencelabs/connection": "workspace:0.2.0",
"@fluencelabs/interfaces": "workspace:0.1.0", "@fluencelabs/interfaces": "workspace:0.1.0",
"@fluencelabs/keypair": "workspace:0.2.0", "@fluencelabs/keypair": "workspace:0.2.0",
"@fluencelabs/marine-js": "0.3.10", "@fluencelabs/marine-js": "0.3.16",
"async": "3.2.3", "async": "3.2.3",
"base64-js": "^1.5.1", "base64-js": "^1.5.1",
"browser-or-node": "^2.0.0", "browser-or-node": "^2.0.0",

View File

@ -23,7 +23,7 @@ import type { MultiaddrInput } from 'multiaddr';
import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCodes } from './commonTypes'; import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCodes } from './commonTypes';
import { PeerIdB58 } from './commonTypes'; import { PeerIdB58 } from './commonTypes';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle'; import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
import { throwIfNotSupported, dataToString, jsonify, MarineLoglevel, marineLogLevelToEnvs } from './utils'; import { throwIfNotSupported, dataToString, jsonify, MarineLoglevel, marineLogLevelToEnvs, isString } from './utils';
import { concatMap, filter, pipe, Subject, tap } from 'rxjs'; import { concatMap, filter, pipe, Subject, tap } from 'rxjs';
import log from 'loglevel'; import log from 'loglevel';
import { builtInServices } from './builtins/common'; import { builtInServices } from './builtins/common';
@ -31,9 +31,8 @@ import { defaultSigGuard, Sig } from './builtins/Sig';
import { registerSig } from './_aqua/services'; import { registerSig } from './_aqua/services';
import Buffer from './Buffer'; import Buffer from './Buffer';
import { AVM, AvmRunner } from './avm';
import { isBrowser, isNode } from 'browser-or-node'; import { isBrowser, isNode } from 'browser-or-node';
import { InterpreterResult, LogLevel } from '@fluencelabs/avm'; import { deserializeAvmResult, InterpreterResult, JSONValue, LogLevel, serializeAvmArgs } from '@fluencelabs/avm';
/** /**
* Node of the Fluence network specified as a pair of node's multiaddr and it's peer id * Node of the Fluence network specified as a pair of node's multiaddr and it's peer id
@ -100,12 +99,6 @@ export interface PeerConfig {
*/ */
defaultTtlMs?: number; defaultTtlMs?: number;
/**
* @deprecated. AVM run through marine-js infrastructure.
* @see marineJS option to configure AVM
*/
avmRunner?: AvmRunner;
/** /**
* This option allows to specify the location of various dependencies needed for marine-js. * This option allows to specify the location of various dependencies needed for marine-js.
* Each key specifies the location of the corresponding dependency. * Each key specifies the location of the corresponding dependency.
@ -306,9 +299,7 @@ export class FluencePeer {
this._keyPair = undefined; // This will set peer to non-initialized state and stop particle processing this._keyPair = undefined; // This will set peer to non-initialized state and stop particle processing
this._stopParticleProcessing(); this._stopParticleProcessing();
await this.disconnect(); await this.disconnect();
await this._avmRunner?.terminate();
await this._fluenceAppService?.terminate(); await this._fluenceAppService?.terminate();
this._avmRunner = undefined;
this._fluenceAppService = undefined; this._fluenceAppService = undefined;
this._classServices = undefined; this._classServices = undefined;
@ -331,12 +322,12 @@ export class FluencePeer {
new Error("Can't use avm: peer is not initialized"); new Error("Can't use avm: peer is not initialized");
} }
const args = JSON.stringify([air]); const res = await this._fluenceAppService!.callService('avm', 'ast', [air], undefined);
const rawRes = await this._fluenceAppService!.callService('avm', 'ast', args, undefined); if (!isString(res)) {
let res; throw new Error(`Call to avm:ast expected to return string. Actual return: ${res}`);
}
try { try {
res = JSON.parse(rawRes);
res = res.result as string;
if (res.startsWith('error')) { if (res.startsWith('error')) {
return { return {
success: false, success: false,
@ -349,7 +340,7 @@ export class FluencePeer {
}; };
} }
} catch (err) { } catch (err) {
throw new Error('Failed to call avm. Raw result: ' + rawRes + '. Error: ' + err); throw new Error('Failed to call avm. Result: ' + res + '. Error: ' + err);
} }
}, },
createNewParticle: (script: string, ttl: number = this._defaultTTL) => { createNewParticle: (script: string, ttl: number = this._defaultTTL) => {
@ -454,8 +445,6 @@ export class FluencePeer {
undefined, undefined,
marineLogLevelToEnvs(this._marineLogLevel), marineLogLevelToEnvs(this._marineLogLevel),
); );
this._avmRunner = config?.avmRunner || new AVM(this._fluenceAppService);
await this._avmRunner.init(config?.avmLogLevel || 'off');
registerDefaultServices(this); registerDefaultServices(this);
@ -520,11 +509,6 @@ export class FluencePeer {
private _defaultTTL: number = DEFAULT_TTL; private _defaultTTL: number = DEFAULT_TTL;
private _keyPair: KeyPair | undefined; private _keyPair: KeyPair | undefined;
private _connection?: FluenceConnection; private _connection?: FluenceConnection;
/**
* @deprecated. AVM run through marine-js infrastructure. This field is needed for backward compatibility with the previous API
*/
private _avmRunner?: AvmRunner;
private _fluenceAppService?: FluenceAppService; private _fluenceAppService?: FluenceAppService;
private _timeouts: Array<NodeJS.Timeout> = []; private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>(); private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
@ -576,7 +560,7 @@ export class FluencePeer {
() => { () => {
item.onStageChange({ stage: 'sent' }); item.onStageChange({ stage: 'sent' });
}, },
(e) => { (e: any) => {
log.error(e); log.error(e);
}, },
); );
@ -605,7 +589,7 @@ export class FluencePeer {
concatMap(async (item) => { concatMap(async (item) => {
const status = this.getStatus(); const status = this.getStatus();
if (!status.isInitialized || this._avmRunner === undefined) { if (!status.isInitialized || this._fluenceAppService === undefined) {
// If `.stop()` was called return null to stop particle processing immediately // If `.stop()` was called return null to stop particle processing immediately
return null; return null;
} }
@ -615,14 +599,37 @@ export class FluencePeer {
// MUST happen sequentially (in a critical section). // MUST happen sequentially (in a critical section).
// Otherwise the race between runner might occur corrupting the prevData // Otherwise the race between runner might occur corrupting the prevData
const result = await runAvmRunner(status.peerId, this._avmRunner, item.particle, prevData); const args = serializeAvmArgs(
const newData = Buffer.from(result.data); {
initPeerId: item.particle.initPeerId,
currentPeerId: status.peerId,
timestamp: item.particle.timestamp,
ttl: item.particle.ttl,
},
item.particle.script,
prevData,
item.particle.data,
item.particle.callResults,
);
item.particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
let avmCallResult: InterpreterResult | Error;
try {
const res = await this._fluenceAppService.callService('avm', 'invoke', args, undefined);
avmCallResult = deserializeAvmResult(res);
} catch (e) {
avmCallResult = e instanceof Error ? e : new Error((e as any).toString());
}
if (!(avmCallResult instanceof Error) && avmCallResult.retCode === 0) {
const newData = Buffer.from(avmCallResult.data);
prevData = newData; prevData = newData;
}
return { return {
...item, ...item,
result: result, result: avmCallResult,
newData: newData,
}; };
}), }),
) )
@ -633,11 +640,21 @@ export class FluencePeer {
} }
// Do not continue if there was an error in particle interpretation // Do not continue if there was an error in particle interpretation
if (!isInterpretationSuccessful(item.result)) { if (item.result instanceof Error) {
log.error('Interpreter failed: ', jsonify(item.result.message));
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.message });
return;
}
const toLog = { ...item.result, data: dataToString(item.result.data) };
if (item.result.retCode !== 0) {
log.error('Interpreter failed: ', jsonify(toLog));
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage }); item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage });
return; return;
} }
log.debug('Interpreter result: ', jsonify(toLog));
setTimeout(() => { setTimeout(() => {
item.onStageChange({ stage: 'interpreted' }); item.onStageChange({ stage: 'interpreted' });
}, 0); }, 0);
@ -645,7 +662,8 @@ export class FluencePeer {
// send particle further if requested // send particle further if requested
if (item.result.nextPeerPks.length > 0) { if (item.result.nextPeerPks.length > 0) {
const newParticle = item.particle.clone(); const newParticle = item.particle.clone();
newParticle.data = item.newData; const newData = Buffer.from(item.result.data);
newParticle.data = newData;
this._outgoingParticles.next({ this._outgoingParticles.next({
...item, ...item,
particle: newParticle, particle: newParticle,
@ -700,31 +718,12 @@ export class FluencePeer {
const particleId = req.particleContext.particleId; const particleId = req.particleContext.particleId;
if (this._fluenceAppService && this._marineServices.has(req.serviceId)) { if (this._fluenceAppService && this._marineServices.has(req.serviceId)) {
const args = JSON.stringify(req.args); const result = await this._fluenceAppService.callService(req.serviceId, req.fnName, req.args, undefined);
const rawResult = await this._fluenceAppService.callService(req.serviceId, req.fnName, args, undefined);
try {
const result = JSON.parse(rawResult);
if (typeof result.error === 'string' && result.error.length > 0) {
return {
retCode: ResultCodes.error,
result: result.error,
};
}
if (result.result === undefined) {
throw new Error(
`Call to marine-js returned no error and empty result. Original request: ${jsonify(req)}`,
);
}
return { return {
retCode: ResultCodes.success, retCode: ResultCodes.success,
result: result.result, result: result as JSONValue,
}; };
} catch (e) {
throw new Error(`Call to marine-js. Result parsing error: ${e}, original text: ${rawResult}`);
}
} }
const key = serviceFnKey(req.serviceId, req.fnName); const key = serviceFnKey(req.serviceId, req.fnName);
@ -805,10 +804,6 @@ async function configToConnection(
return res; return res;
} }
function isInterpretationSuccessful(result: InterpreterResult) {
return result.retCode === 0;
}
function serviceFnKey(serviceId: string, fnName: string) { function serviceFnKey(serviceId: string, fnName: string) {
return `${serviceId}/${fnName}`; return `${serviceId}/${fnName}`;
} }
@ -821,37 +816,6 @@ function registerDefaultServices(peer: FluencePeer) {
}); });
} }
async function runAvmRunner(
currentPeerId: PeerIdB58,
runner: AvmRunner,
particle: Particle,
prevData: Uint8Array,
): Promise<InterpreterResult> {
particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
const interpreterResult = await runner.run(
particle.script,
prevData,
particle.data,
{
initPeerId: particle.initPeerId,
currentPeerId: currentPeerId,
timestamp: particle.timestamp,
ttl: particle.ttl,
},
particle.callResults,
);
const toLog = { ...interpreterResult, data: dataToString(interpreterResult.data) };
if (isInterpretationSuccessful(interpreterResult)) {
log.debug('Interpreter result: ', jsonify(toLog));
} else {
log.error('Interpreter failed: ', jsonify(toLog));
}
return interpreterResult;
}
function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) { function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) {
return pipe( return pipe(
tap((item: ParticleQueueItem) => { tap((item: ParticleQueueItem) => {

View File

@ -1,65 +0,0 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { callAvm, CallResultsArray, InterpreterResult, LogLevel, RunParameters } from '@fluencelabs/avm';
import { FluenceAppService } from '@fluencelabs/marine-js';
/**
* @deprecated. AVM run through marine-js infrastructure. This type is needed for backward compatibility with the previous API
*/
export type AvmRunner = {
init: (logLevel: LogLevel) => Promise<void>;
terminate: () => Promise<void>;
run: (
air: string,
prevData: Uint8Array,
data: Uint8Array,
params: RunParameters,
callResults: CallResultsArray,
) => Promise<InterpreterResult>;
};
/**
* @deprecated. AVM run through marine-js infrastructure. This type is needed for backward compatibility with the previous API
*/
export class AVM implements AvmRunner {
private _fluenceAppService: FluenceAppService;
constructor(fluenceAppService: FluenceAppService) {
this._fluenceAppService = fluenceAppService;
}
async init(logLevel: LogLevel): Promise<void> {}
async terminate(): Promise<void> {}
async run(
air: string,
prevData: Uint8Array,
data: Uint8Array,
runParams: RunParameters,
callResults: CallResultsArray,
): Promise<InterpreterResult> {
return callAvm(
(args) => this._fluenceAppService.callService('avm', 'invoke', args, undefined),
runParams,
air,
prevData,
data,
callResults,
);
}
}

View File

@ -125,7 +125,7 @@ export interface CallServiceData {
/** /**
* Type for all the possible objects that can be returned to the AVM * Type for all the possible objects that can be returned to the AVM
*/ */
export type CallServiceResultType = object | boolean | number | string | null; export type CallServiceResultType = JSONValue;
/** /**
* Generic call service handler * Generic call service handler
@ -146,3 +146,7 @@ export interface CallServiceResult {
*/ */
result: CallServiceResultType; result: CallServiceResultType;
} }
export type JSONValue = string | number | boolean | null | { [x: string]: JSONValue } | Array<JSONValue>;
export type JSONArray = Array<JSONValue>;
export type JSONObject = { [x: string]: JSONValue };

View File

@ -173,3 +173,7 @@ export type MarineLoglevel = LogLevel;
export const marineLogLevelToEnvs = (marineLogLevel: MarineLoglevel | undefined) => export const marineLogLevelToEnvs = (marineLogLevel: MarineLoglevel | undefined) =>
marineLogLevel ? { WASM_LOG: marineLogLevel } : undefined; marineLogLevel ? { WASM_LOG: marineLogLevel } : undefined;
export const isString = (x: unknown): x is string => {
return x !== null && typeof x === 'string';
};

16
pnpm-lock.yaml generated
View File

@ -58,12 +58,12 @@ importers:
specifiers: specifiers:
'@fluencelabs/aqua': ^0.7.2-307 '@fluencelabs/aqua': ^0.7.2-307
'@fluencelabs/aqua-lib': ^0.5.1 '@fluencelabs/aqua-lib': ^0.5.1
'@fluencelabs/avm': 0.27.8 '@fluencelabs/avm': 0.28.8
'@fluencelabs/connection': workspace:0.2.0 '@fluencelabs/connection': workspace:0.2.0
'@fluencelabs/fluence-network-environment': ^1.0.13 '@fluencelabs/fluence-network-environment': ^1.0.13
'@fluencelabs/interfaces': workspace:0.1.0 '@fluencelabs/interfaces': workspace:0.1.0
'@fluencelabs/keypair': workspace:0.2.0 '@fluencelabs/keypair': workspace:0.2.0
'@fluencelabs/marine-js': 0.3.10 '@fluencelabs/marine-js': 0.3.16
'@types/bs58': ^4.0.1 '@types/bs58': ^4.0.1
'@types/jest': ^27.5.1 '@types/jest': ^27.5.1
'@types/platform': ^1.3.4 '@types/platform': ^1.3.4
@ -89,11 +89,11 @@ importers:
typescript: ^4.6.4 typescript: ^4.6.4
uuid: 8.3.2 uuid: 8.3.2
dependencies: dependencies:
'@fluencelabs/avm': 0.27.8 '@fluencelabs/avm': 0.28.8
'@fluencelabs/connection': link:../fluence-connection '@fluencelabs/connection': link:../fluence-connection
'@fluencelabs/interfaces': link:../fluence-interfaces '@fluencelabs/interfaces': link:../fluence-interfaces
'@fluencelabs/keypair': link:../fluence-keypair '@fluencelabs/keypair': link:../fluence-keypair
'@fluencelabs/marine-js': 0.3.10_rl5xm3oiydas7snsul2pa47p2m '@fluencelabs/marine-js': 0.3.16_rl5xm3oiydas7snsul2pa47p2m
async: 3.2.3 async: 3.2.3
base64-js: 1.5.1 base64-js: 1.5.1
browser-or-node: 2.0.0 browser-or-node: 2.0.0
@ -505,8 +505,8 @@ packages:
resolution: {integrity: sha512-BD7pr3ZRHLIb9XVt08i+/fX2+B4l5zln6j+5mEIJmDBQETv6Gz7NdsgKn0jUQueBcztR+mw5w7byM66yf6xEnA==} resolution: {integrity: sha512-BD7pr3ZRHLIb9XVt08i+/fX2+B4l5zln6j+5mEIJmDBQETv6Gz7NdsgKn0jUQueBcztR+mw5w7byM66yf6xEnA==}
dev: true dev: true
/@fluencelabs/avm/0.27.8: /@fluencelabs/avm/0.28.8:
resolution: {integrity: sha512-8d0iGs7dtYC/pneMkSWadgEWsKyGQOsi+BFtR6Pxh73tkAAEhiCydbgu1CTDNffV7eO2MztlGEYMib2x38M0bw==} resolution: {integrity: sha512-s3s5Y+qRmXXXUFYawXlWwK4CRtfuQWAi2UkyqzTieBS2hS/V6IZ+OZkV3XvBpF2CNWO7lH2gGzo4anhlg8KMIA==}
dev: false dev: false
/@fluencelabs/fluence-network-environment/1.0.13: /@fluencelabs/fluence-network-environment/1.0.13:
@ -571,8 +571,8 @@ packages:
- typescript - typescript
dev: true dev: true
/@fluencelabs/marine-js/0.3.10_rl5xm3oiydas7snsul2pa47p2m: /@fluencelabs/marine-js/0.3.16_rl5xm3oiydas7snsul2pa47p2m:
resolution: {integrity: sha512-1eVDMQjC5si6+4EcGu684dhSnfTFxBxARaQsL0un4sHHBqXp9qD5rmuyTYyQ+GHEEqez3cnen9xD09SptqqvoQ==} resolution: {integrity: sha512-LJ1rj530L0qDX0fGF6LO14bGY5pycN6KVqpC01snIqIP550vslEj5DzMewzKcnll4jObtTX6n5cQtn2kq/NvrA==}
dependencies: dependencies:
'@wasmer/wasi': 0.12.0 '@wasmer/wasi': 0.12.0
'@wasmer/wasmfs': 0.12.0 '@wasmer/wasmfs': 0.12.0