diff --git a/aqua/playground/ping.aqua b/aqua/playground/ping.aqua new file mode 100644 index 00000000..630c1e6d --- /dev/null +++ b/aqua/playground/ping.aqua @@ -0,0 +1,7 @@ +service Op("op"): + identity: string -> string + +func ping(arg: string) -> string: + on HOST_PEER_ID: + Op.identity(arg) + <- arg diff --git a/package.json b/package.json index 9c5abfee..8a4431e2 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "compile-aqua": "npm run compile-aqua:src && npm run compile-aqua:test", "compile-aqua:src": "aqua -i ./aqua/src/ -o ./src/internal/_aqua", "compile-aqua:test": "aqua -i ./aqua/tests/ -o ./src/__test__/_aqua", + "compile-aqua:playground": "aqua -i ./aqua/playground/ -o ./playground/_aqua", "test": "jest --watch", "test:all": "jest", "test:unit": "jest --testPathPattern=src/__test__/unit", diff --git a/playground/_aqua/ping.ts b/playground/_aqua/ping.ts new file mode 100644 index 00000000..f9aa9d29 --- /dev/null +++ b/playground/_aqua/ping.ts @@ -0,0 +1,119 @@ +/** + * + * This file is auto-generated. Do not edit manually: changes may be erased. + * Generated by Aqua compiler: https://github.com/fluencelabs/aqua/. + * If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues + * Aqua version: 0.7.0-287 + * + */ +import { Fluence, FluencePeer } from '../../src'; +import { CallParams, callFunction, registerService } from '../../src/internal/compilerSupport/v3'; + +// Services + +export interface OpDef { + identity: (arg0: string, callParams: CallParams<'arg0'>) => string | Promise; +} +export function registerOp(service: OpDef): void; +export function registerOp(serviceId: string, service: OpDef): void; +export function registerOp(peer: FluencePeer, service: OpDef): void; +export function registerOp(peer: FluencePeer, serviceId: string, service: OpDef): void; + +export function registerOp(...args: any) { + registerService(args, { + defaultServiceId: 'op', + functions: { + tag: 'labeledProduct', + fields: { + identity: { + tag: 'arrow', + domain: { + tag: 'unlabeledProduct', + items: [ + { + tag: 'scalar', + name: 'string', + }, + ], + }, + codomain: { + tag: 'unlabeledProduct', + items: [ + { + tag: 'scalar', + name: 'string', + }, + ], + }, + }, + }, + }, + }); +} + +// Functions + +export function ping(arg: string, config?: { ttl?: number }): Promise; + +export function ping(peer: FluencePeer, arg: string, config?: { ttl?: number }): Promise; + +export function ping(...args: any) { + let script = ` + (xor + (seq + (seq + (seq + (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) + (call %init_peer_id% ("getDataSrv" "arg") [] arg) + ) + (xor + (call -relay- ("op" "identity") [arg]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) + ) + ) + (xor + (call %init_peer_id% ("callbackSrv" "response") [arg]) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) + ) + ) + (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) + ) + `; + return callFunction( + args, + { + functionName: 'ping', + arrow: { + tag: 'arrow', + domain: { + tag: 'labeledProduct', + fields: { + arg: { + tag: 'scalar', + name: 'string', + }, + }, + }, + codomain: { + tag: 'unlabeledProduct', + items: [ + { + tag: 'scalar', + name: 'string', + }, + ], + }, + }, + names: { + relay: '-relay-', + getDataSrv: 'getDataSrv', + callbackSrv: 'callbackSrv', + responseSrv: 'callbackSrv', + responseFnName: 'response', + errorHandlingSrv: 'errorHandlingSrv', + errorFnName: 'error', + }, + }, + script, + ); +} diff --git a/playground/index.html b/playground/index.html index 87a197c2..760806f1 100644 --- a/playground/index.html +++ b/playground/index.html @@ -3,6 +3,22 @@ Fluence JS + - + +
+ + + + + +
+ diff --git a/playground/index.ts b/playground/index.ts index d513e851..20aa56f9 100644 --- a/playground/index.ts +++ b/playground/index.ts @@ -1,14 +1,45 @@ import { Fluence } from '../src/index'; import { krasnodar } from '@fluencelabs/fluence-network-environment'; +import { ping } from './_aqua/ping'; + +let interval; + +// localStorage.debug = 'libp2p:*'; +delete localStorage.debug; async function main() { - await Fluence.start({ - connectTo: krasnodar[4], + handle('connect', async () => { + await Fluence.start({ + connectTo: krasnodar[4], + skipCheckConnection: true, + }); + console.log('connected'); }); + handle('once', runPing); + handle('pings', () => { + interval = setInterval(runPing, 1000); + }); + handle('stop_pings', () => { + clearTimeout(interval); + }); + handle('disconnect', async () => { + await Fluence.stop(); + console.log('disconnected'); + }); +} - alert('started'); +function runPing() { + const ts = new Date().toLocaleTimeString(); + console.log(`pinging with ${ts}`); + ping(ts).then((res) => { + console.log(`${ts} pong at ${new Date().toLocaleTimeString()}`); + }); +} - await Fluence.stop(); +function handle(id: string, fn: () => void) { + document.getElementById(id).onclick = () => { + fn(); + }; } main(); diff --git a/src/internal/FluenceConnection.ts b/src/internal/FluenceConnection.ts index f5f82493..32134aa5 100644 --- a/src/internal/FluenceConnection.ts +++ b/src/internal/FluenceConnection.ts @@ -16,7 +16,7 @@ import Websockets from 'libp2p-websockets'; import Mplex from 'libp2p-mplex'; -import Lib2p2Peer from 'libp2p'; +import Lib2p2Peer, { MuxedStream } from 'libp2p'; import { decode, encode } from 'it-length-prefixed'; import pipe from 'it-pipe'; import * as log from 'loglevel'; @@ -27,6 +27,8 @@ import { Multiaddr } from 'multiaddr'; import { all as allow_all } from 'libp2p-websockets/src/filters'; import { Connection } from 'libp2p-interfaces/src/topology'; import Buffer from './Buffer'; +import ConnectionManager from 'libp2p/src/connection-manager'; +import { single } from 'rxjs'; export const PROTOCOL_NAME = '/fluence/particle/2.0.0'; @@ -79,27 +81,67 @@ export class FluenceConnection { dialer: { dialTimeout: options?.dialTimeoutMs, }, + connectionManager: {}, }); + console.log(res._lib2p2Peer.connectionManager.eventNames()); + res._lib2p2Peer.handle([PROTOCOL_NAME], async ({ connection, stream }) => { - pipe(stream.source, decode(), async (source: AsyncIterable) => { - try { - for await (const msg of source) { - try { - const particle = Particle.fromString(msg); - options.onIncomingParticle(particle); - } catch (e) { - log.error('error on handling a new incoming message: ' + e); + pipe( + // force new line + stream.source, + decode(), + async (source: AsyncIterable) => { + try { + for await (const msg of source) { + try { + const particle = Particle.fromString(msg); + options.onIncomingParticle(particle); + } catch (e) { + log.error('error on handling a new incoming message: ' + e); + } } + } catch (e) { + log.debug('connection closed: ' + e); } - } catch (e) { - log.debug('connection closed: ' + e); - } - }); + }, + ); }); res._relayAddress = options.relayAddress; + res._lib2p2Peer.handle('error', (err) => { + console.log('libp2p error: ', err); + }); + + res._lib2p2Peer.handle('peer:discovery', (peer) => { + console.log('libp2p peer:discovery: ', peer); + }); + + res._lib2p2Peer.connectionManager.on('peer:connect', (connection) => { + console.log('libp2p peer:connect: ', connection); + }); + + res._lib2p2Peer.connectionManager.on('peer:disconnect', (connection) => { + console.log('libp2p peer:disconnect: ', connection); + }); + + res._lib2p2Peer.peerStore.on('peer', (peerId) => { + console.log('libp2p peerStore peer: ', peerId); + }); + + res._lib2p2Peer.peerStore.on('change:multiaddrs', (x) => { + console.log('libp2p change:multiaddrs: ', x); + }); + + res._lib2p2Peer.peerStore.on('change:protocols', (x) => { + console.log('libp2p change:protocols: ', x); + }); + + res._lib2p2Peer.addressManager.on('change:addresses', () => { + console.log('libp2p change:addresses'); + }); + return res; } @@ -108,26 +150,43 @@ export class FluenceConnection { } public async sendParticle(particle: Particle): Promise { - particle.logTo('debug', 'sending particle:'); + // particle.logTo('debug', 'sending particle:'); + + // console.log(this._connection); + + // TODO:: find out why this doesn't work and a new connection has to be established each time + //if (this._connection.streams.length !== 1) { + // throw 'Incorrect number of streams in FluenceConnection'; + //} + + // const sink = this._connection.streams[0].sink; + + // const conn = await this._connection.newStream(PROTOCOL_NAME); + + // const sink = conn.stream.sink; /* - TODO:: find out why this doesn't work and a new connection has to be established each time - if (this._connection.streams.length !== 1) { - throw 'Incorrect number of streams in FluenceConnection'; - } - - const sink = this._connection.streams[0].sink; - */ - const conn = await this._lib2p2Peer.dialProtocol(this._relayAddress, PROTOCOL_NAME); const sink = conn.stream.sink; + */ - pipe( + // const buf = encode(Buffer.from(particle.toString(), 'utf8')); + + // await this._outStream.sink(buf); + + const { stream } = await this._connection.newStream(PROTOCOL_NAME); + const sink = stream.sink; + + stream.pipe( // force new line [Buffer.from(particle.toString(), 'utf8')], encode(), sink, ); + + stream.close(); + this._connection.removeStream(stream.id); + console.log('streams count: ', this._connection.streams.length); } public async connect() { @@ -137,6 +196,11 @@ export class FluenceConnection { try { this._connection = await this._lib2p2Peer.dial(this._relayAddress); + // const { stream } = await this._lib2p2Peer.dialProtocol(this._relayAddress, PROTOCOL_NAME); + // console.log(stream.id); + // this._outStream = stream; + // + // this._outStream. } catch (e1) { const e = e1 as any; if (e.name === 'AggregateError' && e._errors.length === 1) { @@ -148,6 +212,7 @@ export class FluenceConnection { } } + private _outStream: MuxedStream; private _lib2p2Peer: Lib2p2Peer; private _connection: Connection; private _relayAddress: Multiaddr; diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 82eb47c4..a605201b 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -21,7 +21,7 @@ import { FluenceConnection } from './FluenceConnection'; import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle'; import { KeyPair } from './KeyPair'; import { throwIfNotSupported, dataToString, jsonify } from './utils'; -import { concatMap, filter, pipe, Subject, tap } from 'rxjs'; +import { bufferTime, concatMap, filter, interval, map, mapTo, pipe, Subject, tap, throttle } from 'rxjs'; import log from 'loglevel'; import { builtInServices } from './builtins/common'; import { defaultSigGuard, Sig } from './builtins/Sig';