Use a queue for http tasks to reduce Insufficient Resources errors

This commit is contained in:
Mitra Ardron 2019-06-21 23:03:23 +10:00
parent c8520c6c7c
commit a8643d275b
2 changed files with 53 additions and 2 deletions

View File

@ -57,7 +57,7 @@ let defaultoptions = {
for now just throw an error on undefined
WORKAROUND-GUN-STORAGE: GUN defaults to local storage, which then fails on 5Mb or more of data, need to use radix, which has to be included and has bizarre config requirement I can't figure out
TODO-GUN, handle error callbacks which are available in put etc
Errors and Promises: Note that GUN's use of promises is seriously uexpected (aka weird), see https://gun.eco/docs/SEA#errors
Errors and Promises: Note that GUN's use of promises is seriously unexpected (aka weird), see https://gun.eco/docs/SEA#errors
instead of using .reject or throwing an error at async it puts the error in SEA.err, so how that works in async parallel context is anyone's guess
*/

View File

@ -1,6 +1,7 @@
const nodefetch = require('node-fetch'); // Note, were using node-fetch-npm which had a warning in webpack see https://github.com/bitinn/node-fetch/issues/421 and is intended for clients
const errors = require('./Errors'); // Standard Dweb Errors
const debug = require('debug')('dweb-transports:httptools');
const queue = require('async/queue');
//var fetch,Headers,Request;
//if (typeof(Window) === "undefined") {
@ -21,6 +22,56 @@ if (typeof(fetch) === "undefined") {
httptools = {};
let httpTaskQueue;
function queueSetup({concurrency}) {
httpTaskQueue = queue((task, cb) => {
if (task.loopguard === ((typeof window != "undefined") && window.loopguard)) {
fetch(task.req)
.then(res => {
debug("Fetch of %s succeded", task.what);
httpTaskQueue.concurrency++;
debug("Raising concurrency to %s", httpTaskQueue.concurrency);
cb(null);
task.cb(null, res);
})
.catch(err => {
//TODO-QUEUE add loopguard back in
httpTaskQueue.concurrency = Math.max(httpTaskQueue.concurrency-1, 6)
debug("Dropping concurrency to %s", httpTaskQueue.concurrency);
cb(err);
if (--task.count > 0) {
debug("Retrying fetch of %s in %s ms: %s", task.what, task.ms, err.message);
httpTaskQueue.push(task);
/* Alternative with timeouts - not needed
let timeout = task.ms;
task.ms = Math.floor(task.ms*(1+Math.random())); // Spread out delays incase all requesting same time
setTimeout(() => { httpTaskQueue.push(task);}, timeout);
*/
} else {
debug("Requeued fetch of %s failed: %s", task.what, err.message);
task.cb(err);
}
});
} else {
debug("Dropping fetch of %s as window changed from %s to %s", task.what, task.loopguard, window.loopguard);
}
}, concurrency)
}
queueSetup({concurrency: 6});
function queuedFetch(req, ms, count, what) {
return new Promise((resolve, reject) => {
count = count || 1; // 0 means 1
httpTaskQueue.push({
req, count, ms, what,
loopguard: (typeof window != "undefined") && window.loopguard, // Optional global parameter, will cancel any loops if changes
cb: (err, res) => {
if(err) { reject(err); } else {resolve(res); }
},
});
});
}
async function loopfetch(req, ms, count, what) {
/*
@ -70,7 +121,7 @@ httptools.p_httpfetch = async function(httpurl, init, {wantstream=false, retries
// Using window.fetch, because it doesn't appear to be in scope otherwise in the browser.
let req = new Request(httpurl, init);
//let response = await fetch(req);
let response = await loopfetch(req, 500, retries, "fetching "+httpurl);
let response = await queuedFetch(req, 500, retries, "fetching "+httpurl);
// fetch throws (on Chrome, untested on Firefox or Node) TypeError: Failed to fetch)
// Note response.body gets a stream and response.blob gets a blob and response.arrayBuffer gets a buffer.
if (response.ok) {