fix subscriptions

Pavel Murygin 2021-12-10 16:07:46 +03:00
parent 157d6309c1
commit ba76e4919d

@@ -461,87 +461,92 @@ export class FluencePeer {
             }),
         );
 
-        interpreted.pipe(
-            filter((x) => !x.isInterpretationSuccessful),
-            tap((item) =>
-                setTimeout(() => {
-                    item.onStageChange({
-                        stage: 'interpreterError',
-                        errorMessage: item.interpreterResult.errorMessage,
-                    });
-                    return;
-                }, 0),
-            ),
-        );
-
-        interpreted.pipe(
-            filter((x) => x.isInterpretationSuccessful),
-            tap((item) =>
-                setTimeout(() => {
-                    item.onStageChange({ stage: 'interpreted' });
-                }, 0),
-            ),
-        );
-
-        interpreted.pipe(
-            filter((x) => x.isInterpretationSuccessful),
-            filter((x) => x.hasPeerPks),
-            tap((item) => {
-                const newParticle = item.particle.clone();
-                newParticle.data = item.newData;
-                this._outgoingParticles.next({ ...item, particle: newParticle });
-            }),
-        );
-
-        interpreted.pipe(
-            filter((x) => x.isInterpretationSuccessful),
-            filter((x) => !x.hasCallRequests),
-            tap((item) => {
-                item.onStageChange({ stage: 'localWorkDone' });
-            }),
-        );
-
-        interpreted
-            .pipe(
-                filter((x) => x.isInterpretationSuccessful),
-                filter((x) => x.hasCallRequests),
-                concatMap((item) => item.interpreterResult.callRequests.map(([key, cr]) => [item, key, cr] as const)),
-                map(([item, key, cr]) => {
-                    const req = {
-                        fnName: cr.functionName,
-                        args: cr.arguments,
-                        serviceId: cr.serviceId,
-                        tetraplets: cr.tetraplets,
-                        particleContext: item.particle.getParticleContext(),
-                    };
-                    return [item, key, req] as const;
-                }),
-                concatMap(([item, key, req]) => {
-                    return this._execSingleCallRequest(req)
-                        .catch(
-                            (err): CallServiceResult => ({
-                                retCode: ResultCodes.exceptionInHandler,
-                                result: `Handler failed. fnName="${req.fnName}" serviceId="${
-                                    req.serviceId
-                                }" error: ${err.toString()}`,
-                            }),
-                        )
-                        .then((res) => [item, key, res] as const);
-                }),
-                map(([item, key, res]) => {
-                    const serviceResult = {
-                        result: w(res.result),
-                        retCode: res.retCode,
-                    };
-                    const newParticle = item.particle.clone();
-                    newParticle.callResults = [[key, serviceResult]];
-                    newParticle.data = Buffer.from([]);
-                    return { particle: newParticle, onStageChange: item.onStageChange };
-                }),
-            )
-            .subscribe((item) => particlesQueue.next(item));
+        const successful = interpreted.pipe(
+            // force new line
+            filter((x) => x.isInterpretationSuccessful),
+        );
+
+        const failed = interpreted.pipe(
+            // force new line
+            filter((x) => !x.isInterpretationSuccessful),
+        );
+
+        failed.subscribe((item) =>
+            setTimeout(() => {
+                item.onStageChange({
+                    stage: 'interpreterError',
+                    errorMessage: item.interpreterResult.errorMessage,
+                });
+                return;
+            }, 0),
+        );
+
+        successful.subscribe((item) =>
+            setTimeout(() => {
+                item.onStageChange({ stage: 'interpreted' });
+            }, 0),
+        );
+
+        successful
+            .pipe(
+                // force new line
+                filter((x) => x.hasPeerPks),
+            )
+            .subscribe((item) => {
+                const newParticle = item.particle.clone();
+                newParticle.data = item.newData;
+                this._outgoingParticles.next({ ...item, particle: newParticle });
+            });
+
+        successful
+            .pipe(
+                // force new line
+                filter((x) => !x.hasCallRequests),
+            )
+            .subscribe((item) => {
+                item.onStageChange({ stage: 'localWorkDone' });
+            });
+
+        const readyToSend = successful.pipe(
+            filter((x) => x.hasCallRequests),
+            concatMap((item) => item.interpreterResult.callRequests.map(([key, cr]) => [item, key, cr] as const)),
+            map(([item, key, cr]) => {
+                const req = {
+                    fnName: cr.functionName,
+                    args: cr.arguments,
+                    serviceId: cr.serviceId,
+                    tetraplets: cr.tetraplets,
+                    particleContext: item.particle.getParticleContext(),
+                };
+                return [item, key, req] as const;
+            }),
+            concatMap(([item, key, req]) => {
+                return this._execSingleCallRequest(req)
+                    .catch(
+                        (err): CallServiceResult => ({
+                            retCode: ResultCodes.exceptionInHandler,
+                            result: `Handler failed. fnName="${req.fnName}" serviceId="${
+                                req.serviceId
+                            }" error: ${err.toString()}`,
+                        }),
+                    )
+                    .then((res) => [item, key, res] as const);
+            }),
+            map(([item, key, res]) => {
+                const serviceResult = {
+                    result: w(res.result),
+                    retCode: res.retCode,
+                };
+                const newParticle = item.particle.clone();
+                newParticle.callResults = [[key, serviceResult]];
+                newParticle.data = Buffer.from([]);
+                return { particle: newParticle, onStageChange: item.onStageChange };
+            }),
+        );
+
+        readyToSend.subscribe((item) => particlesQueue.next(item));
 
         return particlesQueue;
     }
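
Why this fixes the subscriptions: RxJS observables are lazy, so a bare pipe(filter(...), tap(...)) chain only describes a stream, and until something calls subscribe() on it the tap side effects (stage-change notifications, outgoing particles) never run. The new code names each branch (successful, failed, readyToSend) and subscribes to it explicitly. Below is a minimal standalone sketch of the before/after pattern; the Item shape and stage string are simplified placeholders, not the real FluencePeer types.

import { Subject } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

// Simplified stand-in for the real particle queue item (placeholder type).
type Item = { isInterpretationSuccessful: boolean; onStageChange: (stage: string) => void };

const interpreted = new Subject<Item>();

// Before: the chain is only constructed; with no subscriber, tap() never fires.
interpreted.pipe(
    filter((x) => !x.isInterpretationSuccessful),
    tap((x) => x.onStageChange('interpreterError')),
);

// After: a named stream with an explicit subscribe() actually executes the
// side effect for every failed item that is emitted.
const failed = interpreted.pipe(filter((x) => !x.isInterpretationSuccessful));
failed.subscribe((x) => x.onStageChange('interpreterError'));

interpreted.next({ isInterpretationSuccessful: false, onStageChange: console.log });
// Only the subscribed 'failed' stream reacts and logs 'interpreterError'.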