From ba537c79b3d906eadfd156f4ce2ab6ca2b581328 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 19 Jan 2021 15:47:49 +0300 Subject: [PATCH] Big refactoring (#8) Big codebase refactoring. * Multiple clients are allowed on the same browser instance * Particle queue processing is split from particle handling logic * Public AIP is completely rethought * Updated project file structure. Clean exports for public api methods * Additional unit tests --- .gitignore | 3 +- package.json | 13 +- src/FluenceClient.ts | 252 +++++++++++ src/__test__/air.spec.ts | 158 +++++++ src/{test => __test__}/ast.spec.ts | 4 +- src/__test__/client.spec.ts | 358 +++++++++++++++ src/{test => __test__}/greeting_wasm.ts | 0 src/{test => __test__}/mocha.opts | 2 +- src/__test__/util.ts | 15 + src/api.ts | 111 +++++ src/aqua/scripts.ts | 123 ------ src/dataStorage.ts | 27 -- src/fluence.ts | 83 ---- src/fluenceClient.ts | 414 ------------------ src/globalState.ts | 51 --- src/helpers/waitService.ts | 46 -- src/{stepperOutcome.ts => index.ts} | 10 +- src/internal/FluenceClientBase.ts | 96 ++++ .../FluenceConnection.ts} | 19 +- src/internal/ParticleProcessor.ts | 239 ++++++++++ .../ParticleProcessorStrategy.ts} | 22 +- src/{ => internal}/aqua/index.d.ts | 0 src/{ => internal}/aqua/index.js | 0 src/{ => internal}/aqua/index_bg.js | 0 .../commonTypes.ts} | 20 + src/{ => internal}/particle.ts | 68 ++- src/{seed.ts => internal/peerIdUtils.ts} | 4 + src/{ => internal}/stepper.ts | 58 ++- src/{ => internal}/trust/certificate.ts | 0 src/{ => internal}/trust/misc.ts | 0 src/{ => internal}/trust/trust.ts | 0 src/{ => internal}/trust/trust_graph.ts | 7 +- src/service.ts | 158 ------- src/subscriptions.ts | 56 --- src/test/air.spec.ts | 153 ------- src/test/client.spec.ts | 128 ------ src/utils.ts | 21 - tsconfig.json | 6 +- webpack.config.js | 4 +- 39 files changed, 1397 insertions(+), 1332 deletions(-) create mode 100644 src/FluenceClient.ts create mode 100644 src/__test__/air.spec.ts rename src/{test => __test__}/ast.spec.ts (89%) create mode 100644 src/__test__/client.spec.ts rename src/{test => __test__}/greeting_wasm.ts (100%) rename src/{test => __test__}/mocha.opts (67%) create mode 100644 src/__test__/util.ts create mode 100644 src/api.ts delete mode 100644 src/aqua/scripts.ts delete mode 100644 src/dataStorage.ts delete mode 100644 src/fluence.ts delete mode 100644 src/fluenceClient.ts delete mode 100644 src/globalState.ts delete mode 100644 src/helpers/waitService.ts rename src/{stepperOutcome.ts => index.ts} (68%) create mode 100644 src/internal/FluenceClientBase.ts rename src/{fluenceConnection.ts => internal/FluenceConnection.ts} (88%) create mode 100644 src/internal/ParticleProcessor.ts rename src/{moduleConfig.ts => internal/ParticleProcessorStrategy.ts} (51%) rename src/{ => internal}/aqua/index.d.ts (100%) rename src/{ => internal}/aqua/index.js (100%) rename src/{ => internal}/aqua/index_bg.js (100%) rename src/{securityTetraplet.ts => internal/commonTypes.ts} (67%) rename src/{ => internal}/particle.ts (65%) rename src/{seed.ts => internal/peerIdUtils.ts} (90%) rename src/{ => internal}/stepper.ts (79%) rename src/{ => internal}/trust/certificate.ts (100%) rename src/{ => internal}/trust/misc.ts (100%) rename src/{ => internal}/trust/trust.ts (100%) rename src/{ => internal}/trust/trust_graph.ts (94%) delete mode 100644 src/service.ts delete mode 100644 src/subscriptions.ts delete mode 100644 src/test/air.spec.ts delete mode 100644 src/test/client.spec.ts delete mode 100644 src/utils.ts diff --git a/.gitignore b/.gitignore index a3460188..8f9e3ab8 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ bundle/ # Dependency directories node_modules/ -jspm_packages/ \ No newline at end of file +jspm_packages/ +/dist/ diff --git a/package.json b/package.json index 413ae32b..880486ce 100644 --- a/package.json +++ b/package.json @@ -2,15 +2,12 @@ "name": "@fluencelabs/fluence", "version": "0.8.0", "description": "JS SDK for the Fluence network", - "main": "./dist/fluence.js", - "typings": "./dist/fluence.d.ts", + "main": "./dist/index.js", + "typings": "./dist/index.d.ts", "scripts": { "test": "mocha --timeout 10000 -r esm -r ts-node/register src/**/*.spec.ts", - "test-ts": "ts-mocha --timeout 10000 -r esm -p tsconfig.json src/**/*.spec.ts", - "package:build": "NODE_ENV=production webpack && npm run package", - "package": "tsc && rsync -r src/aqua/*.js dist/aqua", - "start": "webpack-dev-server -p", - "build": "webpack --mode production" + "build": "tsc && rsync -r src/internal/aqua/*.js dist/internal/aqua", + "build:webpack": "webpack --mode production" }, "repository": "https://github.com/fluencelabs/fluence-js", "author": "Fluence Labs", @@ -48,7 +45,7 @@ "text-encoding": "^0.7.0", "ts-loader": "7.0.5", "ts-mocha": "8.0.0", - "typescript": "3.9.5", + "typescript": "^3.9.5", "webpack": "4.43.0", "webpack-cli": "3.3.11", "webpack-dev-server": "3.11.0" diff --git a/src/FluenceClient.ts b/src/FluenceClient.ts new file mode 100644 index 00000000..6a534e52 --- /dev/null +++ b/src/FluenceClient.ts @@ -0,0 +1,252 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import log from 'loglevel'; +import PeerId from 'peer-id'; +import { SecurityTetraplet, StepperOutcome } from './internal/commonTypes'; +import { FluenceClientBase } from './internal/FluenceClientBase'; +import { build, genUUID, ParticleDto } from './internal/particle'; +import { ParticleProcessor } from './internal/ParticleProcessor'; +import { ParticleProcessorStrategy } from './internal/ParticleProcessorStrategy'; + +const fetchCallbackServiceName = '__callback'; +const selfRelayVarName = '__relay'; + +const wrapRelayBasedCall = (script: string) => { + return ` + (seq + (call ${selfRelayVarName} ("op" "identity") []) + ${script} + ) + `; +}; + +const wrapFetchCall = (script: string, particleId: string, resultArgNames: string[]) => { + // TODO: sanitize + const resultTogether = resultArgNames.join(' '); + let res = ` + (seq + ${script} + (seq + (call ${selfRelayVarName} ("op" "identity") []) + (call %init_peer_id% ("${fetchCallbackServiceName}" "${particleId}") [${resultTogether}]) + ) + )`; + return wrapRelayBasedCall(res); +}; + +export interface FluenceEvent { + type: string; + args: any[]; +} + +export type FluenceEventHandler = (event: FluenceEvent) => void; + +export class FluenceClient extends FluenceClientBase { + private eventSubscribers: Map = new Map(); + private eventValidators: Map = new Map(); + private callbacks: Map = new Map(); + private fetchParticles: Map = new Map(); + + constructor(selfPeerId: PeerId) { + super(selfPeerId); + this.processor = new ParticleProcessor(this.strategy, selfPeerId); + } + + async fetch(script: string, resultArgNames: string[], data?: Map, ttl?: number): Promise { + data = this.addRelayToArgs(data); + const callBackId = genUUID(); + script = wrapFetchCall(script, callBackId, resultArgNames); + const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId); + + return new Promise((resolve, reject) => { + this.fetchParticles.set(callBackId, { resolve, reject }); + this.processor.executeLocalParticle(particle); + }); + } + + // TODO:: better naming probably? + async fireAndForget(script: string, data?: Map, ttl?: number) { + data = this.addRelayToArgs(data); + script = wrapRelayBasedCall(script); + + await this.sendScript(script, data, ttl); + } + + registerEvent( + channel: string, + eventName: string, + validate?: (channel: string, eventName: string, args: any[], tetraplets: any[][]) => boolean, + ) { + if (!validate) { + validate = (c, e, a, t) => true; + } + + this.eventValidators.set(`${channel}/${eventName}`, validate); + } + + unregisterEvent(channel: string, eventName: string) { + this.eventValidators.delete(`${channel}/${eventName}`); + } + + registerCallback( + serviceId: string, + fnName: string, + callback: (args: any[], tetraplets: SecurityTetraplet[][]) => object, + ) { + this.callbacks.set(`${serviceId}/${fnName}`, callback); + } + + unregisterCallback(channel: string, eventName: string) { + this.eventValidators.delete(`${channel}/${eventName}`); + } + + subscribe(channel: string, handler: FluenceEventHandler) { + if (!this.eventSubscribers.get(channel)) { + this.eventSubscribers.set(channel, []); + } + + this.eventSubscribers.get(channel).push(handler); + } + + protected strategy: ParticleProcessorStrategy = { + particleHandler: (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => { + // missing built-in op + if (serviceId === 'op' && fnName === 'identity') { + return { + ret_code: 0, + result: JSON.stringify(args), + }; + } + + // async fetch model handling + if (serviceId === fetchCallbackServiceName) { + const executingParticlePromiseFns = this.fetchParticles.get(fnName); + if (executingParticlePromiseFns) { + // don't block + setImmediate(() => { + this.fetchParticles.delete(fnName); + executingParticlePromiseFns.resolve(args); + }); + } + + return { + ret_code: 0, + result: JSON.stringify({}), + }; + } + + // event model handling + const eventPair = `${serviceId}/${fnName}`; + const eventValidator = this.eventValidators.get(eventPair); + if (eventValidator) { + try { + if (!eventValidator(serviceId, fnName, args, tetraplets)) { + return { + ret_code: 1, // TODO:: error codes + result: 'validation failed', + }; + } + } catch (e) { + log.error('error running validation function: ' + e); + return { + ret_code: 1, // TODO:: error codes + result: 'validation failed', + }; + } + + // don't block + setImmediate(() => { + this.pushEvent(serviceId, { + type: fnName, + args: args, + }); + }); + + return { + ret_code: 0, + result: JSON.stringify({}), + }; + } + + // callback model handling + const callback = this.callbacks.get(eventPair); + if (callback) { + try { + const res = callback(args, tetraplets); + return { + ret_code: 0, + result: JSON.stringify(res), + }; + } catch (e) { + return { + ret_code: 1, // TODO:: error codes + result: JSON.stringify(e), + }; + } + } + + return { + ret_code: 1, + result: `Error. There is no service: ${serviceId}`, + }; + }, + + sendParticleFurther: async (particle: ParticleDto) => { + try { + await this.connection.sendParticle(particle); + } catch (reason) { + log.error(`Error on sending particle with id ${particle.id}: ${reason}`); + } + }, + + onParticleTimeout: (particle: ParticleDto, now: number) => { + log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`); + const executingParticle = this.fetchParticles.get(particle.id); + if (executingParticle) { + executingParticle.reject(new Error(`particle ${particle.id} timed out`)); + } + }, + onLocalParticleRecieved: (particle: ParticleDto) => {}, + onExternalParticleRecieved: (particle: ParticleDto) => {}, + onStepperExecuting: (particle: ParticleDto) => {}, + onStepperExecuted: (stepperOutcome: StepperOutcome) => { + log.info('inner interpreter outcome:'); + log.info(stepperOutcome); + }, + }; + + private pushEvent(channel: string, event: FluenceEvent) { + const subs = this.eventSubscribers.get(channel); + if (subs) { + for (let sub of subs) { + sub(event); + } + } + } + + private addRelayToArgs(data: Map) { + if (data === undefined) { + data = new Map(); + } + + if (!data.has(selfRelayVarName)) { + data.set(selfRelayVarName, this.relayPeerId); + } + + return data; + } +} diff --git a/src/__test__/air.spec.ts b/src/__test__/air.spec.ts new file mode 100644 index 00000000..55d73815 --- /dev/null +++ b/src/__test__/air.spec.ts @@ -0,0 +1,158 @@ +import 'mocha'; +import { expect } from 'chai'; +import { createLocalClient } from './util'; + +describe('== AIR suite', () => { + it('check init_peer_id', async function () { + // arrange + const serviceId = 'test_service'; + const fnName = 'return_first_arg'; + + const client = await createLocalClient(); + + let res; + client.registerCallback(serviceId, fnName, (args, _) => { + res = args[0]; + return res; + }); + + // act + const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`; + await client.sendScript(script); + + // assert + expect(res).to.be.equal(client.selfPeerId); + }); + + it('call local function', async function () { + // arrange + const serviceId = 'test_service'; + const fnName = 'return_first_arg'; + + const client = await createLocalClient(); + + let res; + client.registerCallback(serviceId, fnName, (args, _) => { + res = args[0]; + return res; + }); + + // act + const arg = 'hello'; + const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") ["${arg}"])`; + await client.sendScript(script); + + // assert + expect(res).to.be.equal(arg); + }); + + it('check particle arguments', async function () { + // arrange + const serviceId = 'test_service'; + const fnName = 'return_first_arg'; + + const client = await createLocalClient(); + + let res; + client.registerCallback(serviceId, fnName, (args, _) => { + res = args[0]; + return res; + }); + + // act + const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [arg1])`; + const data = new Map(); + data.set('arg1', 'hello'); + await client.sendScript(script, data); + + // assert + expect(res).to.be.equal('hello'); + }); + + it('check security tetraplet', async function () { + // arrange + const makeDataServiceId = 'make_data_service'; + const makeDataFnName = 'make_data'; + const getDataServiceId = 'get_data_service'; + const getDataFnName = 'get_data'; + + const client = await createLocalClient(); + + client.registerCallback(makeDataServiceId, makeDataFnName, (args, _) => { + return { + field: 42, + }; + }); + let res; + client.registerCallback(getDataServiceId, getDataFnName, (args, tetraplets) => { + res = { + args: args, + tetraplets: tetraplets, + }; + return args[0]; + }); + + // act + const script = ` + (seq + (call %init_peer_id% ("${makeDataServiceId}" "${makeDataFnName}") [] result) + (call %init_peer_id% ("${getDataServiceId}" "${getDataFnName}") [result.$.field]) + )`; + await client.sendScript(script); + + // assert + const tetraplet = res.tetraplets[0][0]; + expect(tetraplet).to.contain({ + service_id: 'make_data_service', + function_name: 'make_data', + json_path: '$.field', + }); + }); + + it('check chain of services work properly', async function () { + this.timeout(5000); + // arrange + const client = await createLocalClient(); + + const serviceId1 = 'check1'; + const fnName1 = 'fn1'; + let res1; + client.registerCallback(serviceId1, fnName1, (args, _) => { + res1 = args[0]; + return res1; + }); + + const serviceId2 = 'check2'; + const fnName2 = 'fn2'; + let res2; + client.registerCallback(serviceId2, fnName2, (args, _) => { + res2 = args[0]; + return res2; + }); + + const serviceId3 = 'check3'; + const fnName3 = 'fn3'; + let res3; + client.registerCallback(serviceId3, fnName3, (args, _) => { + res3 = args; + return res3; + }); + + const arg1 = 'arg1'; + const arg2 = 'arg2'; + + // act + const script = `(seq + (seq + (call %init_peer_id% ("${serviceId1}" "${fnName1}") ["${arg1}"] result1) + (call %init_peer_id% ("${serviceId2}" "${fnName2}") ["${arg2}"] result2)) + (call %init_peer_id% ("${serviceId3}" "${fnName3}") [result1 result2])) + `; + await client.sendScript(script); + + // assert + expect(res1).to.be.equal(arg1); + expect(res2).to.be.equal(arg2); + expect(res3).to.be.deep.equal([res1, res2]); + }); +}); diff --git a/src/test/ast.spec.ts b/src/__test__/ast.spec.ts similarity index 89% rename from src/test/ast.spec.ts rename to src/__test__/ast.spec.ts index a9863fd1..e642c9ce 100644 --- a/src/test/ast.spec.ts +++ b/src/__test__/ast.spec.ts @@ -1,10 +1,10 @@ import { expect } from 'chai'; import 'mocha'; -import Fluence from '../fluence'; +import { parseAIR } from '../internal/stepper'; describe('== AST parsing suite', () => { it('parse simple script and return ast', async function () { - let ast = await Fluence.parseAIR(` + let ast = await parseAIR(` (call node ("service" "function") [1 2 3 arg] output) `); diff --git a/src/__test__/client.spec.ts b/src/__test__/client.spec.ts new file mode 100644 index 00000000..41bebbe4 --- /dev/null +++ b/src/__test__/client.spec.ts @@ -0,0 +1,358 @@ +import { expect } from 'chai'; + +import 'mocha'; +import { encode } from 'bs58'; +import { certificateFromString, certificateToString, issue } from '../internal/trust/certificate'; +import { TrustGraph } from '../internal/trust/trust_graph'; +import { nodeRootCert } from '../internal/trust/misc'; +import { generatePeerId, peerIdToSeed, seedToPeerId } from '../internal/peerIdUtils'; +import { FluenceClient } from '../FluenceClient'; +import { createConnectedClient, createLocalClient } from './util'; +import log from 'loglevel'; +import { createClient } from '../api'; +import Multiaddr from 'multiaddr'; + +describe('Typescript usage suite', () => { + it('should create private key from seed and back', async function () { + // prettier-ignore + let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201]; + let seedStr = encode(seed); + log.trace('SEED STR: ' + seedStr); + let pid = await seedToPeerId(seedStr); + expect(peerIdToSeed(pid)).to.be.equal(seedStr); + }); + + it('should serialize and deserialize certificate correctly', async function () { + let cert = `11 +1111 +5566Dn4ZXXbBK5LJdUsE7L3pG9qdAzdPY47adjzkhEx9 +3HNXpW2cLdqXzf4jz5EhsGEBFkWzuVdBCyxzJUZu2WPVU7kpzPjatcqvdJMjTtcycVAdaV5qh2fCGphSmw8UMBkr +158981172690500 +1589974723504 +2EvoZAZaGjKWFVdr36F1jphQ5cW7eK3yM16mqEHwQyr7 +4UAJQWzB3nTchBtwARHAhsn7wjdYtqUHojps9xV6JkuLENV8KRiWM3BhQByx5KijumkaNjr7MhHjouLawmiN1A4d +1590061123504 +1589974723504`; + + let deser = await certificateFromString(cert); + let ser = certificateToString(deser); + + expect(ser).to.be.equal(cert); + }); + + // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes + it.skip('should perform tests on certs', async function () { + this.timeout(15000); + await testCerts(); + }); + + describe.skip('should make connection to network', async function () { + this.timeout(30000); + + const testProcedure = async (client: FluenceClient) => { + let resMakingPromise = new Promise((resolve) => { + client.registerCallback('test', 'test', (args, _) => { + resolve(args); + return {}; + }); + }); + + let script = ` + (seq + (call "${client.relayPeerId}" ("op" "identity") []) + (call "${client.selfPeerId}" ("test" "test") [hello]) + ) + `; + + let data: Map = new Map(); + data.set('hello', 'world'); + + await client.sendScript(script, data); + + const res = await resMakingPromise; + return res; + }; + + it('address as string', async function () { + // arrange + const addr = + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9'; + + // act + const client = await createClient(addr); + + // assert + const res = await testProcedure(client); + expect(res).to.deep.equal(['world']); + }); + + it('address as multiaddr', async function () { + // arrange + const addr = new Multiaddr( + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + // act + const client = await createClient(addr); + + // assert + const res = await testProcedure(client); + expect(res).to.deep.equal(['world']); + }); + + it('address as node', async function () { + // arrange + const addr = { + multiaddr: + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + peerId: '12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + }; + + // act + const client = await createClient(addr); + + // assert + const res = await testProcedure(client); + expect(res).to.deep.equal(['world']); + }); + + it('peerid as peer id', async function () { + // arrange + const addr = + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9'; + const pid = await generatePeerId(); + + // act + const client = await createClient(addr, pid); + + // assert + const res = await testProcedure(client); + expect(res).to.deep.equal(['world']); + }); + + it('peerid as see', async function () { + // arrange + const addr = + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9'; + const pid = peerIdToSeed(await generatePeerId()); + + // act + const client = await createClient(addr, pid); + + // assert + const res = await testProcedure(client); + expect(res).to.deep.equal(['world']); + }); + }); + + it.skip('should make a call through the network', async function () { + this.timeout(30000); + // arrange + const client = await createConnectedClient( + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + client.registerCallback('test', 'test', (args, _) => { + log.trace('should make a call through the network, called "test" "test" with args', args); + return {}; + }); + + let resMakingPromise = new Promise((resolve) => { + client.registerCallback('test', 'reverse_args', (args, _) => { + resolve([...args].reverse()); + return {}; + }); + }); + + // act + let script = ` + (seq + (call "${client.relayPeerId}" ("op" "identity") []) + (seq + (call "${client.selfPeerId}" ("test" "test") [a b c d] result) + (call "${client.selfPeerId}" ("test" "reverse_args") [a b c d]) + ) + ) + `; + + let data: Map = new Map(); + data.set('a', 'some a'); + data.set('b', 'some b'); + data.set('c', 'some c'); + data.set('d', 'some d'); + + await client.sendScript(script, data); + + // assert + const res = await resMakingPromise; + expect(res).to.deep.equal(['some d', 'some c', 'some b', 'some a']); + }); + + it.skip('fireAndForget should work', async function () { + this.timeout(30000); + // arrange + const client = await createConnectedClient( + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + let resMakingPromise = new Promise((resolve) => { + client.registerCallback('test', 'reverse_args', (args, _) => { + resolve([...args].reverse()); + return {}; + }); + }); + + // act + let script = ` + (call "${client.selfPeerId}" ("test" "reverse_args") [a b c d]) + `; + + let data: Map = new Map(); + data.set('a', 'some a'); + data.set('b', 'some b'); + data.set('c', 'some c'); + data.set('d', 'some d'); + + await client.fireAndForget(script, data); + + // assert + const res = await resMakingPromise; + expect(res).to.deep.equal(['some d', 'some c', 'some b', 'some a']); + }); + + it.skip('fetch should work', async function () { + this.timeout(30000); + // arrange + const client = await createConnectedClient( + '/dns4/net01.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + // act + let script = ` + (call "${client.relayPeerId}" ("op" "identify") [] result) + `; + const data = new Map(); + data.set('__relay', client.relayPeerId); + + const [res] = await client.fetch(script, ['result'], data); + + // assert + expect(res.external_addresses).to.be.not.undefined; + }); + + it.skip('two clients should work inside the same time browser', async function () { + // arrange + const pid1 = await generatePeerId(); + const client1 = new FluenceClient(pid1); + await client1.connect( + '/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + const pid2 = await generatePeerId(); + const client2 = new FluenceClient(pid2); + await client2.connect( + '/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + let resMakingPromise = new Promise((resolve) => { + client2.registerCallback('test', 'test', (args, _) => { + resolve([...args]); + return {}; + }); + }); + + let script = ` + (seq + (call "${client1.relayPeerId}" ("op" "identity") []) + (call "${pid2.toB58String()}" ("test" "test") [a b c d]) + ) + `; + + let data: Map = new Map(); + data.set('a', 'some a'); + data.set('b', 'some b'); + data.set('c', 'some c'); + data.set('d', 'some d'); + + await client1.sendScript(script, data); + + let res = await resMakingPromise; + expect(res).to.deep.equal(['some a', 'some b', 'some c', 'some d']); + }); + + it.skip('event registration should work', async function () { + // arrange + const pid1 = await generatePeerId(); + const client1 = new FluenceClient(pid1); + await client1.connect( + '/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + const pid2 = await generatePeerId(); + const client2 = new FluenceClient(pid2); + await client2.connect( + '/dns4/dev.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', + ); + + client2.registerEvent('event_stream', 'test'); + const resMakingPromise = new Promise((resolve) => { + client2.subscribe('event_stream', resolve); + }); + + // act + let script = ` + (call "${pid2.toB58String()}" ("event_stream" "test") [hello]) + `; + + let data: Map = new Map(); + data.set('hello', 'world'); + + await client1.fireAndForget(script, data); + + // assert + let res = await resMakingPromise; + expect(res).to.deep.equal({ + type: 'test', + args: ['world'], + }); + }); +}); + +export async function testCerts() { + const key1 = await generatePeerId(); + const key2 = await generatePeerId(); + + // connect to two different nodes + const cl1 = new FluenceClient(key1); + const cl2 = new FluenceClient(key2); + + await cl1.connect('/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb'); + await cl2.connect('/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er'); + + let trustGraph1 = new TrustGraph(/* cl1 */); + let trustGraph2 = new TrustGraph(/* cl2 */); + + let issuedAt = new Date(); + let expiresAt = new Date(); + // certificate expires after one day + expiresAt.setDate(new Date().getDate() + 1); + + // create root certificate for key1 and extend it with key2 + let rootCert = await nodeRootCert(key1); + let extended = await issue(key1, key2, rootCert, expiresAt.getTime(), issuedAt.getTime()); + + // publish certificates to Fluence network + await trustGraph1.publishCertificates(key2.toB58String(), [extended]); + + // get certificates from network + let certs = await trustGraph2.getCertificates(key2.toB58String()); + + // root certificate could be different because nodes save trusts with bigger `expiresAt` date and less `issuedAt` date + expect(certs[0].chain[1].issuedFor.toB58String()).to.be.equal(extended.chain[1].issuedFor.toB58String()); + expect(certs[0].chain[1].signature).to.be.equal(extended.chain[1].signature); + expect(certs[0].chain[1].expiresAt).to.be.equal(extended.chain[1].expiresAt); + expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt); + + await cl1.disconnect(); + await cl2.disconnect(); +} diff --git a/src/test/greeting_wasm.ts b/src/__test__/greeting_wasm.ts similarity index 100% rename from src/test/greeting_wasm.ts rename to src/__test__/greeting_wasm.ts diff --git a/src/test/mocha.opts b/src/__test__/mocha.opts similarity index 67% rename from src/test/mocha.opts rename to src/__test__/mocha.opts index 9dd9d728..bbb2533b 100644 --- a/src/test/mocha.opts +++ b/src/__test__/mocha.opts @@ -2,4 +2,4 @@ --require @babel/register -src/test/**/*.spec.ts +src/__test__/**/*.spec.ts diff --git a/src/__test__/util.ts b/src/__test__/util.ts new file mode 100644 index 00000000..c9e96ab1 --- /dev/null +++ b/src/__test__/util.ts @@ -0,0 +1,15 @@ +import { FluenceClient, generatePeerId } from '..'; + +export const createLocalClient = async () => { + const peerId = await generatePeerId(); + const client = new FluenceClient(peerId); + await client.local(); + return client; +}; + +export const createConnectedClient = async (node: string) => { + const peerId = await generatePeerId(); + const client = new FluenceClient(peerId); + await client.connect(node); + return client; +}; diff --git a/src/api.ts b/src/api.ts new file mode 100644 index 00000000..216d1167 --- /dev/null +++ b/src/api.ts @@ -0,0 +1,111 @@ +import { FluenceClient } from './FluenceClient'; +import { SecurityTetraplet } from './internal/commonTypes'; +import { genUUID, Particle } from './internal/particle'; +import Multiaddr from 'multiaddr'; +import PeerId, { isPeerId } from 'peer-id'; +import { generatePeerId, seedToPeerId } from './internal/peerIdUtils'; + +type Node = { + peerId: string; + multiaddr: string; +}; + +export const createClient = async ( + connectTo?: string | Multiaddr | Node, + peerIdOrSeed?: PeerId | string, +): Promise => { + let peerId; + if (!peerIdOrSeed) { + peerId = await generatePeerId(); + } else if (isPeerId(peerIdOrSeed)) { + // keep unchanged + peerId = peerIdOrSeed; + } else { + // peerId is string, therefore seed + peerId = await seedToPeerId(peerIdOrSeed); + } + + const client = new FluenceClient(peerId); + + if (connectTo) { + let theAddress: Multiaddr; + let fromNode = (connectTo as any).multiaddr; + if (fromNode) { + theAddress = new Multiaddr(fromNode); + } else { + theAddress = new Multiaddr(connectTo as string); + } + + await client.connect(theAddress); + } + + return client; +}; + +export const sendParticle = async (client: FluenceClient, particle: Particle): Promise => { + return await client.sendScript(particle.script, particle.data, particle.ttl); +}; + +export const registerServiceFunction = ( + client: FluenceClient, + serviceId: string, + fnName: string, + handler: (args: any[], tetraplets: SecurityTetraplet[][]) => object, +) => { + client.registerCallback(serviceId, fnName, handler); +}; + +// prettier-ignore +export const unregisterServiceFunction = ( + client: FluenceClient, + serviceId: string, + fnName: string +) => { + client.unregisterCallback(serviceId, fnName); +}; + +export const subscribeToEvent = ( + client: FluenceClient, + serviceId: string, + fnName: string, + handler: (args: any[], tetraplets: SecurityTetraplet[][]) => void, +): Function => { + const realHandler = (args: any[], tetraplets: SecurityTetraplet[][]) => { + // dont' block + setImmediate(() => { + handler(args, tetraplets); + }); + + return {}; + }; + registerServiceFunction(client, serviceId, fnName, realHandler); + return () => { + unregisterServiceFunction(client, serviceId, fnName); + }; +}; + +export const sendParticleAsFetch = async ( + client: FluenceClient, + particle: Particle, + callbackFnName: string, + callbackServiceId: string = '_callback', +): Promise => { + const serviceId = callbackServiceId; + const fnName = callbackFnName; + + let promise: Promise = new Promise(function (resolve, reject) { + const unsub = subscribeToEvent(client, serviceId, fnName, (args: any[], _) => { + unsub(); + resolve(args as any); + }); + + setTimeout(() => { + unsub(); + reject(new Error(`callback for ${callbackServiceId}/${callbackFnName} timed out after ${particle.ttl}`)); + }, particle.ttl); + }); + + sendParticle(client, particle); + + return promise; +}; diff --git a/src/aqua/scripts.ts b/src/aqua/scripts.ts deleted file mode 100644 index 10251dd7..00000000 --- a/src/aqua/scripts.ts +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Experimental attempts to generate aqua code through typescript functions. - */ -export interface Value { - name: string, - value: T -} - -export function value(name: string, v: T): Value { - return { name, value: v } -} - -function updateData(value: Value, data: Map): void { - if (!data.get(value.name)) { - data.set(value.name, value.value) - } -} - -function isValue(value: string | Value): value is Value { - return (value as Value).name !== undefined; -} - -/** - * Generate a script with a call. Data is modified. - * @param target - * @param service - * @param functionV - * @param args - * @param returnName - * @param data - */ -export function call(target: string | Value, service: string | Value, - functionV: string | Value, args: (string | Value)[], - returnName: string | undefined, data: Map): string { - - let targetName = target; - if (isValue(target)) { - updateData(target, data) - targetName = target.name - } - - let serviceName = service; - if (isValue(service)) { - updateData(service, data) - serviceName = service.name; - } - - let functionName = functionV; - if (isValue(functionV)) { - updateData(functionV, data) - functionName = functionV.name; - } - - let argsStr: string[] = [] - args.forEach((v) => { - if (isValue(v)) { - updateData(v, data) - argsStr.push(v.name) - } else { - argsStr.push(v) - } - }) - - if (!returnName) { - returnName = "" - } - - return `(call ${targetName} ("${serviceName}" "${functionName}") [${argsStr.join(" ")}] ${returnName})` -} - -function wrap(command: string, scripts: string[]): string { - if (scripts.length === 2) { - return `(${command} - ${scripts[0]} - ${scripts[1]} -)` - } else { - let first = scripts.shift() - return `(${command} - ${first} - ${wrap(command, scripts)} -)` - } -} - -/** - * Wrap an array of scripts with multiple 'seq' commands - * @param scripts - */ -export function seq(scripts: string[]): string { - if (scripts.length < 2) { - throw new Error("For 'seq' there must be at least 2 scripts") - } - - return wrap("seq", scripts) -} - -/** - * Wrap a script with 'par' command - * @param script - */ -export function par(script: string): string { - return `par( - ${script} -) - ` -} diff --git a/src/dataStorage.ts b/src/dataStorage.ts deleted file mode 100644 index 8d10e769..00000000 --- a/src/dataStorage.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { getCurrentParticleId, registerService } from './globalState'; -import { ServiceMultiple } from './service'; -import log from 'loglevel'; - -let storage: Map> = new Map(); - -export function addData(particleId: string, data: Map, ttl: number) { - storage.set(particleId, data); - setTimeout(() => { - log.debug(`data for ${particleId} is deleted`); - storage.delete(particleId); - }, ttl); -} - -export const storageService = new ServiceMultiple(''); -storageService.registerFunction('load', (args: any[]) => { - let current = getCurrentParticleId(); - - let data = storage.get(current); - - if (data) { - return data.get(args[0]); - } else { - return {}; - } -}); -registerService(storageService); diff --git a/src/fluence.ts b/src/fluence.ts deleted file mode 100644 index a456035d..00000000 --- a/src/fluence.ts +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as PeerId from 'peer-id'; -import Multiaddr from 'multiaddr'; -import { FluenceClient } from './fluenceClient'; -import * as log from 'loglevel'; -import { LogLevelDesc } from 'loglevel'; -import { parseAstClosure } from './stepper'; - -log.setLevel('info'); - -export default class Fluence { - static setLogLevel(level: LogLevelDesc): void { - log.setLevel(level); - } - - /** - * Generates new peer id with Ed25519 private key. - */ - static async generatePeerId(): Promise { - return await PeerId.create({ keyType: 'Ed25519' }); - } - - /** - * Create FluenceClient without connecting it to a relay - * - * @param peerId – client's peer id. Must contain a private key. See `generatePeerId()` - */ - static async local(peerId?: PeerId): Promise { - if (!peerId) { - peerId = await Fluence.generatePeerId(); - } - - let client = new FluenceClient(peerId); - await client.instantiateInterpreter(); - - return client; - } - - /** - * Connect to Fluence node. - * - * @param multiaddr must contain host peer id - * @param peerId your peer id. Should be with a private key. Could be generated by `generatePeerId()` function - */ - static async connect(multiaddr: string | Multiaddr, peerId?: PeerId): Promise { - let client = await Fluence.local(peerId); - - await client.connect(multiaddr); - return client; - } - - /// Parses script and returns AST in JSON format - /// NOTE & TODO: interpreter is instantiated every time, make it a lazy constant? - static async parseAIR(script: string): Promise { - let closure = await parseAstClosure(); - return closure(script); - } -} - -declare global { - interface Window { - Fluence: Fluence; - } -} - -if (typeof window !== 'undefined') { - window.Fluence = Fluence; -} diff --git a/src/fluenceClient.ts b/src/fluenceClient.ts deleted file mode 100644 index 50adec5d..00000000 --- a/src/fluenceClient.ts +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { build, Particle } from './particle'; -import { StepperOutcome } from './stepperOutcome'; -import * as PeerId from 'peer-id'; -import Multiaddr from 'multiaddr'; -import { FluenceConnection } from './fluenceConnection'; -import { Subscriptions } from './subscriptions'; -import { enqueueParticle, getCurrentParticleId, popParticle, setCurrentParticleId } from './globalState'; -import { instantiateInterpreter, InterpreterInvoke } from './stepper'; -import log from 'loglevel'; -import { waitService } from './helpers/waitService'; -import { ModuleConfig } from './moduleConfig'; - -const bs58 = require('bs58'); - -const INFO_LOG_LEVEL = 2; - -export class FluenceClient { - readonly selfPeerId: PeerId; - readonly selfPeerIdStr: string; - - private nodePeerIdStr: string; - private subscriptions = new Subscriptions(); - private interpreter: InterpreterInvoke = undefined; - - connection: FluenceConnection; - - constructor(selfPeerId: PeerId) { - this.selfPeerId = selfPeerId; - this.selfPeerIdStr = selfPeerId.toB58String(); - } - - /** - * Pass a particle to a interpreter and send a result to other services. - */ - private async handleParticle(particle: Particle): Promise { - // if a current particle is processing, add new particle to the queue - if (getCurrentParticleId() !== undefined && getCurrentParticleId() !== particle.id) { - enqueueParticle(particle); - } else { - if (this.interpreter === undefined) { - throw new Error("Undefined. Interpreter is not initialized. Use 'Fluence.connect' to create a client."); - } - // start particle processing if queue is empty - try { - setCurrentParticleId(particle.id); - // check if a particle is relevant - let now = Date.now(); - let actualTtl = particle.timestamp + particle.ttl - now; - if (actualTtl <= 0) { - log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`); - } else { - // if there is no subscription yet, previous data is empty - let prevData: Uint8Array = Buffer.from([]); - let prevParticle = this.subscriptions.get(particle.id); - if (prevParticle) { - prevData = prevParticle.data; - // update a particle in a subscription - this.subscriptions.update(particle); - } else { - // set a particle with actual ttl - this.subscriptions.subscribe(particle, actualTtl); - } - let stepperOutcomeStr = this.interpreter( - particle.init_peer_id, - particle.script, - prevData, - particle.data, - ); - let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr); - - if (log.getLevel() <= INFO_LOG_LEVEL) { - log.info('inner interpreter outcome:'); - log.info(stepperOutcome) - } - - // update data after aquamarine execution - let newParticle: Particle = { ...particle }; - newParticle.data = stepperOutcome.data; - - this.subscriptions.update(newParticle); - - // do nothing if there is no `next_peer_pks` or if client isn't connected to the network - if (stepperOutcome.next_peer_pks.length > 0 && this.connection) { - await this.connection.sendParticle(newParticle).catch((reason) => { - console.error(`Error on sending particle with id ${particle.id}: ${reason}`); - }); - } - } - } finally { - // get last particle from the queue - let nextParticle = popParticle(); - // start the processing of a new particle if it exists - if (nextParticle) { - // update current particle - setCurrentParticleId(nextParticle.id); - await this.handleParticle(nextParticle); - } else { - // wait for a new call (do nothing) if there is no new particle in a queue - setCurrentParticleId(undefined); - } - } - } - } - - /** - * Handle incoming particle from a relay. - */ - private handleExternalParticle(): (particle: Particle) => Promise { - let _this = this; - - return async (particle: Particle) => { - let data: any = particle.data; - let error: any = data['protocol!error']; - if (error !== undefined) { - log.error('error in external particle: '); - log.error(error); - } else { - log.info('handle external particle: '); - log.info(particle); - await _this.handleParticle(particle); - } - }; - } - - async disconnect(): Promise { - return this.connection.disconnect(); - } - - /** - * Instantiate WebAssembly with AIR interpreter to execute AIR scripts - */ - async instantiateInterpreter() { - this.interpreter = await instantiateInterpreter(this.selfPeerId); - } - - /** - * Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection. - * - * @param multiaddr - */ - async connect(multiaddr: string | Multiaddr) { - multiaddr = Multiaddr(multiaddr); - - if (!this.interpreter) { - throw Error("you must call 'instantiateInterpreter' before 'connect'"); - } - - let nodePeerId = multiaddr.getPeerId(); - this.nodePeerIdStr = nodePeerId; - if (!nodePeerId) { - throw Error("'multiaddr' did not contain a valid peer id"); - } - - let firstConnection: boolean = true; - if (this.connection) { - firstConnection = false; - await this.connection.disconnect(); - } - - let node = PeerId.createFromB58String(nodePeerId); - let connection = new FluenceConnection(multiaddr, node, this.selfPeerId, this.handleExternalParticle()); - await connection.connect(); - - this.connection = connection; - } - - async sendParticle(particle: Particle): Promise { - await this.handleParticle(particle); - return particle.id; - } - - async executeParticle(particle: Particle) { - await this.handleParticle(particle); - } - - nodeIdentityCall(): string { - return `(call "${this.nodePeerIdStr}" ("op" "identity") [] void[])`; - } - - async requestResponse( - name: string, - call: (nodeId: string) => string, - returnValue: string, - data: Map, - handleResponse: (args: any[]) => T, - nodeId?: string, - ttl?: number, - ): Promise { - if (!ttl) { - ttl = 10000; - } - - if (!nodeId) { - nodeId = this.nodePeerIdStr; - } - - let serviceCall = call(nodeId); - - let namedPromise = waitService(name, handleResponse, ttl); - - let script = `(seq - ${this.nodeIdentityCall()} - (seq - (seq - ${serviceCall} - ${this.nodeIdentityCall()} - ) - (call "${this.selfPeerIdStr}" ("${namedPromise.name}" "") [${returnValue}] void[]) - ) - ) - `; - - let particle = await build(this.selfPeerId, script, data, ttl); - await this.sendParticle(particle); - - return namedPromise.promise; - } - - /** - * Send a script to add module to a relay. Waiting for a response from a relay. - */ - async addModule( - name: string, - moduleBase64: string, - config?: ModuleConfig, - nodeId?: string, - ttl?: number, - ): Promise { - if (!config) { - config = { - name: name, - mem_pages_count: 100, - logger_enabled: true, - wasi: { - envs: {}, - preopened_files: ['/tmp'], - mapped_dirs: {}, - }, - }; - } - - let data = new Map(); - data.set('module_bytes', moduleBase64); - data.set('module_config', config); - - let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_module") [module_bytes module_config] void[])`; - - return this.requestResponse('addModule', call, '', data, () => {}, nodeId, ttl); - } - - /** - * Send a script to add module to a relay. Waiting for a response from a relay. - */ - async addBlueprint( - name: string, - dependencies: string[], - blueprintId?: string, - nodeId?: string, - ttl?: number, - ): Promise { - let returnValue = 'blueprint_id'; - let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_blueprint") [blueprint] ${returnValue})`; - - let data = new Map(); - data.set('blueprint', { name: name, dependencies: dependencies, id: blueprintId }); - - return this.requestResponse( - 'addBlueprint', - call, - returnValue, - data, - (args: any[]) => args[0] as string, - nodeId, - ttl, - ); - } - - /** - * Send a script to create a service to a relay. Waiting for a response from a relay. - */ - async createService(blueprintId: string, nodeId?: string, ttl?: number): Promise { - let returnValue = 'service_id'; - let call = (nodeId: string) => `(call "${nodeId}" ("srv" "create") [blueprint_id] ${returnValue})`; - - let data = new Map(); - data.set('blueprint_id', blueprintId); - - return this.requestResponse( - 'createService', - call, - returnValue, - data, - (args: any[]) => args[0] as string, - nodeId, - ttl, - ); - } - - /** - * Get all available modules hosted on a connected relay. - */ - async getAvailableModules(nodeId?: string, ttl?: number): Promise { - let returnValue = 'modules'; - let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_modules") [] ${returnValue})`; - - return this.requestResponse( - 'getAvailableModules', - call, - returnValue, - new Map(), - (args: any[]) => args[0] as string[], - nodeId, - ttl, - ); - } - - /** - * Get all available blueprints hosted on a connected relay. - */ - async getBlueprints(nodeId: string, ttl?: number): Promise { - let returnValue = 'blueprints'; - let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_blueprints") [] ${returnValue})`; - - return this.requestResponse( - 'getBlueprints', - call, - returnValue, - new Map(), - (args: any[]) => args[0] as string[], - nodeId, - ttl, - ); - } - - /** - * Add a provider to DHT network to neighborhood around a key. - */ - async addProvider( - key: Buffer, - providerPeer: string, - providerServiceId?: string, - nodeId?: string, - ttl?: number, - ): Promise { - let call = (nodeId: string) => `(call "${nodeId}" ("dht" "add_provider") [key provider] void[])`; - - key = bs58.encode(key); - - let provider = { - peer: providerPeer, - service_id: providerServiceId, - }; - - let data = new Map(); - data.set('key', key); - data.set('provider', provider); - - return this.requestResponse('addProvider', call, '', data, () => {}, nodeId, ttl); - } - - /** - * Get a provider from DHT network from neighborhood around a key.. - */ - async getProviders(key: Buffer, nodeId?: string, ttl?: number): Promise { - key = bs58.encode(key); - - let returnValue = 'providers'; - let call = (nodeId: string) => `(call "${nodeId}" ("dht" "get_providers") [key] providers[])`; - - let data = new Map(); - data.set('key', key); - - return this.requestResponse('getProviders', call, returnValue, data, (args) => args[0], nodeId, ttl); - } - - /** - * Get relays neighborhood - */ - async neighborhood(node: string, ttl?: number): Promise { - let returnValue = 'neighborhood'; - let call = (nodeId: string) => `(call "${nodeId}" ("dht" "neighborhood") [node] ${returnValue})`; - - let data = new Map(); - data.set('node', node); - - return this.requestResponse('neighborhood', call, returnValue, data, (args) => args[0] as string[], node, ttl); - } - - /** - * Call relays 'identity' method. It should return passed 'fields' - */ - async relayIdentity(fields: string[], data: Map, nodeId?: string, ttl?: number): Promise { - let returnValue = 'id'; - let call = (nodeId: string) => `(call "${nodeId}" ("op" "identity") [${fields.join(' ')}] ${returnValue})`; - - return this.requestResponse('getIdentity', call, returnValue, data, (args: any[]) => args[0], nodeId, ttl); - } -} diff --git a/src/globalState.ts b/src/globalState.ts deleted file mode 100644 index f05670d7..00000000 --- a/src/globalState.ts +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Service } from './service'; -import { Particle } from './particle'; - -// TODO put state with wasm file in each created FluenceClient -let services: Map = new Map(); -let particlesQueue: Particle[] = []; -let currentParticle: string | undefined = undefined; - -export function getCurrentParticleId(): string | undefined { - return currentParticle; -} - -export function setCurrentParticleId(particle: string | undefined) { - currentParticle = particle; -} - -export function enqueueParticle(particle: Particle): void { - particlesQueue.push(particle); -} - -export function popParticle(): Particle | undefined { - return particlesQueue.pop(); -} - -export function registerService(service: Service) { - services.set(service.serviceId, service); -} - -export function deleteService(serviceId: string): boolean { - return services.delete(serviceId); -} - -export function getService(serviceId: string): Service | undefined { - return services.get(serviceId); -} diff --git a/src/helpers/waitService.ts b/src/helpers/waitService.ts deleted file mode 100644 index ff8ffb8c..00000000 --- a/src/helpers/waitService.ts +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Creates service that will wait for a response from external peers. - */ -import { genUUID } from '../particle'; -import log from 'loglevel'; -import { ServiceMultiple } from '../service'; -import { deleteService, registerService } from '../globalState'; -import { delay } from '../utils'; - -interface NamedPromise { - promise: Promise; - name: string; -} - -/** - * Generates a service and a name of a service. - * Name should be used in a script. - * Promise will wait a result from a script or will be resolved after `ttl` milliseconds. - * @param ttl - */ -export function waitResult(ttl: number): NamedPromise { - return waitService(genUUID(), (args: any[]) => args, ttl); -} - -export function waitService(functionName: string, func: (args: any[]) => T, ttl: number): NamedPromise { - let serviceName = `${functionName}-${genUUID()}`; - log.info(`Create waiting service '${serviceName}'`); - let service = new ServiceMultiple(serviceName); - registerService(service); - - let promise: Promise = new Promise(function (resolve) { - service.registerFunction('', (args: any[]) => { - resolve(func(args)); - return {}; - }); - }); - - let timeout = delay(ttl, 'Timeout on waiting ' + serviceName); - - return { - name: serviceName, - promise: Promise.race([promise, timeout]).finally(() => { - deleteService(serviceName); - }), - }; -} diff --git a/src/stepperOutcome.ts b/src/index.ts similarity index 68% rename from src/stepperOutcome.ts rename to src/index.ts index 9109d99e..c48be3ca 100644 --- a/src/stepperOutcome.ts +++ b/src/index.ts @@ -14,8 +14,8 @@ * limitations under the License. */ -export interface StepperOutcome { - ret_code: number; - data: Uint8Array; - next_peer_pks: string[]; -} +export { seedToPeerId, peerIdToSeed, generatePeerId } from './internal/peerIdUtils'; +export { FluenceClient } from './FluenceClient'; +export { SecurityTetraplet, PeerIdB58 } from './internal/commonTypes'; +export * from './api'; +export { Particle } from './internal/particle'; diff --git a/src/internal/FluenceClientBase.ts b/src/internal/FluenceClientBase.ts new file mode 100644 index 00000000..73ca310b --- /dev/null +++ b/src/internal/FluenceClientBase.ts @@ -0,0 +1,96 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { build } from './particle'; +import * as PeerId from 'peer-id'; +import Multiaddr from 'multiaddr'; +import { FluenceConnection } from './FluenceConnection'; + +import { ParticleProcessor } from './ParticleProcessor'; +import { ParticleProcessorStrategy } from './ParticleProcessorStrategy'; +import log from 'loglevel'; +import { PeerIdB58 } from './commonTypes'; + +export abstract class FluenceClientBase { + readonly selfPeerIdFull: PeerId; + + get relayPeerId(): PeerIdB58 { + return this.connection?.nodePeerId.toB58String(); + } + + get selfPeerId(): PeerIdB58 { + return this.selfPeerIdFull.toB58String(); + } + + get isConnected(): boolean { + return this.connection?.isConnected(); + } + + protected connection: FluenceConnection; + protected processor: ParticleProcessor; + protected abstract strategy: ParticleProcessorStrategy; + + constructor(selfPeerIdFull: PeerId) { + this.selfPeerIdFull = selfPeerIdFull; + } + + async disconnect(): Promise { + await this.connection.disconnect(); + await this.processor.destroy(); + } + + // HACK:: this is only needed to fix tests. + // Particle processor should be tested instead + async local(): Promise { + await this.processor.init(); + } + + /** + * Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection. + * + * @param multiaddr + */ + async connect(multiaddr: string | Multiaddr): Promise { + multiaddr = Multiaddr(multiaddr); + + const nodePeerId = multiaddr.getPeerId(); + if (!nodePeerId) { + throw Error("'multiaddr' did not contain a valid peer id"); + } + + if (this.connection) { + await this.connection.disconnect(); + } + + const node = PeerId.createFromB58String(nodePeerId); + const connection = new FluenceConnection( + multiaddr, + node, + this.selfPeerIdFull, + this.processor.executeExternalParticle.bind(this.processor), + ); + await connection.connect(); + await this.processor.init(); + + this.connection = connection; + } + + async sendScript(script: string, data?: Map, ttl?: number): Promise { + const particle = await build(this.selfPeerIdFull, script, data, ttl); + this.processor.executeLocalParticle(particle); + return particle.id; + } +} diff --git a/src/fluenceConnection.ts b/src/internal/FluenceConnection.ts similarity index 88% rename from src/fluenceConnection.ts rename to src/internal/FluenceConnection.ts index 9743c56f..81dfa0c5 100644 --- a/src/fluenceConnection.ts +++ b/src/internal/FluenceConnection.ts @@ -23,7 +23,7 @@ import pipe from 'it-pipe'; import Multiaddr from 'multiaddr'; import PeerId from 'peer-id'; import * as log from 'loglevel'; -import { build, parseParticle, Particle, toAction } from './particle'; +import { parseParticle, ParticleDto, toPayload } from './particle'; export const PROTOCOL_NAME = '/fluence/faas/1.0.0'; @@ -39,9 +39,14 @@ export class FluenceConnection { private readonly address: Multiaddr; readonly nodePeerId: PeerId; private readonly selfPeerIdStr: string; - private readonly handleParticle: (call: Particle) => void; + private readonly handleParticle: (call: ParticleDto) => void; - constructor(multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerId: PeerId, handleParticle: (call: Particle) => void) { + constructor( + multiaddr: Multiaddr, + hostPeerId: PeerId, + selfPeerId: PeerId, + handleParticle: (call: ParticleDto) => void, + ) { this.selfPeerId = selfPeerId; this.handleParticle = handleParticle; this.selfPeerIdStr = selfPeerId.toB58String(); @@ -114,14 +119,10 @@ export class FluenceConnection { this.status = Status.Disconnected; } - async buildParticle(script: string, data: Map, ttl?: number): Promise { - return build(this.selfPeerId, script, data, ttl); - } - - async sendParticle(particle: Particle): Promise { + async sendParticle(particle: ParticleDto): Promise { this.checkConnectedOrThrow(); - let action = toAction(particle) + let action = toPayload(particle); let particleStr = JSON.stringify(action); log.debug('send particle: \n' + JSON.stringify(action, undefined, 2)); diff --git a/src/internal/ParticleProcessor.ts b/src/internal/ParticleProcessor.ts new file mode 100644 index 00000000..1972ef3c --- /dev/null +++ b/src/internal/ParticleProcessor.ts @@ -0,0 +1,239 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ParticleDto } from './particle'; +import * as PeerId from 'peer-id'; +import { instantiateInterpreter, InterpreterInvoke } from './stepper'; +import { ParticleHandler, SecurityTetraplet, StepperOutcome } from './commonTypes'; +import log from 'loglevel'; +import { ParticleProcessorStrategy } from './ParticleProcessorStrategy'; + +// HACK:: make an api for aqua stepper to accept variables in an easy way! +let magicParticleStorage: Map> = new Map(); + +// HACK:: make an api for aqua stepper to accept variables in an easy way! +export function injectDataIntoParticle(particleId: string, data: Map, ttl: number) { + log.debug(`setting data for ${particleId}`, data); + magicParticleStorage.set(particleId, data); + setTimeout(() => { + log.debug(`data for ${particleId} is deleted`); + magicParticleStorage.delete(particleId); + }, ttl); +} + +// HACK:: make an api for aqua stepper to accept variables in an easy way! +const wrapWithDataInjectionHandling = ( + handler: ParticleHandler, + getCurrentParticleId: () => string, +): ParticleHandler => { + return (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => { + if (serviceId === '__magic' && fnName === 'load') { + const current = getCurrentParticleId(); + const data = magicParticleStorage.get(current); + + const res = data ? data.get(args[0]) : {}; + return { + ret_code: 0, + result: JSON.stringify(res), + }; + } + + return handler(serviceId, fnName, args, tetraplets); + }; +}; + +export class ParticleProcessor { + private interpreter: InterpreterInvoke; + private subscriptions: Map = new Map(); + private particlesQueue: ParticleDto[] = []; + private currentParticle?: string; + + strategy: ParticleProcessorStrategy; + peerId: PeerId; + + constructor(strategy: ParticleProcessorStrategy, peerId: PeerId) { + this.strategy = strategy; + this.peerId = peerId; + } + + async init() { + await this.instantiateInterpreter(); + } + + async destroy() { + // TODO: destroy interpreter + } + + async executeLocalParticle(particle: ParticleDto) { + this.strategy?.onLocalParticleRecieved(particle); + await this.handleParticle(particle).catch((err) => { + log.error('particle processing failed: ' + err); + }); + } + + async executeExternalParticle(particle: ParticleDto) { + this.strategy?.onExternalParticleRecieved(particle); + await this.handleExternalParticle(particle); + } + + /* + * private + */ + + private getCurrentParticleId(): string | undefined { + return this.currentParticle; + } + + private setCurrentParticleId(particle: string | undefined) { + this.currentParticle = particle; + } + + private enqueueParticle(particle: ParticleDto): void { + this.particlesQueue.push(particle); + } + + private popParticle(): ParticleDto | undefined { + return this.particlesQueue.pop(); + } + + /** + * Subscriptions will be applied by outside message if id will be the same. + * + * @param particle + * @param ttl time to live, subscription will be deleted after this time + */ + subscribe(particle: ParticleDto, ttl: number) { + let self = this; + setTimeout(() => { + self.subscriptions.delete(particle.id); + self.strategy?.onParticleTimeout(particle, Date.now()); + }, ttl); + this.subscriptions.set(particle.id, particle); + } + + updateSubscription(particle: ParticleDto): boolean { + if (this.subscriptions.has(particle.id)) { + this.subscriptions.set(particle.id, particle); + return true; + } else { + return false; + } + } + + getSubscription(id: string): ParticleDto | undefined { + return this.subscriptions.get(id); + } + + hasSubscription(particle: ParticleDto): boolean { + return this.subscriptions.has(particle.id); + } + + /** + * Pass a particle to a interpreter and send a result to other services. + */ + private async handleParticle(particle: ParticleDto): Promise { + // if a current particle is processing, add new particle to the queue + if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) { + this.enqueueParticle(particle); + } else { + if (this.interpreter === undefined) { + throw new Error('Undefined. Interpreter is not initialized'); + } + // start particle processing if queue is empty + try { + this.setCurrentParticleId(particle.id); + // check if a particle is relevant + let now = Date.now(); + let actualTtl = particle.timestamp + particle.ttl - now; + if (actualTtl <= 0) { + this.strategy?.onParticleTimeout(particle, now); + } else { + // if there is no subscription yet, previous data is empty + let prevData: Uint8Array = Buffer.from([]); + let prevParticle = this.getSubscription(particle.id); + if (prevParticle) { + prevData = prevParticle.data; + // update a particle in a subscription + this.updateSubscription(particle); + } else { + // set a particle with actual ttl + this.subscribe(particle, actualTtl); + } + this.strategy.onStepperExecuting(particle); + let stepperOutcomeStr = this.interpreter( + particle.init_peer_id, + particle.script, + prevData, + particle.data, + ); + let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr); + + // update data after aquamarine execution + let newParticle: ParticleDto = { ...particle, data: stepperOutcome.data }; + this.strategy.onStepperExecuted(stepperOutcome); + + this.updateSubscription(newParticle); + + // do nothing if there is no `next_peer_pks` or if client isn't connected to the network + if (stepperOutcome.next_peer_pks.length > 0) { + this.strategy.sendParticleFurther(newParticle); + } + } + } finally { + // get last particle from the queue + let nextParticle = this.popParticle(); + // start the processing of a new particle if it exists + if (nextParticle) { + // update current particle + this.setCurrentParticleId(nextParticle.id); + await this.handleParticle(nextParticle); + } else { + // wait for a new call (do nothing) if there is no new particle in a queue + this.setCurrentParticleId(undefined); + } + } + } + } + + /** + * Handle incoming particle from a relay. + */ + private async handleExternalParticle(particle: ParticleDto): Promise { + let data: any = particle.data; + let error: any = data['protocol!error']; + if (error !== undefined) { + log.error('error in external particle: '); + log.error(error); + } else { + log.info('handle external particle: '); + log.info(particle); + await this.handleParticle(particle); + } + } + + /** + * Instantiate WebAssembly with AIR interpreter to execute AIR scripts + */ + async instantiateInterpreter() { + this.interpreter = await instantiateInterpreter( + wrapWithDataInjectionHandling( + this.strategy.particleHandler.bind(this), + this.getCurrentParticleId.bind(this), + ), + this.peerId, + ); + } +} diff --git a/src/moduleConfig.ts b/src/internal/ParticleProcessorStrategy.ts similarity index 51% rename from src/moduleConfig.ts rename to src/internal/ParticleProcessorStrategy.ts index 6f016434..1f6c6429 100644 --- a/src/moduleConfig.ts +++ b/src/internal/ParticleProcessorStrategy.ts @@ -14,16 +14,16 @@ * limitations under the License. */ -export interface ModuleConfig { - name: string; - mem_pages_count?: number; - logger_enabled?: boolean; - wasi?: Wasi; - mounted_binaries?: object; -} +import { ParticleHandler, StepperOutcome } from './commonTypes'; +import { ParticleDto } from './particle'; -export interface Wasi { - envs?: object; - preopened_files?: string[]; - mapped_dirs?: object; +export interface ParticleProcessorStrategy { + particleHandler: ParticleHandler; + sendParticleFurther: (particle: ParticleDto) => void; + + onParticleTimeout?: (particle: ParticleDto, now: number) => void; + onLocalParticleRecieved?: (particle: ParticleDto) => void; + onExternalParticleRecieved?: (particle: ParticleDto) => void; + onStepperExecuting?: (particle: ParticleDto) => void; + onStepperExecuted?: (stepperOutcome: StepperOutcome) => void; } diff --git a/src/aqua/index.d.ts b/src/internal/aqua/index.d.ts similarity index 100% rename from src/aqua/index.d.ts rename to src/internal/aqua/index.d.ts diff --git a/src/aqua/index.js b/src/internal/aqua/index.js similarity index 100% rename from src/aqua/index.js rename to src/internal/aqua/index.js diff --git a/src/aqua/index_bg.js b/src/internal/aqua/index_bg.js similarity index 100% rename from src/aqua/index_bg.js rename to src/internal/aqua/index_bg.js diff --git a/src/securityTetraplet.ts b/src/internal/commonTypes.ts similarity index 67% rename from src/securityTetraplet.ts rename to src/internal/commonTypes.ts index 721744df..71a0c566 100644 --- a/src/securityTetraplet.ts +++ b/src/internal/commonTypes.ts @@ -14,6 +14,24 @@ * limitations under the License. */ +export interface CallServiceResult { + ret_code: number; + result: string; +} + +export type ParticleHandler = ( + serviceId: string, + fnName: string, + args: any[], + tetraplets: SecurityTetraplet[][], +) => CallServiceResult; + +export interface StepperOutcome { + ret_code: number; + data: Uint8Array; + next_peer_pks: string[]; +} + export interface ResolvedTriplet { peer_pk: string; service_id: string; @@ -23,3 +41,5 @@ export interface ResolvedTriplet { export interface SecurityTetraplet extends ResolvedTriplet { json_path: string; } + +export type PeerIdB58 = string; diff --git a/src/particle.ts b/src/internal/particle.ts similarity index 65% rename from src/particle.ts rename to src/internal/particle.ts index 1c9c8e2f..e55244fe 100644 --- a/src/particle.ts +++ b/src/internal/particle.ts @@ -15,14 +15,36 @@ */ import { v4 as uuidv4 } from 'uuid'; -import {fromByteArray, toByteArray} from 'base64-js'; +import { fromByteArray, toByteArray } from 'base64-js'; import PeerId from 'peer-id'; import { encode } from 'bs58'; -import { addData } from './dataStorage'; +import { injectDataIntoParticle } from './ParticleProcessor'; const DEFAULT_TTL = 7000; -export interface Particle { +export class Particle { + script: string; + data: Map; + ttl: number; + + constructor(script: string, data?: Map | Record, ttl?: number) { + this.script = script; + if (data === undefined) { + this.data = new Map(); + } else if (data instanceof Map) { + this.data = data; + } else { + this.data = new Map(); + for (let k in data) { + this.data.set(k, data[k]); + } + } + + this.ttl = ttl ?? DEFAULT_TTL; + } +} + +export interface ParticleDto { id: string; init_peer_id: string; timestamp: number; @@ -36,8 +58,8 @@ export interface Particle { /** * Represents particle action to send to a node */ -interface ParticleAction { - action: 'Particle' +interface ParticlePayload { + action: 'Particle'; id: string; init_peer_id: string; timestamp: number; @@ -47,11 +69,11 @@ interface ParticleAction { data: string; } -function wrapScript(selfPeerId: string, script: string, fields: string[]): string { +function wrapWithVariableInjectionScript(script: string, fields: string[]): string { fields.forEach((v) => { script = ` (seq - (call %init_peer_id% ("" "load") ["${v}"] ${v}) + (call %init_peer_id% ("__magic" "load") ["${v}"] ${v}) ${script} ) `; @@ -60,18 +82,28 @@ function wrapScript(selfPeerId: string, script: string, fields: string[]): strin return script; } -export async function build(peerId: PeerId, script: string, data: Map, ttl?: number): Promise { - let id = genUUID(); +export async function build( + peerId: PeerId, + script: string, + data?: Map, + ttl?: number, + customId?: string, +): Promise { + const id = customId ?? genUUID(); let currentTime = new Date().getTime(); + if (data === undefined) { + data = new Map(); + } + if (ttl === undefined) { ttl = DEFAULT_TTL; } - addData(id, data, ttl); - script = wrapScript(peerId.toB58String(), script, Array.from(data.keys())); + injectDataIntoParticle(id, data, ttl); + script = wrapWithVariableInjectionScript(script, Array.from(data.keys())); - let particle: Particle = { + let particle: ParticleDto = { id: id, init_peer_id: peerId.toB58String(), timestamp: currentTime, @@ -89,9 +121,9 @@ export async function build(peerId: PeerId, script: string, data: Map { +export async function signParticle(peerId: PeerId, particle: ParticleDto): Promise { let bufToSign = canonicalBytes(particle); let signature = await peerId.privKey.sign(bufToSign); diff --git a/src/seed.ts b/src/internal/peerIdUtils.ts similarity index 90% rename from src/seed.ts rename to src/internal/peerIdUtils.ts index 81187963..9f81deea 100644 --- a/src/seed.ts +++ b/src/internal/peerIdUtils.ts @@ -32,3 +32,7 @@ export function peerIdToSeed(peerId: PeerId): string { let seedBuf = peerId.privKey.marshal().subarray(0, 32); return encode(seedBuf); } + +export async function generatePeerId(): Promise { + return await PeerId.create({ keyType: 'Ed25519' }); +} diff --git a/src/stepper.ts b/src/internal/stepper.ts similarity index 79% rename from src/stepper.ts rename to src/internal/stepper.ts index fbbf3ddf..40505026 100644 --- a/src/stepper.ts +++ b/src/internal/stepper.ts @@ -17,8 +17,8 @@ import { toByteArray } from 'base64-js'; import * as aqua from './aqua'; import { return_current_peer_id, return_call_service_result, getStringFromWasm0, free } from './aqua'; +import { ParticleHandler, CallServiceResult, SecurityTetraplet } from './commonTypes'; -import { service } from './service'; import PeerId from 'peer-id'; import log from 'loglevel'; import { wasmBs64 } from '@fluencelabs/aquamarine-stepper'; @@ -26,7 +26,12 @@ import Instance = WebAssembly.Instance; import Exports = WebAssembly.Exports; import ExportValue = WebAssembly.ExportValue; -export type InterpreterInvoke = (init_user_id: string, script: string, prev_data: Uint8Array, data: Uint8Array) => string; +export type InterpreterInvoke = ( + init_user_id: string, + script: string, + prev_data: Uint8Array, + data: Uint8Array, +) => string; type ImportObject = { './aquamarine_client_bg.js': { // fn call_service_impl(service_id: String, fn_name: String, args: String, security_tetraplets: String) -> String; @@ -117,8 +122,32 @@ function log_import(cfg: HostImportsConfig): LogImport { }; } +const theParticleHandler = ( + callback: ParticleHandler, + service_id: string, + fn_name: string, + args: string, + tetraplets: string, +): CallServiceResult => { + try { + let argsObject = JSON.parse(args); + if (!Array.isArray(argsObject)) { + throw new Error('args is not an array'); + } + + let tetrapletsObject: SecurityTetraplet[][] = JSON.parse(tetraplets); + return callback(service_id, fn_name, argsObject, tetrapletsObject); + } catch (err) { + console.error('Cannot parse arguments: ' + JSON.stringify(err)); + return { + result: JSON.stringify('Cannot parse arguments: ' + JSON.stringify(err)), + ret_code: 1, + }; + } +}; + /// Returns import object that describes host functions called by AIR interpreter -function newImportObject(cfg: HostImportsConfig, peerId: PeerId): ImportObject { +function newImportObject(particleHandler: ParticleHandler, cfg: HostImportsConfig, peerId: PeerId): ImportObject { return { // __wbg_callserviceimpl_c0ca292e3c8c0c97 this is a function generated by bindgen. Could be changed. // If so, an error with a new name will be occurred after wasm initialization. @@ -131,7 +160,14 @@ function newImportObject(cfg: HostImportsConfig, peerId: PeerId): ImportObject { let fnName = getStringFromWasm0(wasm, arg3, arg4); let args = getStringFromWasm0(wasm, arg5, arg6); let tetraplets = getStringFromWasm0(wasm, arg7, arg8); - let serviceResult = service(serviceId, fnName, args, tetraplets); + /* + TODO:: parse and pack arguments into structure like the following + class Argument { + value: T, + SecurityTetraplet: tetraplet + } + */ + let serviceResult = theParticleHandler(particleHandler, serviceId, fnName, args, tetraplets); let resultStr = JSON.stringify(serviceResult); return_call_service_result(wasm, resultStr, arg0); } finally { @@ -167,9 +203,12 @@ function newLogImport(cfg: HostImportsConfig): ImportObject { /// Instantiates AIR interpreter, and returns its `invoke` function as closure /// NOTE: an interpreter is also called a stepper from time to time -export async function instantiateInterpreter(peerId: PeerId): Promise { +export async function instantiateInterpreter( + particleHandler: ParticleHandler, + peerId: PeerId, +): Promise { let cfg = new HostImportsConfig((cfg) => { - return newImportObject(cfg, peerId); + return newImportObject(particleHandler, cfg, peerId); }); let instance = await interpreterInstance(cfg); @@ -204,3 +243,10 @@ export async function parseAstClosure(): Promise<(script: string) => string> { return aqua.ast(instance.exports, script); }; } + +/// Parses script and returns AST in JSON format +/// NOTE & TODO: interpreter is instantiated every time, make it a lazy constant? +export async function parseAIR(script: string): Promise { + let closure = await parseAstClosure(); + return closure(script); +} diff --git a/src/trust/certificate.ts b/src/internal/trust/certificate.ts similarity index 100% rename from src/trust/certificate.ts rename to src/internal/trust/certificate.ts diff --git a/src/trust/misc.ts b/src/internal/trust/misc.ts similarity index 100% rename from src/trust/misc.ts rename to src/internal/trust/misc.ts diff --git a/src/trust/trust.ts b/src/internal/trust/trust.ts similarity index 100% rename from src/trust/trust.ts rename to src/internal/trust/trust.ts diff --git a/src/trust/trust_graph.ts b/src/internal/trust/trust_graph.ts similarity index 94% rename from src/trust/trust_graph.ts rename to src/internal/trust/trust_graph.ts index c60840bb..9410d07e 100644 --- a/src/trust/trust_graph.ts +++ b/src/internal/trust/trust_graph.ts @@ -14,18 +14,15 @@ * limitations under the License. */ -import { FluenceClient } from '../fluenceClient'; import { Certificate, certificateFromString, certificateToString } from './certificate'; import * as log from 'loglevel'; // TODO update after 'aquamarine' implemented // The client to interact with the Fluence trust graph API export class TrustGraph { - client: FluenceClient; + //client: FluenceClient; - constructor(client: FluenceClient) { - this.client = client; - } + constructor() {} // Publish certificate to Fluence network. It will be published in Kademlia neighbourhood by `peerId` key. async publishCertificates(peerId: string, certs: Certificate[]) { diff --git a/src/service.ts b/src/service.ts deleted file mode 100644 index bf8ccc53..00000000 --- a/src/service.ts +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { getService } from './globalState'; -import { SecurityTetraplet } from './securityTetraplet'; - -export interface CallServiceResult { - ret_code: number; - result: string; -} - -export abstract class Service { - serviceId: string; - - /** - * Calls the function from local client - * @param fnName - name of the function to call - * @param args - arguments to be passed to the function - * @param tetraplets - array of arrays of tetraplets. First index corresponds to argument number. - * If the argument is not an array the second array will always contain exactly one element. - * If the argument is an array the second index will correspond to the index of element in argument's array - */ - abstract call(fnName: string, args: any[], tetraplets: SecurityTetraplet[][]): CallServiceResult; -} - -/** - * Creates one function for all function names. - */ -export class ServiceOne implements Service { - serviceId: string; - fn: (fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => object; - - constructor(serviceId: string, fn: (fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => object) { - this.serviceId = serviceId; - this.fn = fn; - } - - /** - * Calls the function from local client - * @param fnName - name of the function to call - * @param args - arguments to be passed to the function - * @param tetraplets - array of arrays of tetraplets. First index corresponds to argument number. - * If the argument is not an array the second array will always contain exactly one element. - * If the argument is an array the second index will correspond to the index of element in argument's array - */ - call(fnName: string, args: any[], tetraplets: SecurityTetraplet[][]): CallServiceResult { - try { - let result = this.fn(fnName, args, tetraplets); - return { - ret_code: 0, - result: JSON.stringify(result), - }; - } catch (err) { - return { - ret_code: 1, - result: JSON.stringify(err), - }; - } - } -} - -/** - * Creates function per function name. Returns an error when call a name without registered function. - */ -export class ServiceMultiple implements Service { - serviceId: string; - functions: Map object> = new Map(); - - constructor(serviceId: string) { - this.serviceId = serviceId; - } - - /** - * Registers a callback function into Aquamarine - * @param fnName - the function name to be registered - * @param fn - callback function which will be called from Aquamarine. - * The callback function has the following parameters: - * args - arguments to be passed to the function - * tetraplets - array of arrays of tetraplets. First index corresponds to argument number. - * If the argument is not an array the second array will always contain exactly one element. - * If the argument is an array the second index will correspond to the index of element in argument's array - */ - registerFunction(fnName: string, fn: (args: any[], tetraplets: SecurityTetraplet[][]) => object) { - this.functions.set(fnName, fn); - } - - /** - * Calls the function from local client - * @param fnName - name of the function to call - * @param args - arguments to be passed to the function - * @param tetraplets - array of arrays of tetraplets. First index corresponds to argument number. - * If the argument is not an array the second array will always contain exactly one element. - * If the argument is an array the second index will correspond to the index of element in argument's array - */ - call(fnName: string, args: any[], tetraplets: SecurityTetraplet[][]): CallServiceResult { - let fn = this.functions.get(fnName); - if (fn) { - try { - let result = fn(args, tetraplets); - return { - ret_code: 0, - result: JSON.stringify(result), - }; - } catch (err) { - return { - ret_code: 1, - result: JSON.stringify(err), - }; - } - } else { - let errorMsg = `Error. There is no function ${fnName}`; - return { - ret_code: 1, - result: JSON.stringify(errorMsg), - }; - } - } -} - -export function service(service_id: string, fn_name: string, args: string, tetraplets: string): CallServiceResult { - try { - let argsObject = JSON.parse(args); - if (!Array.isArray(argsObject)) { - throw new Error('args is not an array'); - } - - let tetrapletsObject: SecurityTetraplet[][] = JSON.parse(tetraplets); - - let service = getService(service_id); - if (service) { - return service.call(fn_name, argsObject, tetrapletsObject); - } else { - return { - result: JSON.stringify(`Error. There is no service: ${service_id}`), - ret_code: 0, - }; - } - } catch (err) { - console.error('Cannot parse arguments: ' + JSON.stringify(err)); - return { - result: JSON.stringify('Cannot parse arguments: ' + JSON.stringify(err)), - ret_code: 1, - }; - } -} diff --git a/src/subscriptions.ts b/src/subscriptions.ts deleted file mode 100644 index 8e036cdb..00000000 --- a/src/subscriptions.ts +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Particle } from './particle'; -import log from 'loglevel'; - -export class Subscriptions { - private subscriptions: Map = new Map(); - - constructor() {} - - /** - * Subscriptions will be applied by outside message if id will be the same. - * - * @param particle - * @param ttl time to live, subscription will be deleted after this time - */ - subscribe(particle: Particle, ttl: number) { - let _this = this; - setTimeout(() => { - _this.subscriptions.delete(particle.id); - log.info(`Particle with id ${particle.id} deleted by timeout`); - }, ttl); - this.subscriptions.set(particle.id, particle); - } - - update(particle: Particle): boolean { - if (this.subscriptions.has(particle.id)) { - this.subscriptions.set(particle.id, particle); - return true; - } else { - return false; - } - } - - get(id: string): Particle | undefined { - return this.subscriptions.get(id); - } - - hasSubscription(particle: Particle): boolean { - return this.subscriptions.has(particle.id); - } -} diff --git a/src/test/air.spec.ts b/src/test/air.spec.ts deleted file mode 100644 index 76be947f..00000000 --- a/src/test/air.spec.ts +++ /dev/null @@ -1,153 +0,0 @@ -import 'mocha'; -import Fluence from '../fluence'; -import { build } from '../particle'; -import { ServiceMultiple } from '../service'; -import { registerService } from '../globalState'; -import { expect } from 'chai'; -import { SecurityTetraplet } from '../securityTetraplet'; - -function registerPromiseService( - serviceId: string, - fnName: string, - f: (args: any[]) => T, -): Promise<[T, SecurityTetraplet[][]]> { - let service = new ServiceMultiple(serviceId); - registerService(service); - - return new Promise((resolve, reject) => { - service.registerFunction(fnName, (args: any[], tetraplets: SecurityTetraplet[][]) => { - resolve([f(args), tetraplets]); - - return { result: f(args) }; - }); - }); -} - -describe('== AIR suite', () => { - it('check init_peer_id', async function () { - let serviceId = 'init_peer'; - let fnName = 'id'; - let checkPromise = registerPromiseService(serviceId, fnName, (args) => args[0]); - - let client = await Fluence.local(); - - let script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`; - - let particle = await build(client.selfPeerId, script, new Map()); - - await client.executeParticle(particle); - - let args = (await checkPromise)[0]; - expect(args).to.be.equal(client.selfPeerIdStr); - }); - - it('call local function', async function () { - let serviceId = 'console'; - let fnName = 'log'; - let checkPromise = registerPromiseService(serviceId, fnName, (args) => args[0]); - - let client = await Fluence.local(); - - let arg = 'hello'; - let script = `(call %init_peer_id% ("${serviceId}" "${fnName}") ["${arg}"])`; - - // Wrap script into particle, so it can be executed by local WASM runtime - let particle = await build(client.selfPeerId, script, new Map()); - - await client.executeParticle(particle); - - let [args, tetraplets] = await checkPromise; - expect(args).to.be.equal(arg); - }); - - it('check particle arguments', async function () { - let serviceId = 'check'; - let fnName = 'args'; - let checkPromise = registerPromiseService(serviceId, fnName, (args) => args[0]); - - let client = await Fluence.local(); - - let arg = 'arg1'; - let value = 'hello'; - let script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [${arg}])`; - - let data = new Map(); - data.set('arg1', value); - let particle = await build(client.selfPeerId, script, data); - - await client.executeParticle(particle); - - let [args, tetraplets] = await checkPromise; - expect(args).to.be.equal(value); - }); - - it('check security tetraplet', async function () { - let makeDataPromise = registerPromiseService('make_data_service', 'make_data', (args) => { - field: 42; - }); - let getDataPromise = registerPromiseService('get_data_service', 'get_data', (args) => args[0]); - - let client = await Fluence.local(); - - let script = ` - (seq - (call %init_peer_id% ("make_data_service" "make_data") [] result) - (call %init_peer_id% ("get_data_service" "get_data") [result.$.field]) - )`; - - let particle = await build(client.selfPeerId, script, new Map()); - - await client.executeParticle(particle); - - await makeDataPromise; - let [args, tetraplets] = await getDataPromise; - let tetraplet = tetraplets[0][0]; - - expect(tetraplet).to.contain({ - service_id: 'make_data_service', - function_name: 'make_data', - json_path: '$.field', - }); - }); - - it('check chain of services work properly', async function () { - this.timeout(5000); - let serviceId1 = 'check1'; - let fnName1 = 'fn1'; - let checkPromise1 = registerPromiseService(serviceId1, fnName1, (args) => args[0]); - - let serviceId2 = 'check2'; - let fnName2 = 'fn2'; - let checkPromise2 = registerPromiseService(serviceId2, fnName2, (args) => args[0]); - - let serviceId3 = 'check3'; - let fnName3 = 'fn3'; - let checkPromise3 = registerPromiseService(serviceId3, fnName3, (args) => args); - - let client = await Fluence.local(); - - let arg1 = 'arg1'; - let arg2 = 'arg2'; - - // language=Clojure - let script = `(seq - (seq - (call %init_peer_id% ("${serviceId1}" "${fnName1}") ["${arg1}"] result1) - (call %init_peer_id% ("${serviceId2}" "${fnName2}") ["${arg2}"] result2)) - (call %init_peer_id% ("${serviceId3}" "${fnName3}") [result1 result2])) - `; - - let particle = await build(client.selfPeerId, script, new Map()); - - await client.executeParticle(particle); - - let args1 = (await checkPromise1)[0]; - expect(args1).to.be.equal(arg1); - - let args2 = (await checkPromise2)[0]; - expect(args2).to.be.equal(arg2); - - let args3 = (await checkPromise3)[0]; - expect(args3).to.be.deep.equal([{ result: arg1 }, { result: arg2 }]); - }); -}); diff --git a/src/test/client.spec.ts b/src/test/client.spec.ts deleted file mode 100644 index 11b4e983..00000000 --- a/src/test/client.spec.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { expect } from 'chai'; - -import 'mocha'; -import { encode } from 'bs58'; -import Fluence from '../fluence'; -import { certificateFromString, certificateToString, issue } from '../trust/certificate'; -import { TrustGraph } from '../trust/trust_graph'; -import { nodeRootCert } from '../trust/misc'; -import { peerIdToSeed, seedToPeerId } from '../seed'; -import { build } from '../particle'; -import { Service, ServiceOne } from '../service'; -import { registerService } from '../globalState'; -import { waitResult } from '../helpers/waitService'; - -describe('Typescript usage suite', () => { - it('should create private key from seed and back', async function () { - // prettier-ignore - let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201]; - let seedStr = encode(seed); - console.log('SEED STR: ' + seedStr); - let pid = await seedToPeerId(seedStr); - expect(peerIdToSeed(pid)).to.be.equal(seedStr); - }); - - it('should serialize and deserialize certificate correctly', async function () { - let cert = `11 -1111 -5566Dn4ZXXbBK5LJdUsE7L3pG9qdAzdPY47adjzkhEx9 -3HNXpW2cLdqXzf4jz5EhsGEBFkWzuVdBCyxzJUZu2WPVU7kpzPjatcqvdJMjTtcycVAdaV5qh2fCGphSmw8UMBkr -158981172690500 -1589974723504 -2EvoZAZaGjKWFVdr36F1jphQ5cW7eK3yM16mqEHwQyr7 -4UAJQWzB3nTchBtwARHAhsn7wjdYtqUHojps9xV6JkuLENV8KRiWM3BhQByx5KijumkaNjr7MhHjouLawmiN1A4d -1590061123504 -1589974723504`; - - let deser = await certificateFromString(cert); - let ser = certificateToString(deser); - - expect(ser).to.be.equal(cert); - }); - - // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes - it.skip('test certs', async function () { - this.timeout(15000); - await testCerts(); - }); - - it.skip('', async function () { - let pid = await Fluence.generatePeerId(); - let cl = await Fluence.connect( - '/ip4/138.197.177.2/tcp/9001/ws/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9', - pid, - ); - - let service = new ServiceOne('test', (fnName: string, args: any[]) => { - console.log('called: ' + args); - return {}; - }); - registerService(service); - - let namedPromise = waitResult(30000); - - let script = ` - (seq ( - (call ( "${pid.toB58String()}" ("test" "test") (a b c d) result)) - (call ( "${pid.toB58String()}" ("${namedPromise.name}" "") (d c b a) void[])) - )) - `; - - let data: Map = new Map(); - data.set('a', 'some a'); - data.set('b', 'some b'); - data.set('c', 'some c'); - data.set('d', 'some d'); - - let particle = await build(pid, script, data, 30000); - - await cl.sendParticle(particle); - - let res = await namedPromise.promise; - expect(res).to.be.equal(['some d', 'some c', 'some b', 'some a']); - }); -}); - -const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)); - -export async function testCerts() { - let key1 = await Fluence.generatePeerId(); - let key2 = await Fluence.generatePeerId(); - - // connect to two different nodes - let cl1 = await Fluence.connect( - '/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb', - key1, - ); - let cl2 = await Fluence.connect( - '/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er', - key2, - ); - - let trustGraph1 = new TrustGraph(cl1); - let trustGraph2 = new TrustGraph(cl2); - - let issuedAt = new Date(); - let expiresAt = new Date(); - // certificate expires after one day - expiresAt.setDate(new Date().getDate() + 1); - - // create root certificate for key1 and extend it with key2 - let rootCert = await nodeRootCert(key1); - let extended = await issue(key1, key2, rootCert, expiresAt.getTime(), issuedAt.getTime()); - - // publish certificates to Fluence network - await trustGraph1.publishCertificates(key2.toB58String(), [extended]); - - // get certificates from network - let certs = await trustGraph2.getCertificates(key2.toB58String()); - - // root certificate could be different because nodes save trusts with bigger `expiresAt` date and less `issuedAt` date - expect(certs[0].chain[1].issuedFor.toB58String()).to.be.equal(extended.chain[1].issuedFor.toB58String()); - expect(certs[0].chain[1].signature).to.be.equal(extended.chain[1].signature); - expect(certs[0].chain[1].expiresAt).to.be.equal(extended.chain[1].expiresAt); - expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt); - - await cl1.disconnect(); - await cl2.disconnect(); -} diff --git a/src/utils.ts b/src/utils.ts deleted file mode 100644 index 249e0ccc..00000000 --- a/src/utils.ts +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -export function delay(ms: number, error: string): Promise { - return new Promise((resolve, reject) => { - setTimeout(() => reject(new Error(error)), ms); - }); -} diff --git a/tsconfig.json b/tsconfig.json index 6b161362..c9d20ddc 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -13,14 +13,14 @@ "allowSyntheticDefaultImports": true, "resolveJsonModule": true, "pretty": true, - "target": "esnext", + "target": "ES5", "module": "commonjs", "moduleResolution": "node", "declaration": true, "esModuleInterop": true, "declarationMap": true, "strict": true, - "noImplicitAny": true, + "noImplicitAny": false, "alwaysStrict": true, "noImplicitThis": true, "strictNullChecks": false @@ -29,7 +29,7 @@ "node_modules", "dist", "bundle", - "src/test" + "src/__test__" ], "include": ["src/**/*"] } diff --git a/webpack.config.js b/webpack.config.js index 78fc7dcd..a8fd9acb 100644 --- a/webpack.config.js +++ b/webpack.config.js @@ -5,9 +5,7 @@ const HtmlWebpackPlugin = require('html-webpack-plugin'); const production = (process.env.NODE_ENV === 'production'); const config = { - entry: { - app: ['./src/fluence.ts'] - }, + entry: './src/index.ts', module: { rules: [ {