Remove schema from js-client

This commit is contained in:
Akim Mamedov 2023-11-13 17:45:24 +07:00
parent 2f316cc8fb
commit 426f154377
26 changed files with 348 additions and 685 deletions

View File

@ -14,60 +14,11 @@
* limitations under the License. * limitations under the License.
*/ */
import type { SecurityTetraplet } from "@fluencelabs/avm";
import { InterfaceToType, MaybePromise } from "./utils.js";
/** /**
* Peer ID's id as a base58 string (multihash/CIDv0). * Peer ID's id as a base58 string (multihash/CIDv0).
*/ */
export type PeerIdB58 = string; export type PeerIdB58 = string;
/**
* Additional information about a service call
* @typeparam ArgName
*/
export type CallParams<ArgName extends string | null> = {
/**
* The identifier of particle which triggered the call
*/
particleId: string;
/**
* The peer id which created the particle
*/
initPeerId: PeerIdB58;
/**
* Particle's timestamp when it was created
*/
timestamp: number;
/**
* Time to live in milliseconds. The time after the particle should be expired
*/
ttl: number;
/**
* Particle's signature
*/
signature?: string;
/**
* Security tetraplets
*/
tetraplets: ArgName extends string
? Record<ArgName, InterfaceToType<SecurityTetraplet>[]>
: Record<string, never>;
};
export type ServiceImpl = Record<
string,
(
...args: [...JSONArray, CallParams<string>]
) => MaybePromise<JSONValue | undefined>
>;
export type JSONValue = export type JSONValue =
| string | string
| number | number

View File

@ -17,8 +17,8 @@
import { JSONValue } from "@fluencelabs/interfaces"; import { JSONValue } from "@fluencelabs/interfaces";
import { it, describe, expect } from "vitest"; import { it, describe, expect } from "vitest";
import { SendError } from "../../jsPeer/errors.js";
import { CallServiceData } from "../../jsServiceHost/interfaces.js"; import { CallServiceData } from "../../jsServiceHost/interfaces.js";
import { doNothing } from "../../jsServiceHost/serviceUtils.js";
import { handleTimeout } from "../../particle/Particle.js"; import { handleTimeout } from "../../particle/Particle.js";
import { registerHandlersHelper, withClient } from "../../util/testUtils.js"; import { registerHandlersHelper, withClient } from "../../util/testUtils.js";
import { checkConnection } from "../checkConnection.js"; import { checkConnection } from "../checkConnection.js";
@ -71,7 +71,11 @@ describe("FluenceClient usage test suite", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(result).toBe("hello world!"); expect(result).toBe("hello world!");
@ -124,7 +128,11 @@ describe("FluenceClient usage test suite", () => {
throw particle; throw particle;
} }
peer1.internals.initiateParticle(particle, doNothing); peer1.internals.initiateParticle(
particle,
() => {},
() => {},
);
expect(await res).toEqual("test"); expect(await res).toEqual("test");
}); });
@ -172,13 +180,13 @@ describe("FluenceClient usage test suite", () => {
); );
}); });
it("With connection options: defaultTTL", async () => { it.only("With connection options: defaultTTL", async () => {
await withClient(RELAY, { defaultTtlMs: 1 }, async (peer) => { await withClient(RELAY, { defaultTtlMs: 1 }, async (peer) => {
const isConnected = await checkConnection(peer); const isConnected = await checkConnection(peer);
expect(isConnected).toBeFalsy(); expect(isConnected).toBeFalsy();
}); });
}); }, 1000);
}); });
it.skip("Should throw correct error when the client tries to send a particle not to the relay", async () => { it.skip("Should throw correct error when the client tries to send a particle not to the relay", async () => {
@ -206,11 +214,15 @@ describe("FluenceClient usage test suite", () => {
}, },
}); });
peer.internals.initiateParticle(particle, (stage) => { peer.internals.initiateParticle(
if (stage.stage === "sendingError") { particle,
reject(stage.errorMessage); () => {},
(error: Error) => {
if (error instanceof SendError) {
reject(error.message);
} }
}); },
);
}); });
await promise; await promise;

View File

@ -110,6 +110,7 @@ export const checkConnection = async (
peer.internals.initiateParticle( peer.internals.initiateParticle(
particle, particle,
() => {},
handleTimeout(() => { handleTimeout(() => {
reject("particle timed out"); reject("particle timed out");
}), }),

View File

@ -16,13 +16,7 @@
import assert from "assert"; import assert from "assert";
import { import { FnConfig, JSONValue } from "@fluencelabs/interfaces";
FnConfig,
FunctionCallDef,
getArgumentTypes,
isReturnTypeVoid,
PassedArgs,
} from "@fluencelabs/interfaces";
import { FluencePeer } from "../jsPeer/FluencePeer.js"; import { FluencePeer } from "../jsPeer/FluencePeer.js";
import { logger } from "../util/logger.js"; import { logger } from "../util/logger.js";
@ -36,6 +30,7 @@ import {
ServiceDescription, ServiceDescription,
userHandlerService, userHandlerService,
} from "./services.js"; } from "./services.js";
import { ServiceImpl } from "./types.js";
const log = logger("aqua"); const log = logger("aqua");
@ -52,43 +47,35 @@ const log = logger("aqua");
*/ */
type CallAquaFunctionArgs = { type CallAquaFunctionArgs = {
def: FunctionCallDef;
script: string; script: string;
config: FnConfig; config: FnConfig;
peer: FluencePeer; peer: FluencePeer;
args: PassedArgs; args: { [key: string]: JSONValue | ServiceImpl[string] };
}; };
export const callAquaFunction = async ({ export const callAquaFunction = async ({
def,
script, script,
config, config,
peer, peer,
args, args,
}: CallAquaFunctionArgs) => { }: CallAquaFunctionArgs) => {
// TODO: this function should be rewritten. We can remove asserts if we wont check definition there // TODO: this function should be rewritten. We can remove asserts if we wont check definition there
log.trace("calling aqua function %j", { def, script, config, args }); log.trace("calling aqua function %j", { script, config, args });
const argumentTypes = getArgumentTypes(def);
const particle = await peer.internals.createNewParticle(script, config.ttl); const particle = await peer.internals.createNewParticle(script, config.ttl);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
for (const [name, argVal] of Object.entries(args)) { for (const [name, argVal] of Object.entries(args)) {
const type = argumentTypes[name];
let service: ServiceDescription; let service: ServiceDescription;
if (type.tag === "arrow") { if (typeof argVal === "function") {
// TODO: Add validation here // TODO: Add validation here
assert( assert(
typeof argVal === "function", typeof argVal === "function",
"Should not be possible, bad types", "Should not be possible, bad types",
); );
service = userHandlerService( service = userHandlerService("callbackSrv", name, argVal);
def.names.callbackSrv,
[name, type],
argVal,
);
} else { } else {
// TODO: Add validation here // TODO: Add validation here
assert( assert(
@ -96,50 +83,31 @@ export const callAquaFunction = async ({
"Should not be possible, bad types", "Should not be possible, bad types",
); );
service = injectValueService(def.names.getDataSrv, name, type, argVal); console.log("inject service", name, argVal);
service = injectValueService("getDataSrv", name, argVal);
} }
registerParticleScopeService(peer, particle, service); registerParticleScopeService(peer, particle, service);
} }
registerParticleScopeService(peer, particle, responseService(def, resolve)); registerParticleScopeService(peer, particle, responseService(resolve));
registerParticleScopeService(peer, particle, injectRelayService(def, peer)); registerParticleScopeService(peer, particle, injectRelayService(peer));
registerParticleScopeService( registerParticleScopeService(peer, particle, errorHandlingService(reject));
peer,
particle,
errorHandlingService(def, reject),
);
peer.internals.initiateParticle(particle, (stage) => {
// If function is void, then it's completed when one of the two conditions is met: // If function is void, then it's completed when one of the two conditions is met:
// 1. The particle is sent to the network (state 'sent') // 1. The particle is sent to the network (state 'sent')
// 2. All CallRequests are executed, e.g., all variable loading and local function calls are completed (state 'localWorkDone') // 2. All CallRequests are executed, e.g., all variable loading and local function calls are completed (state 'localWorkDone')
if (
isReturnTypeVoid(def) &&
(stage.stage === "sent" || stage.stage === "localWorkDone")
) {
resolve(undefined);
}
if (stage.stage === "sendingError") { // TODO: make test
reject( // if (
`Could not send particle for ${def.functionName}: not connected (particle id: ${particle.id})`, // isReturnTypeVoid(def) &&
); // (stage.stage === "sent" || stage.stage === "localWorkDone")
} // ) {
// resolve(undefined);
// }
// },
if (stage.stage === "expired") { peer.internals.initiateParticle(particle, resolve, reject);
reject(
`Particle expired after ttl of ${particle.ttl}ms for function ${def.functionName} (particle id: ${particle.id})`,
);
}
if (stage.stage === "interpreterError") {
reject(
`Script interpretation failed for ${def.functionName}: ${stage.errorMessage} (particle id: ${particle.id})`,
);
}
});
}); });
}; };

View File

@ -14,66 +14,64 @@
* limitations under the License. * limitations under the License.
*/ */
import type { ServiceDef, ServiceImpl } from "@fluencelabs/interfaces";
import { FluencePeer } from "../jsPeer/FluencePeer.js"; import { FluencePeer } from "../jsPeer/FluencePeer.js";
import { logger } from "../util/logger.js"; import { logger } from "../util/logger.js";
import { registerGlobalService, userHandlerService } from "./services.js"; import { registerGlobalService, userHandlerService } from "./services.js";
import { ServiceImpl } from "./types.js";
const log = logger("aqua"); const log = logger("aqua");
interface RegisterServiceArgs { interface RegisterServiceArgs {
peer: FluencePeer; peer: FluencePeer;
def: ServiceDef;
serviceId: string | undefined; serviceId: string | undefined;
service: ServiceImpl; service: ServiceImpl;
} }
const findAllPossibleServiceMethods = (service: ServiceImpl): Set<string> => {
let prototype: Record<string, unknown> = service;
const serviceMethods = new Set<string>();
do {
Object.getOwnPropertyNames(prototype)
.filter((prop) => {
return typeof prototype[prop] === "function" && prop !== "constructor";
})
.forEach((prop) => {
return serviceMethods.add(prop);
});
// Satisfying typescript here
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
prototype = Object.getPrototypeOf(prototype) as Record<string, unknown>;
} while (prototype.constructor !== Object);
return serviceMethods;
};
export const registerService = ({ export const registerService = ({
peer, peer,
def, serviceId,
serviceId = def.defaultServiceId,
service, service,
}: RegisterServiceArgs) => { }: RegisterServiceArgs) => {
// TODO: Need to refactor this. We can compute function types from service implementation, making func more type safe // TODO: Need to refactor this. We can compute function types from service implementation, making func more type safe
log.trace("registering aqua service %o", { def, serviceId, service }); log.trace("registering aqua service %o", { serviceId, service });
// Checking for missing keys
const requiredKeys =
def.functions.tag === "nil" ? [] : Object.keys(def.functions.fields);
const incorrectServiceDefinitions = requiredKeys.filter((f) => {
return !(f in service);
});
if (serviceId == null) { if (serviceId == null) {
throw new Error("Service ID must be specified"); throw new Error("Service ID must be specified");
} }
if (incorrectServiceDefinitions.length > 0) { const serviceMethods = findAllPossibleServiceMethods(service);
throw new Error(
`Error registering service ${serviceId}: missing functions: ` +
incorrectServiceDefinitions
.map((d) => {
return "'" + d + "'";
})
.join(", "),
);
}
const singleFunctions = for (const method of serviceMethods) {
def.functions.tag === "nil" ? [] : Object.entries(def.functions.fields); // The function has type of (arg1, arg2, arg3, ... , ParticleContext) => CallServiceResultType | void
for (const singleFunction of singleFunctions) {
const [name] = singleFunction;
// The function has type of (arg1, arg2, arg3, ... , callParams) => CallServiceResultType | void
// Account for the fact that user service might be defined as a class - .bind(...) // Account for the fact that user service might be defined as a class - .bind(...)
const userDefinedHandler = service[name].bind(service); const handler = service[method];
const userDefinedHandler = handler.bind(service);
const serviceDescription = userHandlerService( const serviceDescription = userHandlerService(
serviceId, serviceId,
singleFunction, method,
userDefinedHandler, userDefinedHandler,
); );

View File

@ -14,32 +14,18 @@
* limitations under the License. * limitations under the License.
*/ */
import { SecurityTetraplet } from "@fluencelabs/avm"; import { JSONValue } from "@fluencelabs/interfaces";
import {
CallParams,
ArrowWithoutCallbacks,
FunctionCallDef,
NonArrowType,
ServiceImpl,
JSONValue,
} from "@fluencelabs/interfaces";
import { fromUint8Array } from "js-base64";
import { match } from "ts-pattern";
import { FluencePeer } from "../jsPeer/FluencePeer.js"; import { FluencePeer } from "../jsPeer/FluencePeer.js";
import { import {
CallServiceData, CallServiceData,
GenericCallServiceHandler, GenericCallServiceHandler,
ParticleContext,
ResultCodes, ResultCodes,
} from "../jsServiceHost/interfaces.js"; } from "../jsServiceHost/interfaces.js";
import { Particle } from "../particle/Particle.js"; import { Particle } from "../particle/Particle.js";
import { import { ServiceImpl } from "./types.js";
aquaArgs2Ts,
responseServiceValue2ts,
returnType2Aqua,
ts2aqua,
} from "./conversions.js";
export interface ServiceDescription { export interface ServiceDescription {
serviceId: string; serviceId: string;
@ -50,10 +36,10 @@ export interface ServiceDescription {
/** /**
* Creates a service which injects relay's peer id into aqua space * Creates a service which injects relay's peer id into aqua space
*/ */
export const injectRelayService = (def: FunctionCallDef, peer: FluencePeer) => { export const injectRelayService = (peer: FluencePeer) => {
return { return {
serviceId: def.names.getDataSrv, serviceId: "getDataSrv",
fnName: def.names.relay, fnName: "-relay-",
handler: () => { handler: () => {
return { return {
retCode: ResultCodes.success, retCode: ResultCodes.success,
@ -69,7 +55,6 @@ export const injectRelayService = (def: FunctionCallDef, peer: FluencePeer) => {
export const injectValueService = ( export const injectValueService = (
serviceId: string, serviceId: string,
fnName: string, fnName: string,
valueType: NonArrowType,
value: JSONValue, value: JSONValue,
) => { ) => {
return { return {
@ -78,7 +63,7 @@ export const injectValueService = (
handler: () => { handler: () => {
return { return {
retCode: ResultCodes.success, retCode: ResultCodes.success,
result: ts2aqua(value, valueType), result: value,
}; };
}, },
}; };
@ -87,15 +72,17 @@ export const injectValueService = (
/** /**
* Creates a service which is used to return value from aqua function into typescript space * Creates a service which is used to return value from aqua function into typescript space
*/ */
export const responseService = ( export const responseService = (resolveCallback: (val: JSONValue) => void) => {
def: FunctionCallDef,
resolveCallback: (val: JSONValue) => void,
) => {
return { return {
serviceId: def.names.responseSrv, serviceId: "callbackSrv",
fnName: def.names.responseFnName, fnName: "response",
handler: (req: CallServiceData) => { handler: (req: CallServiceData) => {
const userFunctionReturn = responseServiceValue2ts(req, def.arrow); const userFunctionReturn =
req.args.length === 0
? null
: req.args.length === 1
? req.args[0]
: req.args;
setTimeout(() => { setTimeout(() => {
resolveCallback(userFunctionReturn); resolveCallback(userFunctionReturn);
@ -113,12 +100,11 @@ export const responseService = (
* Creates a service which is used to return errors from aqua function into typescript space * Creates a service which is used to return errors from aqua function into typescript space
*/ */
export const errorHandlingService = ( export const errorHandlingService = (
def: FunctionCallDef,
rejectCallback: (err: JSONValue) => void, rejectCallback: (err: JSONValue) => void,
) => { ) => {
return { return {
serviceId: def.names.errorHandlingSrv, serviceId: "errorHandlingSrv",
fnName: def.names.errorFnName, fnName: "error",
handler: (req: CallServiceData) => { handler: (req: CallServiceData) => {
const [err] = req.args; const [err] = req.args;
@ -139,21 +125,21 @@ export const errorHandlingService = (
*/ */
export const userHandlerService = ( export const userHandlerService = (
serviceId: string, serviceId: string,
arrowType: [string, ArrowWithoutCallbacks], fnName: string,
userHandler: ServiceImpl[string], userHandler: ServiceImpl[string],
) => { ) => {
const [fnName, type] = arrowType;
return { return {
serviceId, serviceId,
fnName, fnName,
handler: async (req: CallServiceData) => { handler: async (req: CallServiceData) => {
const args: [...JSONValue[], CallParams<string>] = [ const args: [...JSONValue[], ParticleContext] = [
...aquaArgs2Ts(req, type), ...req.args,
extractCallParams(req, type), req.particleContext,
]; ];
const rawResult = await userHandler.bind(null)(...args); const result = await userHandler.bind(null)(...args);
const result = returnType2Aqua(rawResult, type);
console.log(result, "userHandlerService result", serviceId, fnName);
return { return {
retCode: ResultCodes.success, retCode: ResultCodes.success,
@ -163,46 +149,6 @@ export const userHandlerService = (
}; };
}; };
/**
* Extracts call params from from call service data according to aqua type definition
*/
const extractCallParams = (
req: CallServiceData,
arrow: ArrowWithoutCallbacks,
): CallParams<string> => {
const names: (string | undefined)[] = match(arrow.domain)
.with({ tag: "nil" }, () => {
return [];
})
.with({ tag: "unlabeledProduct" }, (x) => {
return x.items.map((_, index) => {
return "arg" + index;
});
})
.with({ tag: "labeledProduct" }, (x) => {
return Object.keys(x.fields);
})
.exhaustive();
const tetraplets: Record<string, SecurityTetraplet[]> = {};
for (let i = 0; i < req.args.length; i++) {
const name = names[i];
if (name != null) {
tetraplets[name] = req.tetraplets[i];
}
}
const callParams = {
...req.particleContext,
signature: fromUint8Array(req.particleContext.signature),
tetraplets,
};
return callParams;
};
export const registerParticleScopeService = ( export const registerParticleScopeService = (
peer: FluencePeer, peer: FluencePeer,
particle: Particle, particle: Particle,

View File

@ -22,7 +22,5 @@ export type MaybePromise<T> = T | Promise<T>;
export type ServiceImpl = Record< export type ServiceImpl = Record<
string, string,
( (...args: [...JSONArray, ParticleContext]) => MaybePromise<JSONValue>
...args: [...JSONArray, ParticleContext]
) => MaybePromise<JSONValue | undefined>
>; >;

View File

@ -22,6 +22,7 @@ import {
KeyPairFormat, KeyPairFormat,
serializeAvmArgs, serializeAvmArgs,
} from "@fluencelabs/avm"; } from "@fluencelabs/avm";
import { JSONValue } from "@fluencelabs/interfaces";
import { fromUint8Array } from "js-base64"; import { fromUint8Array } from "js-base64";
import { import {
concatMap, concatMap,
@ -55,7 +56,6 @@ import {
getActualTTL, getActualTTL,
hasExpired, hasExpired,
Particle, Particle,
ParticleExecutionStage,
ParticleQueueItem, ParticleQueueItem,
} from "../particle/Particle.js"; } from "../particle/Particle.js";
import { registerSig } from "../services/_aqua/services.js"; import { registerSig } from "../services/_aqua/services.js";
@ -67,6 +67,8 @@ import { Tracing } from "../services/Tracing.js";
import { logger } from "../util/logger.js"; import { logger } from "../util/logger.js";
import { jsonify, isString, getErrorMessage } from "../util/utils.js"; import { jsonify, isString, getErrorMessage } from "../util/utils.js";
import { ExpirationError, InterpreterError, SendError } from "./errors.js";
const log_particle = logger("particle"); const log_particle = logger("particle");
const log_peer = logger("peer"); const log_peer = logger("peer");
@ -247,11 +249,13 @@ export abstract class FluencePeer {
/** /**
* Initiates a new particle execution starting from local peer * Initiates a new particle execution starting from local peer
* @param particle - particle to start execution of * @param particle - particle to start execution of
* @param onStageChange - callback for reacting on particle state changes * @param onSuccess - callback which is called when particle execution succeed
* @param onError - callback which is called when particle execution fails
*/ */
initiateParticle: ( initiateParticle: (
particle: IParticle, particle: IParticle,
onStageChange: (stage: ParticleExecutionStage) => void, onSuccess: (result: JSONValue) => void,
onError: (error: Error) => void,
): void => { ): void => {
if (!this.isInitialized) { if (!this.isInitialized) {
throw new Error( throw new Error(
@ -268,7 +272,8 @@ export abstract class FluencePeer {
this._incomingParticles.next({ this._incomingParticles.next({
particle: particle, particle: particle,
callResults: [], callResults: [],
onStageChange: onStageChange, onSuccess,
onError,
}); });
}, },
@ -336,7 +341,8 @@ export abstract class FluencePeer {
this._incomingParticles.next({ this._incomingParticles.next({
particle: p, particle: p,
callResults: [], callResults: [],
onStageChange: () => {}, onSuccess: () => {},
onError: () => {},
}); });
}, },
}, },
@ -471,10 +477,11 @@ export abstract class FluencePeer {
item.result.message, item.result.message,
); );
item.onStageChange({ item.onError(
stage: "interpreterError", new InterpreterError(
errorMessage: item.result.message, `Script interpretation failed: ${item.result.message} (particle id: ${item.particle.id})`,
}); ),
);
return; return;
} }
@ -493,10 +500,11 @@ export abstract class FluencePeer {
this.decodeAvmData(item.result.data), this.decodeAvmData(item.result.data),
); );
item.onStageChange({ item.onError(
stage: "interpreterError", new InterpreterError(
errorMessage: item.result.errorMessage, `Script interpretation failed: ${item.result.errorMessage} (particle id: ${item.particle.id})`,
}); ),
);
return; return;
} }
@ -508,10 +516,6 @@ export abstract class FluencePeer {
this.decodeAvmData(item.result.data), this.decodeAvmData(item.result.data),
); );
setTimeout(() => {
item.onStageChange({ stage: "interpreted" });
}, 0);
let connectionPromise: Promise<void> = Promise.resolve(); let connectionPromise: Promise<void> = Promise.resolve();
// send particle further if requested // send particle further if requested
@ -527,6 +531,8 @@ export abstract class FluencePeer {
item.result.nextPeerPks.toString(), item.result.nextPeerPks.toString(),
); );
const interpreterResult = item.result;
connectionPromise = this.connection connectionPromise = this.connection
.sendParticle(item.result.nextPeerPks, newParticle) .sendParticle(item.result.nextPeerPks, newParticle)
.then(() => { .then(() => {
@ -535,7 +541,10 @@ export abstract class FluencePeer {
newParticle.id, newParticle.id,
); );
item.onStageChange({ stage: "sent" }); if (interpreterResult.callRequests.length === 0) {
// Nothing to call, just fire-and-forget behavior
item.onSuccess(null);
}
}) })
.catch((e: unknown) => { .catch((e: unknown) => {
log_particle.error( log_particle.error(
@ -544,10 +553,13 @@ export abstract class FluencePeer {
e, e,
); );
item.onStageChange({ const message = getErrorMessage(e);
stage: "sendingError",
errorMessage: getErrorMessage(e), item.onError(
}); new SendError(
`Could not send particle: (particle id: ${item.particle.id}, message: ${message})`,
),
);
}); });
} }
@ -560,7 +572,10 @@ export abstract class FluencePeer {
args: cr.arguments, args: cr.arguments,
serviceId: cr.serviceId, serviceId: cr.serviceId,
tetraplets: cr.tetraplets, tetraplets: cr.tetraplets,
particleContext: getParticleContext(item.particle), particleContext: getParticleContext(
item.particle,
cr.tetraplets,
),
}; };
void this._execSingleCallRequest(req) void this._execSingleCallRequest(req)
@ -582,6 +597,14 @@ export abstract class FluencePeer {
}; };
}) })
.then((res) => { .then((res) => {
if (
req.serviceId === "callbackSrv" &&
req.fnName === "response"
) {
// Particle already processed
return;
}
const serviceResult = { const serviceResult = {
result: jsonify(res.result), result: jsonify(res.result),
retCode: res.retCode, retCode: res.retCode,
@ -600,7 +623,8 @@ export abstract class FluencePeer {
}); });
} }
} else { } else {
item.onStageChange({ stage: "localWorkDone" }); // Every air instruction executed or particle will go to relay
item.onSuccess(null);
} }
return connectionPromise; return connectionPromise;
@ -613,6 +637,7 @@ export abstract class FluencePeer {
} }
private _expireParticle(item: ParticleQueueItem) { private _expireParticle(item: ParticleQueueItem) {
console.log(item);
const particleId = item.particle.id; const particleId = item.particle.id;
log_particle.debug( log_particle.debug(
@ -623,7 +648,11 @@ export abstract class FluencePeer {
this.jsServiceHost.removeParticleScopeHandlers(particleId); this.jsServiceHost.removeParticleScopeHandlers(particleId);
item.onStageChange({ stage: "expired" }); item.onError(
new ExpirationError(
`Particle expired after ttl of ${item.particle.ttl}ms (particle id: ${item.particle.id})`,
),
);
} }
private decodeAvmData(data: Uint8Array) { private decodeAvmData(data: Uint8Array) {

View File

@ -44,7 +44,11 @@ describe("Basic AVM functionality in Fluence Peer tests", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toBe("1"); expect(res).toBe("1");
@ -85,7 +89,11 @@ describe("Basic AVM functionality in Fluence Peer tests", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toStrictEqual(["1", "2"]); expect(res).toStrictEqual(["1", "2"]);
@ -126,7 +134,11 @@ describe("Basic AVM functionality in Fluence Peer tests", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toBe("fast_result"); expect(res).toBe("fast_result");
@ -178,7 +190,11 @@ describe("Basic AVM functionality in Fluence Peer tests", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toBe("failed_with_timeout"); expect(res).toBe("failed_with_timeout");

View File

@ -94,7 +94,11 @@ describe("FluencePeer flow tests", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toEqual(expect.arrayContaining(["test1", "test1"])); expect(res).toEqual(expect.arrayContaining(["test1", "test1"]));

View File

@ -72,7 +72,11 @@ describe("FluencePeer usage test suite", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toBe("test"); expect(res).toBe("test");
@ -130,7 +134,11 @@ describe("FluencePeer usage test suite", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
expect(res).toBe(null); expect(res).toBe(null);
@ -167,7 +175,11 @@ describe("FluencePeer usage test suite", () => {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(
particle,
() => {},
handleTimeout(reject),
);
}); });
await expect(promise).rejects.toMatchObject({ await expect(promise).rejects.toMatchObject({
@ -205,6 +217,6 @@ async function callIncorrectService(peer: FluencePeer) {
}, },
}); });
peer.internals.initiateParticle(particle, handleTimeout(reject)); peer.internals.initiateParticle(particle, () => {}, handleTimeout(reject));
}); });
} }

View File

@ -0,0 +1,21 @@
/**
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export class ExpirationError extends Error {}
export class InterpreterError extends Error {}
export class SendError extends Error {}

View File

@ -79,7 +79,7 @@ export enum ResultCodes {
/** /**
* Particle context. Contains additional information about particle which triggered `call` air instruction from AVM * Particle context. Contains additional information about particle which triggered `call` air instruction from AVM
*/ */
export interface ParticleContext { export type ParticleContext = {
/** /**
* The identifier of particle which triggered the call * The identifier of particle which triggered the call
*/ */
@ -104,7 +104,12 @@ export interface ParticleContext {
* Particle's signature * Particle's signature
*/ */
signature: Uint8Array; signature: Uint8Array;
}
/**
* Security Tetraplets received from AVM and copied here
*/
tetraplets: SecurityTetraplet[][];
};
/** /**
* Represents the information passed from AVM when a `call` air instruction is executed on the local peer * Represents the information passed from AVM when a `call` air instruction is executed on the local peer

View File

@ -14,6 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
import { SecurityTetraplet } from "@fluencelabs/avm";
import { JSONArray } from "@fluencelabs/interfaces"; import { JSONArray } from "@fluencelabs/interfaces";
import { FluencePeer } from "../jsPeer/FluencePeer.js"; import { FluencePeer } from "../jsPeer/FluencePeer.js";
@ -28,10 +29,6 @@ import {
ResultCodes, ResultCodes,
} from "./interfaces.js"; } from "./interfaces.js";
export const doNothing = () => {
return undefined;
};
export const WrapFnIntoServiceCall = ( export const WrapFnIntoServiceCall = (
fn: (args: JSONArray) => CallServiceResultType | undefined, fn: (args: JSONArray) => CallServiceResultType | undefined,
) => { ) => {
@ -51,13 +48,17 @@ export class ServiceError extends Error {
} }
} }
export const getParticleContext = (particle: IParticle): ParticleContext => { export const getParticleContext = (
particle: IParticle,
tetraplets: SecurityTetraplet[][],
): ParticleContext => {
return { return {
particleId: particle.id, particleId: particle.id,
initPeerId: particle.initPeerId, initPeerId: particle.initPeerId,
timestamp: particle.timestamp, timestamp: particle.timestamp,
ttl: particle.ttl, ttl: particle.ttl,
signature: particle.signature, signature: particle.signature,
tetraplets,
}; };
}; };

View File

@ -24,6 +24,8 @@ import { KeyPair } from "../keypair/index.js";
import { numberToLittleEndianBytes } from "../util/bytes.js"; import { numberToLittleEndianBytes } from "../util/bytes.js";
import { IParticle } from "./interfaces.js"; import { IParticle } from "./interfaces.js";
import { JSONValue } from "@fluencelabs/interfaces";
import { ExpirationError } from "../jsPeer/errors.js";
const particleSchema = z.object({ const particleSchema = z.object({
id: z.string(), id: z.string(),
@ -183,15 +185,16 @@ export type ParticleExecutionStage =
export interface ParticleQueueItem { export interface ParticleQueueItem {
particle: IParticle; particle: IParticle;
callResults: CallResultsArray; callResults: CallResultsArray;
onStageChange: (state: ParticleExecutionStage) => void; onSuccess: (result: JSONValue) => void;
onError: (error: Error) => void;
} }
/** /**
* Helper function to handle particle at expired stage * Helper function to handle particle at expired stage
*/ */
export const handleTimeout = (fn: () => void) => { export const handleTimeout = (fn: () => void) => {
return (stage: ParticleExecutionStage) => { return (error: Error) => {
if (stage.stage === "expired") { if (error instanceof ExpirationError) {
fn(); fn();
} }
}; };

View File

@ -14,58 +14,45 @@
* limitations under the License. * limitations under the License.
*/ */
import { Buffer } from "buffer"; import { readFile } from "fs/promises";
import * as fs from "fs";
import { CallParams } from "@fluencelabs/interfaces";
import { FluencePeer } from "../jsPeer/FluencePeer.js"; import { FluencePeer } from "../jsPeer/FluencePeer.js";
import { ParticleContext } from "../jsServiceHost/interfaces.js";
import { getErrorMessage } from "../util/utils.js"; import { getErrorMessage } from "../util/utils.js";
import { NodeUtilsDef, registerNodeUtils } from "./_aqua/node-utils.js"; import { registerNodeUtils } from "./_aqua/node-utils.js";
import { SecurityGuard } from "./securityGuard.js"; import { SecurityGuard } from "./securityGuard.js";
import { defaultGuard } from "./SingleModuleSrv.js"; import { defaultGuard } from "./SingleModuleSrv.js";
export class NodeUtils implements NodeUtilsDef { export class NodeUtils {
constructor(private peer: FluencePeer) { constructor(private peer: FluencePeer) {
this.securityGuard_readFile = defaultGuard(this.peer); this.securityGuard_readFile = defaultGuard(this.peer);
} }
securityGuard_readFile: SecurityGuard<"path">; securityGuard_readFile: SecurityGuard;
async read_file(path: string, callParams: CallParams<"path">) { async read_file(path: string, callParams: ParticleContext) {
if (!this.securityGuard_readFile(callParams)) { if (!this.securityGuard_readFile(callParams)) {
return { return {
success: false, success: false,
error: "Security guard validation failed", error: ["Security guard validation failed"],
content: null, content: null,
}; };
} }
try { try {
// Strange enough, but Buffer type works here, while reading with encoding 'utf-8' doesn't // Strange enough, but Buffer type works here, while reading with encoding 'utf-8' doesn't
const data = await new Promise<Buffer>((resolve, reject) => { const data = await readFile(path, "base64");
fs.readFile(path, (err, data) => {
if (err != null) {
reject(err);
return;
}
resolve(data);
});
});
return { return {
success: true, success: true,
// TODO: this is strange bug. content: [data],
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
content: data as unknown as string,
error: null, error: null,
}; };
} catch (err: unknown) { } catch (err: unknown) {
return { return {
success: false, success: false,
error: getErrorMessage(err), error: [getErrorMessage(err)],
content: null, content: null,
}; };
} }

View File

@ -14,11 +14,11 @@
* limitations under the License. * limitations under the License.
*/ */
import { CallParams, PeerIdB58 } from "@fluencelabs/interfaces"; import { PeerIdB58 } from "@fluencelabs/interfaces";
import { ParticleContext } from "../jsServiceHost/interfaces.js";
import { KeyPair } from "../keypair/index.js"; import { KeyPair } from "../keypair/index.js";
import { SigDef } from "./_aqua/services.js";
import { import {
allowOnlyParticleOriginatedAt, allowOnlyParticleOriginatedAt,
allowServiceFn, allowServiceFn,
@ -28,7 +28,7 @@ import {
} from "./securityGuard.js"; } from "./securityGuard.js";
export const defaultSigGuard = (peerId: PeerIdB58) => { export const defaultSigGuard = (peerId: PeerIdB58) => {
return and<"data">( return and(
allowOnlyParticleOriginatedAt(peerId), allowOnlyParticleOriginatedAt(peerId),
or( or(
allowServiceFn("trust-graph", "get_trust_bytes"), allowServiceFn("trust-graph", "get_trust_bytes"),
@ -43,23 +43,23 @@ export const defaultSigGuard = (peerId: PeerIdB58) => {
type SignReturnType = type SignReturnType =
| { | {
error: null; error: [];
signature: number[]; signature: [number[]];
success: true; success: true;
} }
| { | {
error: string; error: [string];
signature: null; signature: [];
success: false; success: false;
}; };
export class Sig implements SigDef { export class Sig {
constructor(private keyPair: KeyPair) {} constructor(private keyPair: KeyPair) {}
/** /**
* Configurable security guard for sign method * Configurable security guard for sign method
*/ */
securityGuard: SecurityGuard<"data"> = () => { securityGuard: SecurityGuard = () => {
return true; return true;
}; };
@ -75,13 +75,13 @@ export class Sig implements SigDef {
*/ */
async sign( async sign(
data: number[], data: number[],
callParams: CallParams<"data">, context: ParticleContext,
): Promise<SignReturnType> { ): Promise<SignReturnType> {
if (!this.securityGuard(callParams)) { if (!this.securityGuard(context)) {
return { return {
success: false, success: false,
error: "Security guard validation failed", error: ["Security guard validation failed"],
signature: null, signature: [],
}; };
} }
@ -89,8 +89,8 @@ export class Sig implements SigDef {
return { return {
success: true, success: true,
error: null, error: [],
signature: Array.from(signedData), signature: [Array.from(signedData)],
}; };
} }

View File

@ -16,13 +16,12 @@
import { Buffer } from "buffer"; import { Buffer } from "buffer";
import { CallParams } from "@fluencelabs/interfaces";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { FluencePeer } from "../jsPeer/FluencePeer.js"; import { FluencePeer } from "../jsPeer/FluencePeer.js";
import { ParticleContext } from "../jsServiceHost/interfaces.js";
import { getErrorMessage } from "../util/utils.js"; import { getErrorMessage } from "../util/utils.js";
import { SrvDef } from "./_aqua/single-module-srv.js";
import { import {
allowOnlyParticleOriginatedAt, allowOnlyParticleOriginatedAt,
SecurityGuard, SecurityGuard,
@ -32,7 +31,7 @@ export const defaultGuard = (peer: FluencePeer) => {
return allowOnlyParticleOriginatedAt(peer.keyPair.getPeerId()); return allowOnlyParticleOriginatedAt(peer.keyPair.getPeerId());
}; };
export class Srv implements SrvDef { export class Srv {
private services: Set<string> = new Set(); private services: Set<string> = new Set();
constructor(private peer: FluencePeer) { constructor(private peer: FluencePeer) {
@ -40,16 +39,13 @@ export class Srv implements SrvDef {
this.securityGuard_remove = defaultGuard(this.peer); this.securityGuard_remove = defaultGuard(this.peer);
} }
securityGuard_create: SecurityGuard<"wasm_b64_content">; securityGuard_create: SecurityGuard;
async create( async create(wasm_b64_content: string, callParams: ParticleContext) {
wasm_b64_content: string,
callParams: CallParams<"wasm_b64_content">,
) {
if (!this.securityGuard_create(callParams)) { if (!this.securityGuard_create(callParams)) {
return { return {
success: false, success: false,
error: "Security guard validation failed", error: ["Security guard validation failed"],
service_id: null, service_id: null,
}; };
} }
@ -66,25 +62,25 @@ export class Srv implements SrvDef {
return { return {
success: true, success: true,
service_id: newServiceId, service_id: [newServiceId],
error: null, error: null,
}; };
} catch (err: unknown) { } catch (err: unknown) {
return { return {
success: true, success: true,
service_id: null, service_id: null,
error: getErrorMessage(err), error: [getErrorMessage(err)],
}; };
} }
} }
securityGuard_remove: SecurityGuard<"service_id">; securityGuard_remove: SecurityGuard;
async remove(service_id: string, callParams: CallParams<"service_id">) { async remove(service_id: string, callParams: ParticleContext) {
if (!this.securityGuard_remove(callParams)) { if (!this.securityGuard_remove(callParams)) {
return { return {
success: false, success: false,
error: "Security guard validation failed", error: ["Security guard validation failed"],
service_id: null, service_id: null,
}; };
} }
@ -92,7 +88,7 @@ export class Srv implements SrvDef {
if (!this.services.has(service_id)) { if (!this.services.has(service_id)) {
return { return {
success: false, success: false,
error: `Service with id ${service_id} not found`, error: [`Service with id ${service_id} not found`],
}; };
} }

View File

@ -16,11 +16,14 @@
import assert from "assert"; import assert from "assert";
import { CallParams, JSONArray } from "@fluencelabs/interfaces"; import { JSONArray } from "@fluencelabs/interfaces";
import { toUint8Array } from "js-base64"; import { toUint8Array } from "js-base64";
import { it, describe, expect, test } from "vitest"; import { it, describe, expect, test } from "vitest";
import { CallServiceData } from "../../jsServiceHost/interfaces.js"; import {
CallServiceData,
ParticleContext,
} from "../../jsServiceHost/interfaces.js";
import { KeyPair } from "../../keypair/index.js"; import { KeyPair } from "../../keypair/index.js";
import { builtInServices } from "../builtins.js"; import { builtInServices } from "../builtins.js";
import { allowServiceFn } from "../securityGuard.js"; import { allowServiceFn } from "../securityGuard.js";
@ -149,6 +152,7 @@ describe("Tests for default handler", () => {
timestamp: 595951200, timestamp: 595951200,
ttl: 595961200, ttl: 595961200,
signature: new Uint8Array([]), signature: new Uint8Array([]),
tetraplets: [],
}, },
}; };
@ -185,6 +189,7 @@ describe("Tests for default handler", () => {
timestamp: 595951200, timestamp: 595951200,
ttl: 595961200, ttl: 595961200,
signature: new Uint8Array([]), signature: new Uint8Array([]),
tetraplets: [],
}, },
}; };
@ -243,14 +248,15 @@ const makeTestTetraplet = (
initPeerId: string, initPeerId: string,
serviceId: string, serviceId: string,
fnName: string, fnName: string,
): CallParams<"data"> => { ): ParticleContext => {
return { return {
particleId: "", particleId: "",
timestamp: 0, timestamp: 0,
ttl: 0, ttl: 0,
initPeerId: initPeerId, initPeerId: initPeerId,
tetraplets: { signature: new Uint8Array([]),
data: [ tetraplets: [
[
{ {
peer_pk: initPeerId, peer_pk: initPeerId,
function_name: fnName, function_name: fnName,
@ -258,7 +264,7 @@ const makeTestTetraplet = (
json_path: "", json_path: "",
}, },
], ],
}, ],
}; };
}; };
@ -273,7 +279,7 @@ describe("Sig service tests", () => {
); );
expect(res.success).toBe(true); expect(res.success).toBe(true);
expect(res.signature).toStrictEqual(testDataSig); expect(res.signature).toStrictEqual([testDataSig]);
}); });
it("sig.verify should return true for the correct signature", async () => { it("sig.verify should return true for the correct signature", async () => {
@ -305,7 +311,7 @@ describe("Sig service tests", () => {
expect(signature.success).toBe(true); expect(signature.success).toBe(true);
assert(signature.success); assert(signature.success);
const res = await sig.verify(signature.signature, testData); const res = await sig.verify(signature.signature[0], testData);
expect(res).toBe(true); expect(res).toBe(true);
}); });
@ -334,7 +340,7 @@ describe("Sig service tests", () => {
); );
expect(res.success).toBe(false); expect(res.success).toBe(false);
expect(res.error).toBe("Security guard validation failed"); expect(res.error).toStrictEqual(["Security guard validation failed"]);
}); });
it("sig.sign with defaultSigGuard should not allow particles initiated from other peers", async () => { it("sig.sign with defaultSigGuard should not allow particles initiated from other peers", async () => {
@ -352,7 +358,7 @@ describe("Sig service tests", () => {
); );
expect(res.success).toBe(false); expect(res.success).toBe(false);
expect(res.error).toBe("Security guard validation failed"); expect(res.error).toStrictEqual(["Security guard validation failed"]);
}); });
it("changing securityGuard should work", async () => { it("changing securityGuard should work", async () => {

View File

@ -17,7 +17,6 @@
import { it, describe, expect, beforeEach, afterEach } from "vitest"; import { it, describe, expect, beforeEach, afterEach } from "vitest";
import { FluencePeer } from "../../jsPeer/FluencePeer.js"; import { FluencePeer } from "../../jsPeer/FluencePeer.js";
import { doNothing } from "../../jsServiceHost/serviceUtils.js";
import { mkTestPeer } from "../../util/testUtils.js"; import { mkTestPeer } from "../../util/testUtils.js";
let peer: FluencePeer; let peer: FluencePeer;
@ -72,7 +71,11 @@ describe("Sig service test suite", () => {
}); });
const p = await peer.internals.createNewParticle(script); const p = await peer.internals.createNewParticle(script);
peer.internals.initiateParticle(p, doNothing); peer.internals.initiateParticle(
p,
() => {},
() => {},
);
const [ const [
nestedFirst, nestedFirst,

View File

@ -17,10 +17,10 @@
import * as path from "path"; import * as path from "path";
import * as url from "url"; import * as url from "url";
import { ServiceDef, ServiceImpl } from "@fluencelabs/interfaces";
import { it, describe, expect, beforeAll } from "vitest"; import { it, describe, expect, beforeAll } from "vitest";
import { registerService } from "../../compilerSupport/registerService.js"; import { registerService } from "../../compilerSupport/registerService.js";
import { ServiceImpl } from "../../compilerSupport/types.js";
import { KeyPair } from "../../keypair/index.js"; import { KeyPair } from "../../keypair/index.js";
import { compileAqua, CompiledFnCall, withPeer } from "../../util/testUtils.js"; import { compileAqua, CompiledFnCall, withPeer } from "../../util/testUtils.js";
import { allowServiceFn } from "../securityGuard.js"; import { allowServiceFn } from "../securityGuard.js";
@ -29,8 +29,6 @@ import { Sig } from "../Sig.js";
const __dirname = url.fileURLToPath(new URL(".", import.meta.url)); const __dirname = url.fileURLToPath(new URL(".", import.meta.url));
let aqua: Record<string, CompiledFnCall>; let aqua: Record<string, CompiledFnCall>;
let sigDef: ServiceDef;
let dataProviderDef: ServiceDef;
describe("Sig service test suite", () => { describe("Sig service test suite", () => {
beforeAll(async () => { beforeAll(async () => {
@ -39,14 +37,12 @@ describe("Sig service test suite", () => {
"../../../aqua_test/sigService.aqua", "../../../aqua_test/sigService.aqua",
); );
const { services, functions } = await compileAqua(pathToAquaFiles); const { functions } = await compileAqua(pathToAquaFiles);
aqua = functions; aqua = functions;
sigDef = services["Sig"];
dataProviderDef = services["DataProvider"];
}); });
it("Use custom sig service, success path", async () => { it.only("Use custom sig service, success path", async () => {
await withPeer(async (peer) => { await withPeer(async (peer) => {
const customKeyPair = await KeyPair.randomEd25519(); const customKeyPair = await KeyPair.randomEd25519();
const customSig = new Sig(customKeyPair); const customSig = new Sig(customKeyPair);
@ -54,7 +50,6 @@ describe("Sig service test suite", () => {
registerService({ registerService({
peer, peer,
def: sigDef,
serviceId: "CustomSig", serviceId: "CustomSig",
// TODO: fix this after changing registerService signature // TODO: fix this after changing registerService signature
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@ -63,7 +58,6 @@ describe("Sig service test suite", () => {
registerService({ registerService({
peer, peer,
def: dataProviderDef,
serviceId: "data", serviceId: "data",
service: { service: {
provide_data: () => { provide_data: () => {
@ -81,7 +75,7 @@ describe("Sig service test suite", () => {
const isSigCorrect = await customSig.verify( const isSigCorrect = await customSig.verify(
// TODO: Use compiled ts wrappers // TODO: Use compiled ts wrappers
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
(result as { signature: number[] }).signature, (result as { signature: [number[]] }).signature[0],
data, data,
); );
@ -97,7 +91,6 @@ describe("Sig service test suite", () => {
registerService({ registerService({
peer, peer,
def: sigDef,
serviceId: "CustomSig", serviceId: "CustomSig",
// TODO: fix this after changing registerService signature // TODO: fix this after changing registerService signature
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@ -106,7 +99,6 @@ describe("Sig service test suite", () => {
registerService({ registerService({
peer, peer,
def: dataProviderDef,
serviceId: "data", serviceId: "data",
service: { service: {
provide_data: () => { provide_data: () => {
@ -122,7 +114,7 @@ describe("Sig service test suite", () => {
}); });
}); });
it("Default sig service should be resolvable by peer id", async () => { it.only("Default sig service should be resolvable by peer id", async () => {
await withPeer(async (peer) => { await withPeer(async (peer) => {
const sig = peer.internals.getServices().sig; const sig = peer.internals.getServices().sig;
@ -130,7 +122,6 @@ describe("Sig service test suite", () => {
registerService({ registerService({
peer: peer, peer: peer,
def: dataProviderDef,
serviceId: "data", serviceId: "data",
service: { service: {
provide_data: () => { provide_data: () => {
@ -146,6 +137,11 @@ describe("Sig service test suite", () => {
}); });
expect(callAsSigRes).toHaveProperty("success", false); expect(callAsSigRes).toHaveProperty("success", false);
expect(callAsPeerIdRes).toHaveProperty("error", [
"Security guard validation failed",
]);
expect(callAsPeerIdRes).toHaveProperty("success", false); expect(callAsPeerIdRes).toHaveProperty("success", false);
sig.securityGuard = () => { sig.securityGuard = () => {
@ -167,7 +163,8 @@ describe("Sig service test suite", () => {
const isValid = await sig.verify( const isValid = await sig.verify(
// TODO: Use compiled ts wrappers // TODO: Use compiled ts wrappers
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
(callAsSigResAfterGuardChange as { signature: number[] }).signature, (callAsSigResAfterGuardChange as { signature: [number[]] })
.signature[0],
data, data,
); );

View File

@ -14,30 +14,11 @@
* limitations under the License. * limitations under the License.
*/ */
/**
* This compiled aqua file was modified to make it work in monorepo
*/
import { CallParams, ServiceImpl } from "@fluencelabs/interfaces";
import { registerService } from "../../compilerSupport/registerService.js"; import { registerService } from "../../compilerSupport/registerService.js";
import { ServiceImpl } from "../../compilerSupport/types.js";
import { FluencePeer } from "../../jsPeer/FluencePeer.js"; import { FluencePeer } from "../../jsPeer/FluencePeer.js";
import { NodeUtils } from "../NodeUtils.js"; import { NodeUtils } from "../NodeUtils.js";
// Services
export interface NodeUtilsDef {
read_file: (
path: string,
callParams: CallParams<"path">,
) =>
| { content: string | null; error: string | null; success: boolean }
| Promise<{
content: string | null;
error: string | null;
success: boolean;
}>;
}
export function registerNodeUtils( export function registerNodeUtils(
peer: FluencePeer, peer: FluencePeer,
serviceId: string, serviceId: string,
@ -49,55 +30,6 @@ export function registerNodeUtils(
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
service: service as unknown as ServiceImpl, service: service as unknown as ServiceImpl,
serviceId, serviceId,
def: {
defaultServiceId: "node_utils",
functions: {
tag: "labeledProduct",
fields: {
read_file: {
tag: "arrow",
domain: {
tag: "labeledProduct",
fields: {
path: {
tag: "scalar",
name: "string",
},
},
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "struct",
name: "ReadFileResult",
fields: {
content: {
tag: "option",
type: {
tag: "scalar",
name: "string",
},
},
error: {
tag: "option",
type: {
tag: "scalar",
name: "string",
},
},
success: {
tag: "scalar",
name: "bool",
},
},
},
],
},
},
},
},
},
}); });
} }

View File

@ -14,33 +14,30 @@
* limitations under the License. * limitations under the License.
*/ */
/**
* This compiled aqua file was modified to make it work in monorepo
*/
import { CallParams, ServiceImpl } from "@fluencelabs/interfaces";
import { registerService } from "../../compilerSupport/registerService.js"; import { registerService } from "../../compilerSupport/registerService.js";
import { ServiceImpl } from "../../compilerSupport/types.js";
import { FluencePeer } from "../../jsPeer/FluencePeer.js"; import { FluencePeer } from "../../jsPeer/FluencePeer.js";
import { ParticleContext } from "../../jsServiceHost/interfaces.js";
import { Sig } from "../Sig.js"; import { Sig } from "../Sig.js";
// Services // Services
export interface SigDef { export interface SigDef {
get_peer_id: (callParams: CallParams<null>) => string | Promise<string>; get_peer_id: (callParams: ParticleContext) => string | Promise<string>;
sign: ( sign: (
data: number[], data: number[],
callParams: CallParams<"data">, callParams: ParticleContext,
) => ) =>
| { error: string | null; signature: number[] | null; success: boolean } | { error: [string?]; signature: [number[]?]; success: boolean }
| Promise<{ | Promise<{
error: string | null; error: [string?];
signature: number[] | null; signature: [number[]?];
success: boolean; success: boolean;
}>; }>;
verify: ( verify: (
signature: number[], signature: number[],
data: number[], data: number[],
callParams: CallParams<"signature" | "data">, callParams: ParticleContext,
) => boolean | Promise<boolean>; ) => boolean | Promise<boolean>;
} }
@ -55,107 +52,6 @@ export function registerSig(
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
service: service as unknown as ServiceImpl, service: service as unknown as ServiceImpl,
serviceId, serviceId,
def: {
defaultServiceId: "sig",
functions: {
tag: "labeledProduct",
fields: {
get_peer_id: {
tag: "arrow",
domain: {
tag: "nil",
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "scalar",
name: "string",
},
],
},
},
sign: {
tag: "arrow",
domain: {
tag: "labeledProduct",
fields: {
data: {
tag: "array",
type: {
tag: "scalar",
name: "u8",
},
},
},
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "struct",
name: "SignResult",
fields: {
error: {
tag: "option",
type: {
tag: "scalar",
name: "string",
},
},
signature: {
tag: "option",
type: {
tag: "array",
type: {
tag: "scalar",
name: "u8",
},
},
},
success: {
tag: "scalar",
name: "bool",
},
},
},
],
},
},
verify: {
tag: "arrow",
domain: {
tag: "labeledProduct",
fields: {
signature: {
tag: "array",
type: {
tag: "scalar",
name: "u8",
},
},
data: {
tag: "array",
type: {
tag: "scalar",
name: "u8",
},
},
},
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "scalar",
name: "bool",
},
],
},
},
},
},
},
}); });
} }

View File

@ -14,37 +14,11 @@
* limitations under the License. * limitations under the License.
*/ */
/**
* This compiled aqua file was modified to make it work in monorepo
*/
import { CallParams, ServiceImpl } from "@fluencelabs/interfaces";
import { registerService } from "../../compilerSupport/registerService.js"; import { registerService } from "../../compilerSupport/registerService.js";
import { ServiceImpl } from "../../compilerSupport/types.js";
import { FluencePeer } from "../../jsPeer/FluencePeer.js"; import { FluencePeer } from "../../jsPeer/FluencePeer.js";
import { Srv } from "../SingleModuleSrv.js"; import { Srv } from "../SingleModuleSrv.js";
// Services
export interface SrvDef {
create: (
wasm_b64_content: string,
callParams: CallParams<"wasm_b64_content">,
) =>
| { error: string | null; service_id: string | null; success: boolean }
| Promise<{
error: string | null;
service_id: string | null;
success: boolean;
}>;
list: (callParams: CallParams<null>) => string[] | Promise<string[]>;
remove: (
service_id: string,
callParams: CallParams<"service_id">,
) =>
| { error: string | null; success: boolean }
| Promise<{ error: string | null; success: boolean }>;
}
export function registerSrv( export function registerSrv(
peer: FluencePeer, peer: FluencePeer,
serviceId: string, serviceId: string,
@ -56,107 +30,6 @@ export function registerSrv(
// TODO: fix this after changing registerService signature // TODO: fix this after changing registerService signature
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
service: service as unknown as ServiceImpl, service: service as unknown as ServiceImpl,
def: {
defaultServiceId: "single_module_srv",
functions: {
tag: "labeledProduct",
fields: {
create: {
tag: "arrow",
domain: {
tag: "labeledProduct",
fields: {
wasm_b64_content: {
tag: "scalar",
name: "string",
},
},
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "struct",
name: "ServiceCreationResult",
fields: {
error: {
tag: "option",
type: {
tag: "scalar",
name: "string",
},
},
service_id: {
tag: "option",
type: {
tag: "scalar",
name: "string",
},
},
success: {
tag: "scalar",
name: "bool",
},
},
},
],
},
},
list: {
tag: "arrow",
domain: {
tag: "nil",
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "array",
type: {
tag: "scalar",
name: "string",
},
},
],
},
},
remove: {
tag: "arrow",
domain: {
tag: "labeledProduct",
fields: {
service_id: {
tag: "scalar",
name: "string",
},
},
},
codomain: {
tag: "unlabeledProduct",
items: [
{
tag: "struct",
name: "RemoveResult",
fields: {
error: {
tag: "option",
type: {
tag: "scalar",
name: "string",
},
},
success: {
tag: "scalar",
name: "bool",
},
},
},
],
},
},
},
},
},
}); });
} }

View File

@ -15,25 +15,23 @@
*/ */
import { SecurityTetraplet } from "@fluencelabs/avm"; import { SecurityTetraplet } from "@fluencelabs/avm";
import { CallParams, PeerIdB58 } from "@fluencelabs/interfaces"; import { PeerIdB58 } from "@fluencelabs/interfaces";
type ArgName = string | null; import { ParticleContext } from "../jsServiceHost/interfaces.js";
/** /**
* A predicate of call params for sig service's sign method which determines whether signing operation is allowed or not * A predicate of call params for sig service's sign method which determines whether signing operation is allowed or not
*/ */
export type SecurityGuard<T extends ArgName> = ( export type SecurityGuard = (params: ParticleContext) => boolean;
params: CallParams<T>,
) => boolean;
/** /**
* Only allow calls when tetraplet for 'data' argument satisfies the predicate * Only allow calls when tetraplet for 'data' argument satisfies the predicate
*/ */
export const allowTetraplet = <T extends ArgName>( export const allowTetraplet = (
pred: (tetraplet: SecurityTetraplet) => boolean, pred: (tetraplet: SecurityTetraplet) => boolean,
): SecurityGuard<T> => { ): SecurityGuard => {
return (params) => { return (params) => {
const t = params.tetraplets["data"][0]; const t = params.tetraplets[0][0];
return pred(t); return pred(t);
}; };
}; };
@ -41,10 +39,10 @@ export const allowTetraplet = <T extends ArgName>(
/** /**
* Only allow data which comes from the specified serviceId and fnName * Only allow data which comes from the specified serviceId and fnName
*/ */
export const allowServiceFn = <T extends ArgName>( export const allowServiceFn = (
serviceId: string, serviceId: string,
fnName: string, fnName: string,
): SecurityGuard<T> => { ): SecurityGuard => {
return allowTetraplet((t) => { return allowTetraplet((t) => {
return t.service_id === serviceId && t.function_name === fnName; return t.service_id === serviceId && t.function_name === fnName;
}); });
@ -53,9 +51,7 @@ export const allowServiceFn = <T extends ArgName>(
/** /**
* Only allow data originated from the specified json_path * Only allow data originated from the specified json_path
*/ */
export const allowExactJsonPath = <T extends ArgName>( export const allowExactJsonPath = (jsonPath: string): SecurityGuard => {
jsonPath: string,
): SecurityGuard<T> => {
return allowTetraplet((t) => { return allowTetraplet((t) => {
return t.json_path === jsonPath; return t.json_path === jsonPath;
}); });
@ -64,9 +60,9 @@ export const allowExactJsonPath = <T extends ArgName>(
/** /**
* Only allow signing when particle is initiated at the specified peer * Only allow signing when particle is initiated at the specified peer
*/ */
export const allowOnlyParticleOriginatedAt = <T extends ArgName>( export const allowOnlyParticleOriginatedAt = (
peerId: PeerIdB58, peerId: PeerIdB58,
): SecurityGuard<T> => { ): SecurityGuard => {
return (params) => { return (params) => {
return params.initPeerId === peerId; return params.initPeerId === peerId;
}; };
@ -76,9 +72,7 @@ export const allowOnlyParticleOriginatedAt = <T extends ArgName>(
* Only allow signing when all of the predicates are satisfied. * Only allow signing when all of the predicates are satisfied.
* Useful for predicates reuse * Useful for predicates reuse
*/ */
export const and = <T extends ArgName>( export const and = (...predicates: SecurityGuard[]): SecurityGuard => {
...predicates: SecurityGuard<T>[]
): SecurityGuard<T> => {
return (params) => { return (params) => {
return predicates.every((x) => { return predicates.every((x) => {
return x(params); return x(params);
@ -90,9 +84,7 @@ export const and = <T extends ArgName>(
* Only allow signing when any of the predicates are satisfied. * Only allow signing when any of the predicates are satisfied.
* Useful for predicates reuse * Useful for predicates reuse
*/ */
export const or = <T extends ArgName>( export const or = (...predicates: SecurityGuard[]): SecurityGuard => {
...predicates: SecurityGuard<T>[]
): SecurityGuard<T> => {
return (params) => { return (params) => {
return predicates.some((x) => { return predicates.some((x) => {
return x(params); return x(params);

View File

@ -20,7 +20,7 @@ import { Path, Aqua } from "@fluencelabs/aqua-api/aqua-api.js";
import { import {
FunctionCallDef, FunctionCallDef,
JSONArray, JSONArray,
PassedArgs, JSONValue,
ServiceDef, ServiceDef,
} from "@fluencelabs/interfaces"; } from "@fluencelabs/interfaces";
import { Subject, Subscribable } from "rxjs"; import { Subject, Subscribable } from "rxjs";
@ -30,7 +30,10 @@ import { ClientConfig, RelayOptions } from "../clientPeer/types.js";
import { callAquaFunction } from "../compilerSupport/callFunction.js"; import { callAquaFunction } from "../compilerSupport/callFunction.js";
import { IConnection } from "../connection/interfaces.js"; import { IConnection } from "../connection/interfaces.js";
import { DEFAULT_CONFIG, FluencePeer } from "../jsPeer/FluencePeer.js"; import { DEFAULT_CONFIG, FluencePeer } from "../jsPeer/FluencePeer.js";
import { CallServiceResultType } from "../jsServiceHost/interfaces.js"; import {
CallServiceResultType,
ParticleContext,
} from "../jsServiceHost/interfaces.js";
import { JsServiceHost } from "../jsServiceHost/JsServiceHost.js"; import { JsServiceHost } from "../jsServiceHost/JsServiceHost.js";
import { WrapFnIntoServiceCall } from "../jsServiceHost/serviceUtils.js"; import { WrapFnIntoServiceCall } from "../jsServiceHost/serviceUtils.js";
import { KeyPair } from "../keypair/index.js"; import { KeyPair } from "../keypair/index.js";
@ -73,6 +76,18 @@ interface FunctionInfo {
funcDef: FunctionCallDef; funcDef: FunctionCallDef;
} }
/**
* Type for callback passed as aqua function argument
*/
export type ArgCallbackFunction = (
...args: [...JSONValue[], ParticleContext]
) => JSONValue | Promise<JSONValue>;
/**
* Arguments passed to Aqua function
*/
export type PassedArgs = { [key: string]: JSONValue | ArgCallbackFunction };
export const compileAqua = async (aquaFile: string): Promise<CompiledFile> => { export const compileAqua = async (aquaFile: string): Promise<CompiledFile> => {
await fs.access(aquaFile); await fs.access(aquaFile);
@ -88,11 +103,12 @@ export const compileAqua = async (aquaFile: string): Promise<CompiledFile> => {
); );
} }
console.log(compilationResult);
const functions = Object.entries(compilationResult.functions) const functions = Object.entries(compilationResult.functions)
.map(([name, fnInfo]: [string, FunctionInfo]) => { .map(([name, fnInfo]: [string, FunctionInfo]) => {
const callFn = (peer: FluencePeer, args: PassedArgs) => { const callFn = (peer: FluencePeer, args: PassedArgs) => {
return callAquaFunction({ return callAquaFunction({
def: fnInfo.funcDef,
script: fnInfo.script, script: fnInfo.script,
config: {}, config: {},
peer: peer, peer: peer,