From 9d9e998d777a29385781c908656480a912ceb790 Mon Sep 17 00:00:00 2001 From: Mitra Ardron Date: Wed, 26 Jun 2019 20:42:37 +1000 Subject: [PATCH] Stop Status and Info changes to support better status indicators and heartbeat on HTTP --- API.md | 9 ++-- Transport.js | 12 ++---- TransportHTTP.js | 97 ++++++++++++++++++++++++++++++++---------- TransportIPFS.js | 24 +++++------ TransportWEBTORRENT.js | 25 +++++------ Transports.js | 22 ++++++---- 6 files changed, 118 insertions(+), 71 deletions(-) diff --git a/API.md b/API.md index 367e576..72a71db 100644 --- a/API.md +++ b/API.md @@ -109,10 +109,11 @@ Code|Name|Means 3|STATUS_LOADED|Code loaded but havent tried to connect 4|STATUS_PAUSED|It was launched, probably connected, but now paused so will be ignored by validfor() -##### async p_stop(refreshstatus) -Stop the transport, +##### async stop(refreshstatus, cb) +Stop the transport, ``` refreshstatus (optional) callback(transport instance) to the UI to update status on display +cb(err, this) ``` ### Transport: General storage and retrieval of objects ##### p_rawstore(data) @@ -286,8 +287,8 @@ throw TransportError if invalid URL ##### validFor(url, func, opts) True if the url and/or function is supported and the Transport is connected appropriately. -##### p_info() -Return a JSON with info about the server. +##### p_info() and info(cb) +Return a JSON with info about the server via promise or callback ## Transports class The Transports Class manages multiple transports diff --git a/Transport.js b/Transport.js index d084f16..76035d4 100644 --- a/Transport.js +++ b/Transport.js @@ -54,15 +54,11 @@ class Transport { return t.p_setup2(cb); // And connect } /* Disconnect from the transport service - there is no guarrantee that a restart will be successfull so this is usually only for when exiting */ - p_stop(refreshstatus) { + stop(refreshstatus, cb) { // refreshstatus(Transport instance) => optional callback to the UI to update the status on the display - return new Promise((resolve, reject) => { - this.status = Transport.STATUS_FAILED; - if (refreshstatus) refreshstatus(this); - //if (err) { reject(err) } else { - resolve(); - //} - }); + this.status = Transport.STATUS_FAILED; + if (refreshstatus) refreshstatus(this); + cb(null, this); } togglePaused(cb) { /* diff --git a/TransportHTTP.js b/TransportHTTP.js index b2beaad..0b49a25 100644 --- a/TransportHTTP.js +++ b/TransportHTTP.js @@ -3,12 +3,13 @@ const Transports = require('./Transports'); // Manage all Transports that are lo const httptools = require('./httptools'); // Expose some of the httptools so that IPFS can use it as a backup const Url = require('url'); const stream = require('readable-stream'); -const debughttp = require('debug')('dweb-transports:http'); +const debug = require('debug')('dweb-transports:http'); const stringify = require('canonical-json'); defaulthttpoptions = { - urlbase: 'https://dweb.me' + urlbase: 'https://dweb.me', + heartbeat: { delay: 30000 } // By default check twice a minute }; servercommands = { // What the server wants to see to return each of these @@ -25,6 +26,16 @@ servercommands = { // What the server wants to see to return each of these class TransportHTTP extends Transport { + /* Subclass of Transport for handling HTTP - see API.md for docs + + options { + urlbase: e.g. https://dweb.me Where to go for URLS like /arc/... + heartbeat: { + delay // Time in milliseconds between checks - 30000 might be appropriate - if missing it wont do a heartbeat + statusCB // Callback cb(transport) when status changes + } + } + */ constructor(options) { super(options); // These are now options.http @@ -54,26 +65,57 @@ class TransportHTTP extends Transport { throw err; } } - async p_setup1(cb) { - this.status = Transport.STATUS_STARTING; - if (cb) cb(this); - await this.p_status(); - if (cb) cb(this); - return this; + + p_setup1(statusCB) { + return new Promise((resolve, unusedReject) => { + this.status = Transport.STATUS_STARTING; + if (statusCB) statusCB(this); + this.updateStatus((unusedErr, unusedRes) => { + if (statusCB) statusCB(this); + this.startHeartbeat(this.options.heartbeat); + resolve(this); // Note always resolve even if error from p_status as have set status to failed + }); + }) } - async p_status() { + async p_status(cb) { //TODO-API /* - Return a numeric code for the status of a transport. + Return (via cb or promise) a numeric code for the status of a transport. */ - try { - this.info = await this.p_info(); - this.status = Transport.STATUS_CONNECTED; - } catch(err) { - console.error(this.name, ": Error in p_status.info",err.message); - this.status = Transport.STATUS_FAILED; + if (cb) { try { this.updateStatus(cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { this.updateStatus((err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2f + } + updateStatus(cb) { //TODO-API + this.updateInfo((err, res) => { + if (err) { + debug("Error status call to info failed %s", err.message); + this.status = Transport.STATUS_FAILED; + cb(null, this.status); // DOnt pass error up, the status indicates the error + } else { + this.info = res; // Save result + this.status = Transport.STATUS_CONNECTED; + cb(null, this.status); + } + }); + } + + startHeartbeat({delay=undefined, statusCB=undefined}) { + if (delay) { + this.heartbeatTimer = setInterval(() => { + this.updateStatus((err, res)=>{ // Pings server and sets status + if (statusCB) statusCB(this); // repeatedly call callback if supplies + }, (unusedErr, unusedRes)=>{}); // Dont wait for status to complete + }, delay); } - return super.p_status(); + } + stopHeartbeat() { + if (this.heartbeatTimer) { + clearInterval(this.hearbeatTimer);} + } + stop(refreshstatus, cb) { + this.stopHeartbeat(); + this.status = Transport.STATUS_FAILED; + if (refreshstatus) { refreshstatus(this); } + cb(null, this); } _cmdurl(command) { @@ -180,7 +222,7 @@ class TransportHTTP extends Transport { :throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise */ //Logged by Transports - //debughttp("p_f_createreadstream %s", Url.parse(url).href); + //debug("p_f_createreadstream %s", Url.parse(url).href); try { let self = this; if (wanturl) { @@ -206,7 +248,7 @@ class TransportHTTP extends Transport { */ // This breaks in browsers ... as 's' doesn't have .pipe but has .pipeTo and .pipeThrough neither of which work with stream.PassThrough // TODO See https://github.com/nodejs/readable-stream/issues/406 in case its fixed in which case enable createReadStream in constructor above. - debughttp("createreadstream %s %o", Url.parse(url).href, opts); + debug("createreadstream %s %o", Url.parse(url).href, opts); let through; through = new stream.PassThrough(); httptools.p_GET(this._url(url, servercommands.rawfetch), Object.assign({wantstream: true}, opts)) @@ -233,7 +275,7 @@ class TransportHTTP extends Transport { :param opts: { start: byte to start from; end: optional end byte } :resolves to stream: The readable stream. */ - debughttp("createreadstream %s %o", Url.parse(url).href, opts); + debug("createreadstream %s %o", Url.parse(url).href, opts); try { return await httptools.p_GET(this._url(url, servercommands.rawfetch), Object.assign({wantstream: true}, opts)); } catch(err) { @@ -270,7 +312,7 @@ class TransportHTTP extends Transport { async p_set(url, keyvalues, value) { // url = yjs:/yjs/database/table/key if (!url || !keyvalues) throw new errors.CodingError("TransportHTTP.p_set: invalid parms", url, keyvalyes); // Logged by Transports - //debughttp("p_set %o %o %o", url, keyvalues, value); + //debug("p_set %o %o %o", url, keyvalues, value); if (typeof keyvalues === "string") { let data = stringify([{key: keyvalues, value: value}]); await httptools.p_POST(this._url(url, servercommands.set), {data, contenttype: "application/json"}); // Returns immediately @@ -309,11 +351,20 @@ class TransportHTTP extends Transport { return { table: "keyvaluetable", _map: await this.p_getall(url) - }; // Data struc is ok as SmartDict.p_fetch will pass to KVT constructor + }; // Data structure is ok as SmartDict.p_fetch will pass to KVT constructor } */ - p_info() { return httptools.p_GET(`${this.urlbase}/info`, {retries: 5}); } // Try info, but dont wait more than approx 10secs + async p_info() { //TODO-API + /* + Return (via cb or promise) a numeric code for the status of a transport. + */ + return new Promise((resolve, reject) => { try { this.updateInfo((err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}}) // Promisify pattern v2b (no CB) + } + + updateInfo(cb) { + httptools.p_GET(`${this.urlbase}/info`, {retries: 1}, cb); // Try info, but dont retry (usually heartbeat will reconnect) + } static async p_test(opts={}) { {console.log("TransportHTTP.test")} diff --git a/TransportIPFS.js b/TransportIPFS.js index a39b768..5b8eb4b 100644 --- a/TransportIPFS.js +++ b/TransportIPFS.js @@ -183,21 +183,19 @@ class TransportIPFS extends Transport { return this; } - p_stop(refreshstatus) { - return new Promise((resolve, reject) => { - if (this.ipfstype === "client") { - this.ipfs.stop((err, res) => { - this.status = Transport.STATUS_FAILED; - if (refreshstatus) refreshstatus(this); - if (err) { reject(err); } else { resolve(res); } - }); - } else { - // We didn't start it, don't try and stop it + stop(refreshstatus, cb) { //TODO-API p_stop > stop + if (this.ipfstype === "client") { + this.ipfs.stop((err, res) => { this.status = Transport.STATUS_FAILED; if (refreshstatus) refreshstatus(this); - resolve(this); - } - }) + cb(err, res); + }); + } else { + // We didn't start it, don't try and stop it + this.status = Transport.STATUS_FAILED; + if (refreshstatus) refreshstatus(this); + cb(miull, this); + } } async p_status() { /* diff --git a/TransportWEBTORRENT.js b/TransportWEBTORRENT.js index 86ab78b..68eb739 100644 --- a/TransportWEBTORRENT.js +++ b/TransportWEBTORRENT.js @@ -83,20 +83,17 @@ class TransportWEBTORRENT extends Transport { return this; } - p_stop(refreshstatus) { - return new Promise((resolve, reject) => { - this.webtorrent.destroy((err) => { - this.status = Transport.STATUS_FAILED; - if (refreshstatus) refreshstatus(this); - if (err) { - debug("Webtorrent error during stopping %o", err); - reject(err); - } else { - debug("Webtorrent stopped"); - resolve(); - } - }); - }) + stop(refreshstatus, cb) { + this.webtorrent.destroy((err) => { + this.status = Transport.STATUS_FAILED; + if (refreshstatus) refreshstatus(this); + if (err) { + debug("Webtorrent error during stopping %o", err); + } else { + debug("Webtorrent stopped"); + } + cb(err, this); + }); } async p_status() { diff --git a/Transports.js b/Transports.js index 0dd61eb..c432939 100644 --- a/Transports.js +++ b/Transports.js @@ -4,6 +4,7 @@ const utils = require('./utils'); const debug = require('debug')('dweb-transports'); const httptools = require('./httptools'); const each = require('async/each'); +const map = require('async/map'); class Transports { /* @@ -41,7 +42,7 @@ class Transports { /* resolves to: a dictionary of statuses of transports, e.g. { TransportHTTP: STATUS_CONNECTED } */ - const res = this._transports.map((t) => { return {"name": t.name, "status": t.status}}) + const res = Transports._transports.map((t) => { return {"name": t.name, "status": t.status}}) if (cb) { cb(null, res)} else { return new Promise((resolve, reject) => resolve(res))} } static validFor(urls, func, opts) { //TODO-RELOAD check for noCache support @@ -673,15 +674,18 @@ class Transports { })); if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern } - static p_stop(refreshstatus, cb) { + static p_stop(refreshstatus, cb) { //TODO-API cb + if (cb) { try { f.call(this, cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { f.call(this, (err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2 /* Disconnect from all services, may not be able to reconnect */ - - const prom = Promise.all(this._connected() - .map((t) => { - debug("Stopping %s", t.name); - return t.p_stop(refreshstatus); - })); - if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern + //TODO rewrite with async/map + function f(cb) { + map(this._connected(), + (t, cb2) => { + debug("Stopping %s", t.name); + t.stop(refreshstatus, cb2); + }, + cb); + } } static async refreshstatus(t) {