This commit is contained in:
Pavel Murygin 2022-05-11 11:38:31 +03:00
parent 1d9e11a86e
commit 3a88fd9177
7 changed files with 268 additions and 29 deletions

View File

@ -0,0 +1,7 @@
service Op("op"):
identity: string -> string
func ping(arg: string) -> string:
on HOST_PEER_ID:
Op.identity(arg)
<- arg

View File

@ -8,6 +8,7 @@
"compile-aqua": "npm run compile-aqua:src && npm run compile-aqua:test", "compile-aqua": "npm run compile-aqua:src && npm run compile-aqua:test",
"compile-aqua:src": "aqua -i ./aqua/src/ -o ./src/internal/_aqua", "compile-aqua:src": "aqua -i ./aqua/src/ -o ./src/internal/_aqua",
"compile-aqua:test": "aqua -i ./aqua/tests/ -o ./src/__test__/_aqua", "compile-aqua:test": "aqua -i ./aqua/tests/ -o ./src/__test__/_aqua",
"compile-aqua:playground": "aqua -i ./aqua/playground/ -o ./playground/_aqua",
"test": "jest --watch", "test": "jest --watch",
"test:all": "jest", "test:all": "jest",
"test:unit": "jest --testPathPattern=src/__test__/unit", "test:unit": "jest --testPathPattern=src/__test__/unit",

119
playground/_aqua/ping.ts Normal file
View File

@ -0,0 +1,119 @@
/**
*
* This file is auto-generated. Do not edit manually: changes may be erased.
* Generated by Aqua compiler: https://github.com/fluencelabs/aqua/.
* If you find any bugs, please write an issue on GitHub: https://github.com/fluencelabs/aqua/issues
* Aqua version: 0.7.0-287
*
*/
import { Fluence, FluencePeer } from '../../src';
import { CallParams, callFunction, registerService } from '../../src/internal/compilerSupport/v3';
// Services
export interface OpDef {
identity: (arg0: string, callParams: CallParams<'arg0'>) => string | Promise<string>;
}
export function registerOp(service: OpDef): void;
export function registerOp(serviceId: string, service: OpDef): void;
export function registerOp(peer: FluencePeer, service: OpDef): void;
export function registerOp(peer: FluencePeer, serviceId: string, service: OpDef): void;
export function registerOp(...args: any) {
registerService(args, {
defaultServiceId: 'op',
functions: {
tag: 'labeledProduct',
fields: {
identity: {
tag: 'arrow',
domain: {
tag: 'unlabeledProduct',
items: [
{
tag: 'scalar',
name: 'string',
},
],
},
codomain: {
tag: 'unlabeledProduct',
items: [
{
tag: 'scalar',
name: 'string',
},
],
},
},
},
},
});
}
// Functions
export function ping(arg: string, config?: { ttl?: number }): Promise<string>;
export function ping(peer: FluencePeer, arg: string, config?: { ttl?: number }): Promise<string>;
export function ping(...args: any) {
let script = `
(xor
(seq
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "arg") [] arg)
)
(xor
(call -relay- ("op" "identity") [arg])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
)
(xor
(call %init_peer_id% ("callbackSrv" "response") [arg])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
`;
return callFunction(
args,
{
functionName: 'ping',
arrow: {
tag: 'arrow',
domain: {
tag: 'labeledProduct',
fields: {
arg: {
tag: 'scalar',
name: 'string',
},
},
},
codomain: {
tag: 'unlabeledProduct',
items: [
{
tag: 'scalar',
name: 'string',
},
],
},
},
names: {
relay: '-relay-',
getDataSrv: 'getDataSrv',
callbackSrv: 'callbackSrv',
responseSrv: 'callbackSrv',
responseFnName: 'response',
errorHandlingSrv: 'errorHandlingSrv',
errorFnName: 'error',
},
},
script,
);
}

View File

@ -3,6 +3,22 @@
<head> <head>
<meta charset="utf-8" /> <meta charset="utf-8" />
<title>Fluence JS</title> <title>Fluence JS</title>
<style>
#main {
display: flex;
flex-direction: column;
margin: 100px 300px;
gap: 10px;
}
</style>
</head> </head>
<body></body> <body>
<div id="main">
<button id="connect">connect</button>
<button id="once">run once</button>
<button id="pings">run pings</button>
<button id="stop_pings">stop pings</button>
<button id="disconnect">disconnect</button>
</div>
</body>
</html> </html>

View File

@ -1,14 +1,45 @@
import { Fluence } from '../src/index'; import { Fluence } from '../src/index';
import { krasnodar } from '@fluencelabs/fluence-network-environment'; import { krasnodar } from '@fluencelabs/fluence-network-environment';
import { ping } from './_aqua/ping';
let interval;
// localStorage.debug = 'libp2p:*';
delete localStorage.debug;
async function main() { async function main() {
await Fluence.start({ handle('connect', async () => {
connectTo: krasnodar[4], await Fluence.start({
connectTo: krasnodar[4],
skipCheckConnection: true,
});
console.log('connected');
}); });
handle('once', runPing);
handle('pings', () => {
interval = setInterval(runPing, 1000);
});
handle('stop_pings', () => {
clearTimeout(interval);
});
handle('disconnect', async () => {
await Fluence.stop();
console.log('disconnected');
});
}
alert('started'); function runPing() {
const ts = new Date().toLocaleTimeString();
console.log(`pinging with ${ts}`);
ping(ts).then((res) => {
console.log(`${ts} pong at ${new Date().toLocaleTimeString()}`);
});
}
await Fluence.stop(); function handle(id: string, fn: () => void) {
document.getElementById(id).onclick = () => {
fn();
};
} }
main(); main();

View File

@ -16,7 +16,7 @@
import Websockets from 'libp2p-websockets'; import Websockets from 'libp2p-websockets';
import Mplex from 'libp2p-mplex'; import Mplex from 'libp2p-mplex';
import Lib2p2Peer from 'libp2p'; import Lib2p2Peer, { MuxedStream } from 'libp2p';
import { decode, encode } from 'it-length-prefixed'; import { decode, encode } from 'it-length-prefixed';
import pipe from 'it-pipe'; import pipe from 'it-pipe';
import * as log from 'loglevel'; import * as log from 'loglevel';
@ -27,6 +27,8 @@ import { Multiaddr } from 'multiaddr';
import { all as allow_all } from 'libp2p-websockets/src/filters'; import { all as allow_all } from 'libp2p-websockets/src/filters';
import { Connection } from 'libp2p-interfaces/src/topology'; import { Connection } from 'libp2p-interfaces/src/topology';
import Buffer from './Buffer'; import Buffer from './Buffer';
import ConnectionManager from 'libp2p/src/connection-manager';
import { single } from 'rxjs';
export const PROTOCOL_NAME = '/fluence/particle/2.0.0'; export const PROTOCOL_NAME = '/fluence/particle/2.0.0';
@ -79,27 +81,67 @@ export class FluenceConnection {
dialer: { dialer: {
dialTimeout: options?.dialTimeoutMs, dialTimeout: options?.dialTimeoutMs,
}, },
connectionManager: {},
}); });
console.log(res._lib2p2Peer.connectionManager.eventNames());
res._lib2p2Peer.handle([PROTOCOL_NAME], async ({ connection, stream }) => { res._lib2p2Peer.handle([PROTOCOL_NAME], async ({ connection, stream }) => {
pipe(stream.source, decode(), async (source: AsyncIterable<string>) => { pipe(
try { // force new line
for await (const msg of source) { stream.source,
try { decode(),
const particle = Particle.fromString(msg); async (source: AsyncIterable<string>) => {
options.onIncomingParticle(particle); try {
} catch (e) { for await (const msg of source) {
log.error('error on handling a new incoming message: ' + e); try {
const particle = Particle.fromString(msg);
options.onIncomingParticle(particle);
} catch (e) {
log.error('error on handling a new incoming message: ' + e);
}
} }
} catch (e) {
log.debug('connection closed: ' + e);
} }
} catch (e) { },
log.debug('connection closed: ' + e); );
}
});
}); });
res._relayAddress = options.relayAddress; res._relayAddress = options.relayAddress;
res._lib2p2Peer.handle('error', (err) => {
console.log('libp2p error: ', err);
});
res._lib2p2Peer.handle('peer:discovery', (peer) => {
console.log('libp2p peer:discovery: ', peer);
});
res._lib2p2Peer.connectionManager.on('peer:connect', (connection) => {
console.log('libp2p peer:connect: ', connection);
});
res._lib2p2Peer.connectionManager.on('peer:disconnect', (connection) => {
console.log('libp2p peer:disconnect: ', connection);
});
res._lib2p2Peer.peerStore.on('peer', (peerId) => {
console.log('libp2p peerStore peer: ', peerId);
});
res._lib2p2Peer.peerStore.on('change:multiaddrs', (x) => {
console.log('libp2p change:multiaddrs: ', x);
});
res._lib2p2Peer.peerStore.on('change:protocols', (x) => {
console.log('libp2p change:protocols: ', x);
});
res._lib2p2Peer.addressManager.on('change:addresses', () => {
console.log('libp2p change:addresses');
});
return res; return res;
} }
@ -108,26 +150,43 @@ export class FluenceConnection {
} }
public async sendParticle(particle: Particle): Promise<void> { public async sendParticle(particle: Particle): Promise<void> {
particle.logTo('debug', 'sending particle:'); // particle.logTo('debug', 'sending particle:');
// console.log(this._connection);
// TODO:: find out why this doesn't work and a new connection has to be established each time
//if (this._connection.streams.length !== 1) {
// throw 'Incorrect number of streams in FluenceConnection';
//}
// const sink = this._connection.streams[0].sink;
// const conn = await this._connection.newStream(PROTOCOL_NAME);
// const sink = conn.stream.sink;
/* /*
TODO:: find out why this doesn't work and a new connection has to be established each time
if (this._connection.streams.length !== 1) {
throw 'Incorrect number of streams in FluenceConnection';
}
const sink = this._connection.streams[0].sink;
*/
const conn = await this._lib2p2Peer.dialProtocol(this._relayAddress, PROTOCOL_NAME); const conn = await this._lib2p2Peer.dialProtocol(this._relayAddress, PROTOCOL_NAME);
const sink = conn.stream.sink; const sink = conn.stream.sink;
*/
pipe( // const buf = encode(Buffer.from(particle.toString(), 'utf8'));
// await this._outStream.sink(buf);
const { stream } = await this._connection.newStream(PROTOCOL_NAME);
const sink = stream.sink;
stream.pipe(
// force new line // force new line
[Buffer.from(particle.toString(), 'utf8')], [Buffer.from(particle.toString(), 'utf8')],
encode(), encode(),
sink, sink,
); );
stream.close();
this._connection.removeStream(stream.id);
console.log('streams count: ', this._connection.streams.length);
} }
public async connect() { public async connect() {
@ -137,6 +196,11 @@ export class FluenceConnection {
try { try {
this._connection = await this._lib2p2Peer.dial(this._relayAddress); this._connection = await this._lib2p2Peer.dial(this._relayAddress);
// const { stream } = await this._lib2p2Peer.dialProtocol(this._relayAddress, PROTOCOL_NAME);
// console.log(stream.id);
// this._outStream = stream;
//
// this._outStream.
} catch (e1) { } catch (e1) {
const e = e1 as any; const e = e1 as any;
if (e.name === 'AggregateError' && e._errors.length === 1) { if (e.name === 'AggregateError' && e._errors.length === 1) {
@ -148,6 +212,7 @@ export class FluenceConnection {
} }
} }
private _outStream: MuxedStream;
private _lib2p2Peer: Lib2p2Peer; private _lib2p2Peer: Lib2p2Peer;
private _connection: Connection; private _connection: Connection;
private _relayAddress: Multiaddr; private _relayAddress: Multiaddr;

View File

@ -21,7 +21,7 @@ import { FluenceConnection } from './FluenceConnection';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle'; import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
import { KeyPair } from './KeyPair'; import { KeyPair } from './KeyPair';
import { throwIfNotSupported, dataToString, jsonify } from './utils'; import { throwIfNotSupported, dataToString, jsonify } from './utils';
import { concatMap, filter, pipe, Subject, tap } from 'rxjs'; import { bufferTime, concatMap, filter, interval, map, mapTo, pipe, Subject, tap, throttle } from 'rxjs';
import log from 'loglevel'; import log from 'loglevel';
import { builtInServices } from './builtins/common'; import { builtInServices } from './builtins/common';
import { defaultSigGuard, Sig } from './builtins/Sig'; import { defaultSigGuard, Sig } from './builtins/Sig';