diff --git a/package.json b/package.json index 8c33202a..df8c77b3 100644 --- a/package.json +++ b/package.json @@ -66,6 +66,7 @@ "multistream-select": "^0.15.0", "once": "^1.4.0", "p-any": "^2.1.0", + "p-fifo": "^1.0.0", "p-map": "^3.0.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", diff --git a/src/dialer.js b/src/dialer.js index 6000c8d6..3306af07 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -39,8 +39,6 @@ class Dialer { this.timeout = timeout this.perPeerLimit = perPeerLimit this.tokens = [...new Array(concurrency)].map((_, index) => index) - - this.releaseToken = this.releaseToken.bind(this) } /** @@ -68,7 +66,10 @@ class Dialer { * @returns {Promise} */ async connectToMultiaddrs (addrs, options = {}) { - const dialAction = (addr, options) => this.transportManager.dial(addr, options) + const dialAction = (addr, options) => { + if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED') + return this.transportManager.dial(addr, options) + } const dialRequest = new DialRequest({ addrs, dialAction, diff --git a/src/dialer/dial-request.js b/src/dialer/dial-request.js index b2739799..910ced43 100644 --- a/src/dialer/dial-request.js +++ b/src/dialer/dial-request.js @@ -2,14 +2,13 @@ const AbortController = require('abort-controller') const AggregateError = require('aggregate-error') -const pDefer = require('p-defer') const anySignal = require('any-signal') const debug = require('debug') +const errCode = require('err-code') const log = debug('libp2p:dialer:request') log.error = debug('libp2p:dialer:request:error') -const { AbortError } = require('libp2p-interfaces/src/transport/errors') - -const { TokenHolder } = require('./token-holder') +const FIFO = require('p-fifo') +const pAny = require('p-any') class DialRequest { /** @@ -37,161 +36,36 @@ class DialRequest { * @returns {Connection} */ async run (options) { - // Determine how many tokens we need - const tokensWanted = Math.min(this.addrs.length, this.dialer.perPeerLimit) - // Get the tokens - const tokens = this.dialer.getTokens(tokensWanted) + const tokens = this.dialer.getTokens(this.addrs.length) // If no tokens are available, throw if (tokens.length < 1) { - throw Object.assign(new Error('No dial tokens available'), { code: 'ERR_NO_DIAL_TOKENS' }) + throw errCode(new Error('No dial tokens available'), 'ERR_NO_DIAL_TOKENS') } - // For every token, run a multiaddr dial - // If there are tokens left, release them - // If there are multiaddrs left, wait for tokens to finish - const th = new TokenHolder(tokens, t => this.dialer.releaseToken(t)) + const th = new FIFO() + tokens.forEach(t => th.push(t)) + const dialAbortControllers = this.addrs.map(() => new AbortController()) - // Create the dial functions - const dials = this.addrs.map(addr => { - return () => this._abortableDial(addr, options) - }) - - const dialResolver = new DialResolver() - while (dials.length > 0) { - if (dialResolver.finished) break - // Wait for the next available token - const token = await th.getToken() - const dial = dials.shift() - dialResolver.add(dial, () => th.releaseToken(token)) - } - - // Start giving back the tokens - th.drain() - // Flush all the dials to get the final response - return dialResolver.flush() - } - - /** - * @private - * @param {Multiaddr} addr - * @param {object} options - * @param {AbortSignal} options.signal An AbortController signal - * @param {number} options.timeout The max dial time for each request in ms - * @returns {{abort: function(), promise: Promise}} An AbortableDial - */ - _abortableDial (addr, options) { - log('starting dial to %s', addr) - const controller = new AbortController() - const signals = [controller.signal] - options.signal && signals.push(options.signal) - const signal = anySignal([controller.signal, options.signal]) - - const promise = this.dialAction(addr, { signal, timeout: options.timeout }) - return { - abort: () => controller.abort(), - promise - } - } -} - -class DialResolver { - constructor () { - this.dials = new Set() - this.errors = [] - this.finished = false - this.didFlush = false - this._waiting = null - } - - /** - * Adds a dial function to the resolver. The function will be immediately - * executed and its resolution tracked. - * @async - * @param {function()} dial A function that returns an AbortableDial - * @param {function()} [finallyHandler] Called when the dial resolves or rejects - */ - async add (dial, finallyHandler) { - if (this.finished) return - const abortableDial = dial() - this.dials.add(abortableDial) try { - this._onResolve(await abortableDial.promise) - } catch (err) { - this._onReject(err) + return await pAny(this.addrs.map(async (addr, i) => { + const token = await th.shift() // get token + let conn + try { + const signal = dialAbortControllers[i].signal + conn = await this.dialAction(addr, { ...options, signal: anySignal([signal, options.signal]) }) + // Remove the successful AbortController so it is no aborted + dialAbortControllers.splice(i, 1) + } catch (err) { + th.push(token) // return to token holder on error so another ma can be attempted + throw err + } + return conn + })) } finally { - this._onFinally(abortableDial) - finallyHandler && finallyHandler() + dialAbortControllers.map(c => c.abort()) // success/failure happened, abort everything else + tokens.forEach(t => this.dialer.releaseToken(t)) // release tokens back to the dialer } } - - /** - * Called when a dial resolves - * @param {Connection} result - */ - _onResolve (result) { - this.result = result - } - - /** - * Called when a dial rejects - * @param {Error} err - */ - _onReject (err) { - if (err.code === AbortError.code) return - this.errors.push(err) - } - - _onFinally (dial) { - this.dials.delete(dial) - // If we have a result, or all dials have finished - if (this.result || (this._waiting && this.dials.size === 0)) { - this._onFinish() - } - } - - /** - * Called when dialing is completed, which means one of: - * 1. One dial succeeded - * 2. All dials failed - * 3. All dials were aborted - * @private - */ - _onFinish () { - this.finished = true - // Abort all remaining dials - for (const abortableDial of this.dials) { - abortableDial.abort() - } - this.dials.clear() - - // Flush must be called - if (!this._waiting) return - // If we have a result, or an abort occurred (no errors and no result) - if (this.result || this.errors.length === 0) { - this._waiting.resolve(this.result) - } else { - this._waiting.reject(new AggregateError(this.errors)) - } - } - - /** - * Flushes any remaining dials and resolves the first - * successful `Connection`. Flush should be called after all - * dials have been added. - * @returns {Promise} - */ - flush () { - if (this.finished) { - if (this.result) { - return Promise.resolve(this.result) - } else { - return Promise.reject(new AggregateError(this.errors)) - } - } - this._waiting = pDefer() - return this._waiting.promise - } } -module.exports.DialResolver = DialResolver module.exports.DialRequest = DialRequest diff --git a/src/dialer/token-holder.js b/src/dialer/token-holder.js deleted file mode 100644 index acef56e2..00000000 --- a/src/dialer/token-holder.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -/** - * @class TokenHolder - * @example - * const th = new TokenHolder(tokens, dialer.releaseToken) - * for (const action of actions) { - * const token = await th.getToken() - * action(token).then(() => th.releaseToken(token)) - * } - * - * await th.drain() - */ -class TokenHolder { - /** - * @param {Array<*>} tokens Tokens to track - * @param {function(*)} release Called when releasing control of the tokens - */ - constructor (tokens, release) { - this.originalTokens = tokens - this.tokens = [...tokens] - this._release = release - } - - /** - * Resolves a token once once is available. Once the token is no - * longer needed it MUST be release with `releaseToken()`. - * @returns {Promise<*>} - */ - getToken () { - if (this.tokens.length) return Promise.resolve(this.tokens.shift()) - return new Promise(resolve => { - const _push = this.tokens.push - this.tokens.push = (token) => { - this.tokens.push = _push - resolve(token) - } - }) - } - - /** - * Makes the token available via `getToken()` - * @param {*} token - */ - releaseToken (token) { - this.tokens.push(token) - } - - /** - * Once tokens are no longer needed for a series of actions, - * drain will release them to the owner via `this._release()` - */ - async drain () { - let drained = 0 - while (drained < this.originalTokens.length) { - this._release(await this.getToken()) - // Remove the token - drained++ - } - } -} - -module.exports.TokenHolder = TokenHolder diff --git a/test/dialing/dial-resolver.spec.js b/test/dialing/dial-resolver.spec.js deleted file mode 100644 index 105b71e2..00000000 --- a/test/dialing/dial-resolver.spec.js +++ /dev/null @@ -1,83 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-as-promised')) -const { expect } = chai -const sinon = require('sinon') -const pDefer = require('p-defer') -const pWaitFor = require('p-wait-for') -const AggregateError = require('aggregate-error') -const { AbortError } = require('libp2p-interfaces/src/transport/errors') - -const { DialResolver } = require('../../src/dialer/dial-request') - -const mockAbortableDial = () => { - const deferred = pDefer() - function dial () { - return { - promise: deferred.promise, - abort: () => deferred.reject(new AbortError()) - } - } - dial.reject = deferred.reject - dial.resolve = deferred.resolve - return dial -} - -describe('DialResolver', () => { - it('should not run subsequent dials if finished', async () => { - const deferred = pDefer() - const dial = sinon.stub().callsFake(() => { - return deferred - }) - const dialResolver = new DialResolver() - dialResolver.add(dial) - deferred.resolve(true) - - await pWaitFor(() => dialResolver.finished === true) - - dialResolver.add(dial) - expect(dial.callCount).to.equal(1) - }) - - it('.flush should throw if all dials errored', async () => { - const dialResolver = new DialResolver() - const dials = [ - mockAbortableDial(), - mockAbortableDial(), - mockAbortableDial() - ] - for (const dial of dials) { - dialResolver.add(dial) - dial.reject(new Error('transport error')) - } - - await expect(dialResolver.flush()).to.eventually.be.rejectedWith(AggregateError) - .and.to.have.nested.property('._errors.length', 3) - }) - - it('.flush should resolve the successful dial', async () => { - const dialResolver = new DialResolver() - const mockConn = {} - const dials = [ - mockAbortableDial(), - mockAbortableDial(), - mockAbortableDial() - ] - - // Make the first succeed - const successfulDial = dials.shift() - dialResolver.add(successfulDial) - successfulDial.resolve(mockConn) - - // Error the rest - for (const dial of dials) { - dialResolver.add(dial) - dial.reject(new Error('transport error')) - } - - await expect(dialResolver.flush()).to.eventually.be(mockConn) - }) -}) diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index fa903e2f..a24a39ac 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -175,8 +175,9 @@ describe('Dialing (direct, TCP)', () => { // Let the call stack run await delay(0) - // All dials should have executed - expect(localTM.dial.callCount).to.equal(3) + + // Only two dials should be executed, as the first dial will succeed + expect(localTM.dial.callCount).to.equal(2) expect(dialer.tokens).to.have.length(2) }) diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index ad538251..0b6a9276 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -53,7 +53,7 @@ describe('Dialing (direct, WebSockets)', () => { it('should limit the number of tokens it provides', () => { const dialer = new Dialer({ transportManager: localTM }) - const maxPerPeer = Constants.PER_PEER_LIMIT + const maxPerPeer = Constants.MAX_PER_PEER_DIALS expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS) const tokens = dialer.getTokens(maxPerPeer + 1) expect(tokens).to.have.length(maxPerPeer) @@ -163,8 +163,9 @@ describe('Dialing (direct, WebSockets)', () => { // Let the call stack run await delay(0) - // All dials should have executed - expect(localTM.dial.callCount).to.equal(3) + + // Only two dials will be run, as the first two succeeded + expect(localTM.dial.callCount).to.equal(2) expect(dialer.tokens).to.have.length(2) })