Pavel Murygin 2021-12-10 15:46:30 +03:00
parent 658a41b418
commit 157d6309c1


@@ -22,7 +22,7 @@ import { FluenceConnection } from './FluenceConnection';
 import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
 import { KeyPair } from './KeyPair';
 import { dataToString, avmLogFunction, w } from './utils';
-import { filter, pipe, Subject, tap } from 'rxjs';
+import { catchError, concatMap, filter, map, pipe, Subject, tap } from 'rxjs';
 import { RequestFlow } from './compilerSupport/v1';
 import log from 'loglevel';
 import { defaultServices } from './defaultServices';
@@ -414,48 +414,110 @@ export class FluencePeer {
         let particlesQueue = new Subject<ParticleQueueItem>();
         let prevData: Uint8Array = Buffer.from([]);

-        particlesQueue
-            .pipe(
-                // force new line
-                filterExpiredParticles(this._expireParticle.bind(this)),
-            )
-            .subscribe(async (item) => {
-                const particle = item.particle;
-                const result = await runAvmWorker(this.getStatus().peerId, this._worker, particle, prevData);
-
-                // Do not continue if there was an error in particle interpretation
-                if (!isInterpretationSuccessful(result)) {
-                    item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage });
-                    return;
-                }
-
-                setTimeout(() => {
-                    item.onStageChange({ stage: 'interpreted' });
-                }, 0);
-
-                const newData = Buffer.from(result.data);
-                prevData = newData;
-
-                // send particle further if requested
-                if (result.nextPeerPks.length > 0) {
-                    const newParticle = particle.clone();
-                    newParticle.data = newData;
-                    this._outgoingParticles.next({ ...item, particle: newParticle });
-                }
-
-                // execute call requests if needed
-                // and put particle with the results back to queue
-                if (result.callRequests.length > 0) {
-                    for (let [key, cr] of result.callRequests) {
-                        const req = {
-                            fnName: cr.functionName,
-                            args: cr.arguments,
-                            serviceId: cr.serviceId,
-                            tetraplets: cr.tetraplets,
-                            particleContext: particle.getParticleContext(),
-                        };
-
-                        this._execSingleCallRequest(req)
-                            .catch(
-                                (err): CallServiceResult => ({
-                                    retCode: ResultCodes.exceptionInHandler,
+        const interpreted = particlesQueue.pipe(
+            filterExpiredParticles(this._expireParticle.bind(this)),
+            tap((item) => {
+                item.particle.logTo('debug', 'Sending particle to interpreter');
+                log.debug('prevData: ', dataToString(prevData));
+            }),
+            concatMap((item) =>
+                this._worker
+                    .run(
+                        item.particle.script,
+                        prevData,
+                        item.particle.data,
+                        {
+                            initPeerId: item.particle.initPeerId,
+                            currentPeerId: this.getStatus().peerId,
+                        },
+                        item.particle.callResults,
+                    )
+                    .then((res) => {
+                        return { ...item, interpreterResult: res };
+                    }),
+            ),
+            tap(({ interpreterResult }) => {
+                const toLog: any = { ...interpreterResult };
+                toLog.data = dataToString(toLog.data);
+
+                if (isInterpretationSuccessful(interpreterResult)) {
+                    log.debug('Interpreter result: ', w(toLog));
+                } else {
+                    log.error('Interpreter failed: ', w(toLog));
+                }
+            }),
+            map((item) => {
+                return {
+                    ...item,
+                    newData: Buffer.from(item.interpreterResult.data),
+                    isInterpretationSuccessful: isInterpretationSuccessful(item.interpreterResult),
+                    hasPeerPks: item.interpreterResult.nextPeerPks.length > 0,
+                    hasCallRequests: item.interpreterResult.callRequests.length > 0,
+                };
+            }),
+            map((item) => {
+                prevData = item.newData;
+                return item;
+            }),
+        );
+
+        interpreted.pipe(
+            filter((x) => !x.isInterpretationSuccessful),
+            tap((item) =>
+                setTimeout(() => {
+                    item.onStageChange({
+                        stage: 'interpreterError',
+                        errorMessage: item.interpreterResult.errorMessage,
+                    });
+                    return;
+                }, 0),
+            ),
+        );
+
+        interpreted.pipe(
+            filter((x) => x.isInterpretationSuccessful),
+            tap((item) =>
+                setTimeout(() => {
+                    item.onStageChange({ stage: 'interpreted' });
+                }, 0),
+            ),
+        );
+
+        interpreted.pipe(
+            filter((x) => x.isInterpretationSuccessful),
+            filter((x) => x.hasPeerPks),
+            tap((item) => {
+                const newParticle = item.particle.clone();
+                newParticle.data = item.newData;
+                this._outgoingParticles.next({ ...item, particle: newParticle });
+            }),
+        );
+
+        interpreted.pipe(
+            filter((x) => x.isInterpretationSuccessful),
+            filter((x) => !x.hasCallRequests),
+            tap((item) => {
+                item.onStageChange({ stage: 'localWorkDone' });
+            }),
+        );
+
+        interpreted
+            .pipe(
+                filter((x) => x.isInterpretationSuccessful),
+                filter((x) => x.hasCallRequests),
+                concatMap((item) => item.interpreterResult.callRequests.map(([key, cr]) => [item, key, cr] as const)),
+                map(([item, key, cr]) => {
+                    const req = {
+                        fnName: cr.functionName,
+                        args: cr.arguments,
+                        serviceId: cr.serviceId,
+                        tetraplets: cr.tetraplets,
+                        particleContext: item.particle.getParticleContext(),
+                    };
+                    return [item, key, req] as const;
+                }),
+                concatMap(([item, key, req]) => {
+                    return this._execSingleCallRequest(req)
+                        .catch(
+                            (err): CallServiceResult => ({
+                                retCode: ResultCodes.exceptionInHandler,
@@ -464,23 +526,22 @@ export class FluencePeer {
-                                    }" error: ${err.toString()}`,
-                                }),
-                            )
-                            .then((res) => {
-                                const serviceResult = {
-                                    result: w(res.result),
-                                    retCode: res.retCode,
-                                };
-                                const newParticle = particle.clone();
-                                newParticle.callResults = [[key, serviceResult]];
-                                newParticle.data = Buffer.from([]);
-                                particlesQueue.next({ ...item, particle: newParticle });
-                            });
-                    }
-                } else {
-                    item.onStageChange({ stage: 'localWorkDone' });
-                }
-            });
+                                }" error: ${err.toString()}`,
+                            }),
+                        )
+                        .then((res) => [item, key, res] as const);
+                }),
+                map(([item, key, res]) => {
+                    const serviceResult = {
+                        result: w(res.result),
+                        retCode: res.retCode,
+                    };
+                    const newParticle = item.particle.clone();
+                    newParticle.callResults = [[key, serviceResult]];
+                    newParticle.data = Buffer.from([]);
+                    return { particle: newParticle, onStageChange: item.onStageChange };
+                }),
+            )
+            .subscribe((item) => particlesQueue.next(item));

         return particlesQueue;
     }
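
The refactor above replaces one imperative subscribe callback with a shared interpretation pipeline (interpreted) that fans out into filtered branches, plus a concatMap that expands each interpreter result into one emission per call request. The sketch below is a minimal, self-contained illustration of those two RxJS patterns only: the Item shape, its field names, and the console output are invented for the example, and share() is added here (it does not appear in the change above) so that the shared stage runs once per item no matter how many branches subscribe.

import { Subject, concatMap, filter, map, merge, share, tap } from 'rxjs';

// Invented stand-in for a queue item carrying an interpreter result.
interface Item {
    ok: boolean;
    callRequests: string[];
    payload: string;
}

const queue = new Subject<Item>();

// Shared stage: share() multicasts it, so the map runs once per item
// even though several branches subscribe below.
const interpreted = queue.pipe(
    map((item) => ({ ...item, payload: item.payload.trim() })),
    share(),
);

// Branch 1: failed interpretation is only reported.
const failed = interpreted.pipe(
    filter((x) => !x.ok),
    tap((x) => console.error('failed:', x.payload)),
);

// Branch 2: success with no call requests means local work is done.
const done = interpreted.pipe(
    filter((x) => x.ok && x.callRequests.length === 0),
    tap((x) => console.log('done:', x.payload)),
);

// Branch 3: concatMap over a plain array expands one item into one
// emission per call request, preserving their order.
const work = interpreted.pipe(
    filter((x) => x.ok && x.callRequests.length > 0),
    concatMap((x) => x.callRequests.map((req) => [x, req] as const)),
    tap(([x, req]) => console.log('executing', req, 'for', x.payload)),
);

// pipe() is lazy: nothing above runs until something subscribes.
// One merged subscription activates all three branches at once.
merge(failed, done, work).subscribe();

queue.next({ ok: true, callRequests: [], payload: ' hello ' });
queue.next({ ok: true, callRequests: ['srv.a', 'srv.b'], payload: ' multi ' });
queue.next({ ok: false, callRequests: [], payload: ' oops ' });

Because pipe() alone only builds a description and does not execute, each branch emits once it is subscribed; merging the branches keeps the whole graph live from a single subscription.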