Stop Status and Info changes to support better status indicators and heartbeat on HTTP

This commit is contained in:
Mitra Ardron 2019-06-26 20:42:37 +10:00
parent b137ab82c1
commit 9d9e998d77
6 changed files with 118 additions and 71 deletions

9
API.md
View File

@ -109,10 +109,11 @@ Code|Name|Means
3|STATUS_LOADED|Code loaded but havent tried to connect
4|STATUS_PAUSED|It was launched, probably connected, but now paused so will be ignored by validfor()
##### async p_stop(refreshstatus)
Stop the transport,
##### async stop(refreshstatus, cb)
Stop the transport,
```
refreshstatus (optional) callback(transport instance) to the UI to update status on display
cb(err, this)
```
### Transport: General storage and retrieval of objects
##### p_rawstore(data)
@ -286,8 +287,8 @@ throw TransportError if invalid URL
##### validFor(url, func, opts)
True if the url and/or function is supported and the Transport is connected appropriately.
##### p_info()
Return a JSON with info about the server.
##### p_info() and info(cb)
Return a JSON with info about the server via promise or callback
## Transports class
The Transports Class manages multiple transports

View File

@ -54,15 +54,11 @@ class Transport {
return t.p_setup2(cb); // And connect
}
/* Disconnect from the transport service - there is no guarrantee that a restart will be successfull so this is usually only for when exiting */
p_stop(refreshstatus) {
stop(refreshstatus, cb) {
// refreshstatus(Transport instance) => optional callback to the UI to update the status on the display
return new Promise((resolve, reject) => {
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
//if (err) { reject(err) } else {
resolve();
//}
});
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
cb(null, this);
}
togglePaused(cb) {
/*

View File

@ -3,12 +3,13 @@ const Transports = require('./Transports'); // Manage all Transports that are lo
const httptools = require('./httptools'); // Expose some of the httptools so that IPFS can use it as a backup
const Url = require('url');
const stream = require('readable-stream');
const debughttp = require('debug')('dweb-transports:http');
const debug = require('debug')('dweb-transports:http');
const stringify = require('canonical-json');
defaulthttpoptions = {
urlbase: 'https://dweb.me'
urlbase: 'https://dweb.me',
heartbeat: { delay: 30000 } // By default check twice a minute
};
servercommands = { // What the server wants to see to return each of these
@ -25,6 +26,16 @@ servercommands = { // What the server wants to see to return each of these
class TransportHTTP extends Transport {
/* Subclass of Transport for handling HTTP - see API.md for docs
options {
urlbase: e.g. https://dweb.me Where to go for URLS like /arc/...
heartbeat: {
delay // Time in milliseconds between checks - 30000 might be appropriate - if missing it wont do a heartbeat
statusCB // Callback cb(transport) when status changes
}
}
*/
constructor(options) {
super(options); // These are now options.http
@ -54,26 +65,57 @@ class TransportHTTP extends Transport {
throw err;
}
}
async p_setup1(cb) {
this.status = Transport.STATUS_STARTING;
if (cb) cb(this);
await this.p_status();
if (cb) cb(this);
return this;
p_setup1(statusCB) {
return new Promise((resolve, unusedReject) => {
this.status = Transport.STATUS_STARTING;
if (statusCB) statusCB(this);
this.updateStatus((unusedErr, unusedRes) => {
if (statusCB) statusCB(this);
this.startHeartbeat(this.options.heartbeat);
resolve(this); // Note always resolve even if error from p_status as have set status to failed
});
})
}
async p_status() {
async p_status(cb) { //TODO-API
/*
Return a numeric code for the status of a transport.
Return (via cb or promise) a numeric code for the status of a transport.
*/
try {
this.info = await this.p_info();
this.status = Transport.STATUS_CONNECTED;
} catch(err) {
console.error(this.name, ": Error in p_status.info",err.message);
this.status = Transport.STATUS_FAILED;
if (cb) { try { this.updateStatus(cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { this.updateStatus((err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2f
}
updateStatus(cb) { //TODO-API
this.updateInfo((err, res) => {
if (err) {
debug("Error status call to info failed %s", err.message);
this.status = Transport.STATUS_FAILED;
cb(null, this.status); // DOnt pass error up, the status indicates the error
} else {
this.info = res; // Save result
this.status = Transport.STATUS_CONNECTED;
cb(null, this.status);
}
});
}
startHeartbeat({delay=undefined, statusCB=undefined}) {
if (delay) {
this.heartbeatTimer = setInterval(() => {
this.updateStatus((err, res)=>{ // Pings server and sets status
if (statusCB) statusCB(this); // repeatedly call callback if supplies
}, (unusedErr, unusedRes)=>{}); // Dont wait for status to complete
}, delay);
}
return super.p_status();
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.hearbeatTimer);}
}
stop(refreshstatus, cb) {
this.stopHeartbeat();
this.status = Transport.STATUS_FAILED;
if (refreshstatus) { refreshstatus(this); }
cb(null, this);
}
_cmdurl(command) {
@ -180,7 +222,7 @@ class TransportHTTP extends Transport {
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
*/
//Logged by Transports
//debughttp("p_f_createreadstream %s", Url.parse(url).href);
//debug("p_f_createreadstream %s", Url.parse(url).href);
try {
let self = this;
if (wanturl) {
@ -206,7 +248,7 @@ class TransportHTTP extends Transport {
*/
// This breaks in browsers ... as 's' doesn't have .pipe but has .pipeTo and .pipeThrough neither of which work with stream.PassThrough
// TODO See https://github.com/nodejs/readable-stream/issues/406 in case its fixed in which case enable createReadStream in constructor above.
debughttp("createreadstream %s %o", Url.parse(url).href, opts);
debug("createreadstream %s %o", Url.parse(url).href, opts);
let through;
through = new stream.PassThrough();
httptools.p_GET(this._url(url, servercommands.rawfetch), Object.assign({wantstream: true}, opts))
@ -233,7 +275,7 @@ class TransportHTTP extends Transport {
:param opts: { start: byte to start from; end: optional end byte }
:resolves to stream: The readable stream.
*/
debughttp("createreadstream %s %o", Url.parse(url).href, opts);
debug("createreadstream %s %o", Url.parse(url).href, opts);
try {
return await httptools.p_GET(this._url(url, servercommands.rawfetch), Object.assign({wantstream: true}, opts));
} catch(err) {
@ -270,7 +312,7 @@ class TransportHTTP extends Transport {
async p_set(url, keyvalues, value) { // url = yjs:/yjs/database/table/key
if (!url || !keyvalues) throw new errors.CodingError("TransportHTTP.p_set: invalid parms", url, keyvalyes);
// Logged by Transports
//debughttp("p_set %o %o %o", url, keyvalues, value);
//debug("p_set %o %o %o", url, keyvalues, value);
if (typeof keyvalues === "string") {
let data = stringify([{key: keyvalues, value: value}]);
await httptools.p_POST(this._url(url, servercommands.set), {data, contenttype: "application/json"}); // Returns immediately
@ -309,11 +351,20 @@ class TransportHTTP extends Transport {
return {
table: "keyvaluetable",
_map: await this.p_getall(url)
}; // Data struc is ok as SmartDict.p_fetch will pass to KVT constructor
}; // Data structure is ok as SmartDict.p_fetch will pass to KVT constructor
}
*/
p_info() { return httptools.p_GET(`${this.urlbase}/info`, {retries: 5}); } // Try info, but dont wait more than approx 10secs
async p_info() { //TODO-API
/*
Return (via cb or promise) a numeric code for the status of a transport.
*/
return new Promise((resolve, reject) => { try { this.updateInfo((err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}}) // Promisify pattern v2b (no CB)
}
updateInfo(cb) {
httptools.p_GET(`${this.urlbase}/info`, {retries: 1}, cb); // Try info, but dont retry (usually heartbeat will reconnect)
}
static async p_test(opts={}) {
{console.log("TransportHTTP.test")}

View File

@ -183,21 +183,19 @@ class TransportIPFS extends Transport {
return this;
}
p_stop(refreshstatus) {
return new Promise((resolve, reject) => {
if (this.ipfstype === "client") {
this.ipfs.stop((err, res) => {
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
if (err) { reject(err); } else { resolve(res); }
});
} else {
// We didn't start it, don't try and stop it
stop(refreshstatus, cb) { //TODO-API p_stop > stop
if (this.ipfstype === "client") {
this.ipfs.stop((err, res) => {
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
resolve(this);
}
})
cb(err, res);
});
} else {
// We didn't start it, don't try and stop it
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
cb(miull, this);
}
}
async p_status() {
/*

View File

@ -83,20 +83,17 @@ class TransportWEBTORRENT extends Transport {
return this;
}
p_stop(refreshstatus) {
return new Promise((resolve, reject) => {
this.webtorrent.destroy((err) => {
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
if (err) {
debug("Webtorrent error during stopping %o", err);
reject(err);
} else {
debug("Webtorrent stopped");
resolve();
}
});
})
stop(refreshstatus, cb) {
this.webtorrent.destroy((err) => {
this.status = Transport.STATUS_FAILED;
if (refreshstatus) refreshstatus(this);
if (err) {
debug("Webtorrent error during stopping %o", err);
} else {
debug("Webtorrent stopped");
}
cb(err, this);
});
}
async p_status() {

View File

@ -4,6 +4,7 @@ const utils = require('./utils');
const debug = require('debug')('dweb-transports');
const httptools = require('./httptools');
const each = require('async/each');
const map = require('async/map');
class Transports {
/*
@ -41,7 +42,7 @@ class Transports {
/*
resolves to: a dictionary of statuses of transports, e.g. { TransportHTTP: STATUS_CONNECTED }
*/
const res = this._transports.map((t) => { return {"name": t.name, "status": t.status}})
const res = Transports._transports.map((t) => { return {"name": t.name, "status": t.status}})
if (cb) { cb(null, res)} else { return new Promise((resolve, reject) => resolve(res))}
}
static validFor(urls, func, opts) { //TODO-RELOAD check for noCache support
@ -673,15 +674,18 @@ class Transports {
}));
if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
}
static p_stop(refreshstatus, cb) {
static p_stop(refreshstatus, cb) { //TODO-API cb
if (cb) { try { f.call(this, cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { f.call(this, (err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2
/* Disconnect from all services, may not be able to reconnect */
const prom = Promise.all(this._connected()
.map((t) => {
debug("Stopping %s", t.name);
return t.p_stop(refreshstatus);
}));
if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
//TODO rewrite with async/map
function f(cb) {
map(this._connected(),
(t, cb2) => {
debug("Stopping %s", t.name);
t.stop(refreshstatus, cb2);
},
cb);
}
}
static async refreshstatus(t) {