diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 7cd10413..77b769a9 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -22,7 +22,7 @@ import { FluenceConnection } from './FluenceConnection'; import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle'; import { KeyPair } from './KeyPair'; import { dataToString, avmLogFunction, w } from './utils'; -import { filter, pipe, Subject, tap } from 'rxjs'; +import { catchError, concatMap, filter, map, pipe, Subject, tap } from 'rxjs'; import { RequestFlow } from './compilerSupport/v1'; import log from 'loglevel'; import { defaultServices } from './defaultServices'; @@ -414,73 +414,134 @@ export class FluencePeer { let particlesQueue = new Subject(); let prevData: Uint8Array = Buffer.from([]); - particlesQueue - .pipe( - // force new line - filterExpiredParticles(this._expireParticle.bind(this)), - ) - .subscribe(async (item) => { - const particle = item.particle; - const result = await runAvmWorker(this.getStatus().peerId, this._worker, particle, prevData); + const interpreted = particlesQueue.pipe( + filterExpiredParticles(this._expireParticle.bind(this)), + tap((item) => { + item.particle.logTo('debug', 'Sending particle to interpreter'); + log.debug('prevData: ', dataToString(prevData)); + }), + concatMap((item) => + this._worker + .run( + item.particle.script, + prevData, + item.particle.data, + { + initPeerId: item.particle.initPeerId, + currentPeerId: this.getStatus().peerId, + }, + item.particle.callResults, + ) + .then((res) => { + return { ...item, interpreterResult: res }; + }), + ), + tap(({ interpreterResult }) => { + const toLog: any = { ...interpreterResult }; + toLog.data = dataToString(toLog.data); - // Do not continue if there was an error in particle interpretation - if (!isInterpretationSuccessful(result)) { - item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage }); - return; + if (isInterpretationSuccessful(interpreterResult)) { + log.debug('Interpreter result: ', w(toLog)); + } else { + log.error('Interpreter failed: ', w(toLog)); } + }), + map((item) => { + return { + ...item, + newData: Buffer.from(item.interpreterResult.data), + isInterpretationSuccessful: isInterpretationSuccessful(item.interpreterResult), + hasPeerPks: item.interpreterResult.nextPeerPks.length > 0, + hasCallRequests: item.interpreterResult.callRequests.length > 0, + }; + }), + map((item) => { + prevData = item.newData; + return item; + }), + ); + interpreted.pipe( + filter((x) => !x.isInterpretationSuccessful), + tap((item) => + setTimeout(() => { + item.onStageChange({ + stage: 'interpreterError', + errorMessage: item.interpreterResult.errorMessage, + }); + return; + }, 0), + ), + ); + + interpreted.pipe( + filter((x) => x.isInterpretationSuccessful), + tap((item) => setTimeout(() => { item.onStageChange({ stage: 'interpreted' }); - }, 0); + }, 0), + ), + ); - const newData = Buffer.from(result.data); - prevData = newData; + interpreted.pipe( + filter((x) => x.isInterpretationSuccessful), + filter((x) => x.hasPeerPks), + tap((item) => { + const newParticle = item.particle.clone(); + newParticle.data = item.newData; + this._outgoingParticles.next({ ...item, particle: newParticle }); + }), + ); - // send particle further if requested - if (result.nextPeerPks.length > 0) { - const newParticle = particle.clone(); - newParticle.data = newData; - this._outgoingParticles.next({ ...item, particle: newParticle }); - } + interpreted.pipe( + filter((x) => x.isInterpretationSuccessful), + filter((x) => !x.hasCallRequests), + tap((item) => { + item.onStageChange({ stage: 'localWorkDone' }); + }), + ); - // execute call requests if needed - // and put particle with the results back to queue - if (result.callRequests.length > 0) { - for (let [key, cr] of result.callRequests) { - const req = { - fnName: cr.functionName, - args: cr.arguments, - serviceId: cr.serviceId, - tetraplets: cr.tetraplets, - particleContext: particle.getParticleContext(), - }; + interpreted + .pipe( + filter((x) => x.isInterpretationSuccessful), + filter((x) => x.hasCallRequests), + concatMap((item) => item.interpreterResult.callRequests.map(([key, cr]) => [item, key, cr] as const)), + map(([item, key, cr]) => { + const req = { + fnName: cr.functionName, + args: cr.arguments, + serviceId: cr.serviceId, + tetraplets: cr.tetraplets, + particleContext: item.particle.getParticleContext(), + }; + return [item, key, req] as const; + }), + concatMap(([item, key, req]) => { + return this._execSingleCallRequest(req) + .catch( + (err): CallServiceResult => ({ + retCode: ResultCodes.exceptionInHandler, + result: `Handler failed. fnName="${req.fnName}" serviceId="${ + req.serviceId + }" error: ${err.toString()}`, + }), + ) + .then((res) => [item, key, res] as const); + }), + map(([item, key, res]) => { + const serviceResult = { + result: w(res.result), + retCode: res.retCode, + }; - this._execSingleCallRequest(req) - .catch( - (err): CallServiceResult => ({ - retCode: ResultCodes.exceptionInHandler, - result: `Handler failed. fnName="${req.fnName}" serviceId="${ - req.serviceId - }" error: ${err.toString()}`, - }), - ) - .then((res) => { - const serviceResult = { - result: w(res.result), - retCode: res.retCode, - }; + const newParticle = item.particle.clone(); + newParticle.callResults = [[key, serviceResult]]; + newParticle.data = Buffer.from([]); - const newParticle = particle.clone(); - newParticle.callResults = [[key, serviceResult]]; - newParticle.data = Buffer.from([]); - - particlesQueue.next({ ...item, particle: newParticle }); - }); - } - } else { - item.onStageChange({ stage: 'localWorkDone' }); - } - }); + return { particle: newParticle, onStageChange: item.onStageChange }; + }), + ) + .subscribe((item) => particlesQueue.next(item)); return particlesQueue; }