From a8643d275b4247a4080d47e81eab6905ea5b74a2 Mon Sep 17 00:00:00 2001 From: Mitra Ardron Date: Fri, 21 Jun 2019 23:03:23 +1000 Subject: [PATCH] Use a queue for http tasks to reduce Insufficient Resources errors --- TransportGUN.js | 2 +- httptools.js | 53 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/TransportGUN.js b/TransportGUN.js index 82d062d..553fcca 100644 --- a/TransportGUN.js +++ b/TransportGUN.js @@ -57,7 +57,7 @@ let defaultoptions = { for now just throw an error on undefined WORKAROUND-GUN-STORAGE: GUN defaults to local storage, which then fails on 5Mb or more of data, need to use radix, which has to be included and has bizarre config requirement I can't figure out TODO-GUN, handle error callbacks which are available in put etc - Errors and Promises: Note that GUN's use of promises is seriously uexpected (aka weird), see https://gun.eco/docs/SEA#errors + Errors and Promises: Note that GUN's use of promises is seriously unexpected (aka weird), see https://gun.eco/docs/SEA#errors instead of using .reject or throwing an error at async it puts the error in SEA.err, so how that works in async parallel context is anyone's guess */ diff --git a/httptools.js b/httptools.js index 123c95c..49b7428 100644 --- a/httptools.js +++ b/httptools.js @@ -1,6 +1,7 @@ const nodefetch = require('node-fetch'); // Note, were using node-fetch-npm which had a warning in webpack see https://github.com/bitinn/node-fetch/issues/421 and is intended for clients const errors = require('./Errors'); // Standard Dweb Errors const debug = require('debug')('dweb-transports:httptools'); +const queue = require('async/queue'); //var fetch,Headers,Request; //if (typeof(Window) === "undefined") { @@ -21,6 +22,56 @@ if (typeof(fetch) === "undefined") { httptools = {}; +let httpTaskQueue; + +function queueSetup({concurrency}) { + httpTaskQueue = queue((task, cb) => { + if (task.loopguard === ((typeof window != "undefined") && window.loopguard)) { + fetch(task.req) + .then(res => { + debug("Fetch of %s succeded", task.what); + httpTaskQueue.concurrency++; + debug("Raising concurrency to %s", httpTaskQueue.concurrency); + cb(null); + task.cb(null, res); + }) + .catch(err => { + //TODO-QUEUE add loopguard back in + httpTaskQueue.concurrency = Math.max(httpTaskQueue.concurrency-1, 6) + debug("Dropping concurrency to %s", httpTaskQueue.concurrency); + cb(err); + if (--task.count > 0) { + debug("Retrying fetch of %s in %s ms: %s", task.what, task.ms, err.message); + httpTaskQueue.push(task); + /* Alternative with timeouts - not needed + let timeout = task.ms; + task.ms = Math.floor(task.ms*(1+Math.random())); // Spread out delays incase all requesting same time + setTimeout(() => { httpTaskQueue.push(task);}, timeout); + */ + } else { + debug("Requeued fetch of %s failed: %s", task.what, err.message); + task.cb(err); + } + }); + } else { + debug("Dropping fetch of %s as window changed from %s to %s", task.what, task.loopguard, window.loopguard); + } + }, concurrency) +} +queueSetup({concurrency: 6}); + +function queuedFetch(req, ms, count, what) { + return new Promise((resolve, reject) => { + count = count || 1; // 0 means 1 + httpTaskQueue.push({ + req, count, ms, what, + loopguard: (typeof window != "undefined") && window.loopguard, // Optional global parameter, will cancel any loops if changes + cb: (err, res) => { + if(err) { reject(err); } else {resolve(res); } + }, + }); + }); +} async function loopfetch(req, ms, count, what) { /* @@ -70,7 +121,7 @@ httptools.p_httpfetch = async function(httpurl, init, {wantstream=false, retries // Using window.fetch, because it doesn't appear to be in scope otherwise in the browser. let req = new Request(httpurl, init); //let response = await fetch(req); - let response = await loopfetch(req, 500, retries, "fetching "+httpurl); + let response = await queuedFetch(req, 500, retries, "fetching "+httpurl); // fetch throws (on Chrome, untested on Firefox or Node) TypeError: Failed to fetch) // Note response.body gets a stream and response.blob gets a blob and response.arrayBuffer gets a buffer. if (response.ok) {