Hash in replyTo (#918)

This commit is contained in:
Dima 2020-06-30 16:34:05 +03:00 committed by GitHub
parent b34c0628c5
commit 4e89a13dee
6 changed files with 71 additions and 56 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "fluence", "name": "fluence",
"version": "0.6.5", "version": "0.6.6",
"description": "the browser js-libp2p client for the Fluence network", "description": "the browser js-libp2p client for the Fluence network",
"main": "./dist/fluence.js", "main": "./dist/fluence.js",
"typings": "./dist/fluence.d.ts", "typings": "./dist/fluence.d.ts",

View File

@ -18,7 +18,8 @@ import * as PeerId from "peer-id";
import {encode} from "bs58" import {encode} from "bs58"
export interface Address { export interface Address {
protocols: Protocol[] protocols: Protocol[],
hash?: string
} }
export interface Protocol { export interface Protocol {
@ -45,6 +46,10 @@ export function addressToString(address: Address): string {
} }
} }
if (address.hash) {
addressStr = addressStr + "#" + address.hash
}
return addressStr; return addressStr;
} }
@ -78,7 +83,7 @@ export function parseProtocol(protocol: string, protocolIterator: IterableIterat
} }
export async function createRelayAddress(relay: string, peerId: PeerId, withSig: boolean): Promise<Address> { export async function createRelayAddress(relay: string, peerId: PeerId, withSig: boolean, hash?: string): Promise<Address> {
let protocols = [ let protocols = [
{protocol: ProtocolType.Peer, value: relay}, {protocol: ProtocolType.Peer, value: relay},
@ -94,24 +99,27 @@ export async function createRelayAddress(relay: string, peerId: PeerId, withSig:
} }
return { return {
protocols: protocols protocols: protocols,
hash: hash
} }
} }
export function createServiceAddress(service: string): Address { export function createServiceAddress(service: string, hash?: string): Address {
let protocol = {protocol: ProtocolType.Providers, value: service}; let protocol = {protocol: ProtocolType.Providers, value: service};
return { return {
protocols: [protocol] protocols: [protocol],
hash: hash
} }
} }
export function createPeerAddress(peer: string): Address { export function createPeerAddress(peer: string, hash?: string): Address {
let protocol = {protocol: ProtocolType.Peer, value: peer}; let protocol = {protocol: ProtocolType.Peer, value: peer};
return { return {
protocols: [protocol] protocols: [protocol],
hash: hash
} }
} }
@ -121,7 +129,9 @@ export function parseAddress(str: string): Address {
// delete leading slashes // delete leading slashes
str = str.replace(/^\/+/, ''); str = str.replace(/^\/+/, '');
let parts = str.split("/"); let mainAndHash = str.split("#");
let parts = mainAndHash[0].split("/");
if (parts.length < 1) { if (parts.length < 1) {
throw Error("address parts should not be empty") throw Error("address parts should not be empty")
} }
@ -136,7 +146,14 @@ export function parseAddress(str: string): Address {
protocols.push(protocol); protocols.push(protocol);
} }
let hashPart = mainAndHash.slice(1, mainAndHash.length).join();
let hash = undefined;
if (hashPart) {
hash = hashPart;
}
return { return {
protocols: protocols protocols: protocols,
hash: hash
} }
} }

View File

@ -64,17 +64,22 @@ export class FluenceClient {
}); });
} }
private getPredicate(msgId: string): (args: any, target: Address) => (boolean | undefined) {
return (args: any, target: Address) => target.hash && target.hash === msgId && !args.reason;
}
/** /**
* Send call and wait a response. * Send call and wait a response.
* *
* @param target receiver * @param target receiver
* @param args message in the call * @param args message in the call
* @param predicate will be applied to each incoming call until it matches
* @param moduleId module name * @param moduleId module name
* @param fname functin name * @param fname functin name
*/ */
async sendCallWaitResponse(target: Address, args: any, predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined), moduleId?: string, fname?: string): Promise<any> { async sendCallWaitResponse(target: Address, args: any, moduleId?: string, fname?: string): Promise<any> {
await this.sendCall(target, args, true, moduleId, fname); let replyHash = genUUID();
let predicate = this.getPredicate(replyHash);
await this.sendCall(target, args, true, moduleId, fname, replyHash, undefined);
return this.waitResponse(predicate); return this.waitResponse(predicate);
} }
@ -87,10 +92,11 @@ export class FluenceClient {
* @param moduleId module name * @param moduleId module name
* @param fname function name * @param fname function name
* @param name common field for debug purposes * @param name common field for debug purposes
* @param replyHash hash that will be added to replyTo address
*/ */
async sendCall(target: Address, args: any, reply?: boolean, moduleId?: string, fname?: string, name?: string) { async sendCall(target: Address, args: any, reply?: boolean, moduleId?: string, fname?: string, replyHash?: string, name?: string) {
if (this.connection && this.connection.isConnected()) { if (this.connection && this.connection.isConnected()) {
await this.connection.sendFunctionCall(target, args, reply, moduleId, fname, name); await this.connection.sendFunctionCall(target, args, reply, moduleId, fname, replyHash, name);
} else { } else {
throw Error("client is not connected") throw Error("client is not connected")
} }
@ -103,10 +109,11 @@ export class FluenceClient {
* @param args message to the service * @param args message to the service
* @param fname function name * @param fname function name
* @param name common field for debug purposes * @param name common field for debug purposes
* @param replyHash hash that will be added to replyTo address
*/ */
async sendServiceCall(moduleId: string, args: any, fname?: string, name?: string) { async sendServiceCall(moduleId: string, args: any, fname?: string, replyHash?: string, name?: string) {
if (this.connection && this.connection.isConnected()) { if (this.connection && this.connection.isConnected()) {
await this.connection.sendServiceCall(moduleId, false, args, fname, name); await this.connection.sendServiceCall(moduleId, false, args, fname, replyHash, name);
} else { } else {
throw Error("client is not connected") throw Error("client is not connected")
} }
@ -119,10 +126,11 @@ export class FluenceClient {
* @param args message to the service * @param args message to the service
* @param fname function name * @param fname function name
* @param name common field for debug purposes * @param name common field for debug purposes
* @param replyHash hash that will be added to replyTo address
*/ */
async sendServiceLocalCall(moduleId: string, args: any, fname?: string, name?: string) { async sendServiceLocalCall(moduleId: string, args: any, fname?: string, replyHash?: string, name?: string) {
if (this.connection && this.connection.isConnected()) { if (this.connection && this.connection.isConnected()) {
await this.connection.sendServiceCall(moduleId, true, args, fname, name); await this.connection.sendServiceCall(moduleId, true, args, fname, replyHash, name);
} else { } else {
throw Error("client is not connected") throw Error("client is not connected")
} }
@ -133,11 +141,12 @@ export class FluenceClient {
* *
* @param moduleId * @param moduleId
* @param args message to the service * @param args message to the service
* @param predicate will be applied to each incoming call until it matches
* @param fname function name * @param fname function name
*/ */
async sendServiceCallWaitResponse(moduleId: string, args: any, predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined), fname?: string): Promise<any> { async sendServiceCallWaitResponse(moduleId: string, args: any, fname?: string): Promise<any> {
await this.sendServiceCall(moduleId, args, fname, fname); let replyHash = genUUID();
let predicate = this.getPredicate(replyHash);
await this.sendServiceCall(moduleId, args, fname, replyHash, fname);
return await this.waitResponse(predicate); return await this.waitResponse(predicate);
} }
@ -146,11 +155,12 @@ export class FluenceClient {
* *
* @param moduleId * @param moduleId
* @param args message to the service * @param args message to the service
* @param predicate will be applied to each incoming call until it matches
* @param fname function name * @param fname function name
*/ */
async sendServiceLocalCallWaitResponse(moduleId: string, args: any, predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined), fname?: string): Promise<any> { async sendServiceLocalCallWaitResponse(moduleId: string, args: any, fname?: string): Promise<any> {
await this.sendServiceLocalCall(moduleId, args, fname); let replyHash = genUUID();
let predicate = this.getPredicate(replyHash);
await this.sendServiceLocalCall(moduleId, args, fname, replyHash, undefined);
return await this.waitResponse(predicate); return await this.waitResponse(predicate);
} }

View File

@ -63,6 +63,16 @@ export class FluenceConnection {
this.sender = sender this.sender = sender
} }
makeReplyTo(replyHash?: string): Address {
if (replyHash) {
let replyToWithHash = {...this.sender}
replyToWithHash.hash = replyHash;
return replyToWithHash;
} else {
return this.sender;
}
}
async connect() { async connect() {
let peerInfo = this.selfPeerInfo; let peerInfo = this.selfPeerInfo;
this.node = await Peer.create({ this.node = await Peer.create({
@ -89,7 +99,7 @@ export class FluenceConnection {
/** /**
* Sends remote service_id call. * Sends remote service_id call.
*/ */
async sendServiceCall(moduleId: string, isLocal: boolean, args: any, fname?: string, name?: string) { async sendServiceCall(moduleId: string, isLocal: boolean, args: any, fname?: string, replyHash?: string, name?: string) {
let target; let target;
if (isLocal) { if (isLocal) {
target = createPeerAddress(this.nodePeerId.toB58String()); target = createPeerAddress(this.nodePeerId.toB58String());
@ -97,7 +107,7 @@ export class FluenceConnection {
target = createServiceAddress(moduleId); target = createServiceAddress(moduleId);
} }
let regMsg = makeCall(moduleId, target, args, this.sender, this.sender, fname, name); let regMsg = makeCall(moduleId, target, args, this.sender, this.makeReplyTo(replyHash), fname, name);
await this.sendCall(regMsg); await this.sendCall(regMsg);
} }
@ -187,11 +197,11 @@ export class FluenceConnection {
/** /**
* Send FunctionCall to the connected node. * Send FunctionCall to the connected node.
*/ */
async sendFunctionCall(target: Address, args: any, reply?: boolean, moduleId?: string, fname?: string, name?: string) { async sendFunctionCall(target: Address, args: any, reply?: boolean, moduleId?: string, fname?: string, replyHash?: string, name?: string) {
this.checkConnectedOrThrow(); this.checkConnectedOrThrow();
let replyTo; let replyTo;
if (reply) replyTo = this.sender; if (reply) replyTo = this.makeReplyTo(replyHash);
let call = makeFunctionCall(genUUID(), target, this.sender, args, moduleId, fname, replyTo, name); let call = makeFunctionCall(genUUID(), target, this.sender, args, moduleId, fname, replyTo, name);

View File

@ -3,7 +3,7 @@ import {
createRelayAddress, createRelayAddress,
createServiceAddress, createServiceAddress,
addressToString, addressToString,
parseAddress parseAddress, Address
} from "../address"; } from "../address";
import {expect} from 'chai'; import {expect} from 'chai';
@ -191,17 +191,10 @@ export async function testCalculator() {
await cl1.sendCall(req.reply_to, message); await cl1.sendCall(req.reply_to, message);
}); });
let req = {one: 12, two: 23};
// msgId is to identify response
let msgId = "calculate-it-for-me";
let req = {one: 12, two: 23, msgId: msgId};
let predicate: (args: any) => boolean | undefined = (args: any) => args.msgId && args.msgId === msgId;
// send call to `sum-calculator` service with two numbers // send call to `sum-calculator` service with two numbers
let response = await cl2.sendServiceCallWaitResponse(serviceId, req, predicate); let response = await cl2.sendServiceCallWaitResponse(serviceId, req);
let result = response.result; let result = response.result;
console.log(`calculation result is: ${result}`); console.log(`calculation result is: ${result}`);
@ -213,7 +206,7 @@ export async function testCalculator() {
// send call to `sum-calculator` service with two numbers // send call to `sum-calculator` service with two numbers
await cl2.sendServiceCall(serviceId, req, "calculator request"); await cl2.sendServiceCall(serviceId, req, "calculator request");
let response2 = await cl2.sendServiceCallWaitResponse(serviceId, req, predicate); let response2 = await cl2.sendServiceCallWaitResponse(serviceId, req);
let result2 = await response2.result; let result2 = await response2.result;
console.log(`calculation result AFTER RECONNECT is: ${result2}`); console.log(`calculation result AFTER RECONNECT is: ${result2}`);

View File

@ -34,22 +34,9 @@ export class TrustGraph {
certsStr.push(await certificateToString(cert)); certsStr.push(await certificateToString(cert));
} }
let msgId = genUUID()
let response = await this.client.sendServiceLocalCallWaitResponse("add_certificates", { let response = await this.client.sendServiceLocalCallWaitResponse("add_certificates", {
certificates: certsStr, certificates: certsStr,
msg_id: msgId,
peer_id: peerId peer_id: peerId
}, (args) => {
// check if it is a successful response
let isSuccessResponse = args.msg_id && args.msg_id === msgId
if (isSuccessResponse) {
return true
} else {
// check if it is an error for this msgId
return args.call && args.call.arguments && args.call.arguments.msg_id === msgId
}
}); });
if (response.reason) { if (response.reason) {
@ -63,11 +50,9 @@ export class TrustGraph {
// Get certificates that stores in Kademlia neighbourhood by `peerId` key. // Get certificates that stores in Kademlia neighbourhood by `peerId` key.
async getCertificates(peerId: string): Promise<Certificate[]> { async getCertificates(peerId: string): Promise<Certificate[]> {
let msgId = genUUID();
let resp = await this.client.sendServiceLocalCallWaitResponse("certificates", { let resp = await this.client.sendServiceLocalCallWaitResponse("certificates", {
msg_id: msgId,
peer_id: peerId peer_id: peerId
}, (args) => args.msg_id && args.msg_id === msgId) });
let certificatesRaw = resp.certificates let certificatesRaw = resp.certificates