mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-05-04 15:02:13 +00:00
refactor: simplify DialRequest logic per feedback
This commit is contained in:
parent
b7432bd02b
commit
e1e3be8106
@ -66,6 +66,7 @@
|
|||||||
"multistream-select": "^0.15.0",
|
"multistream-select": "^0.15.0",
|
||||||
"once": "^1.4.0",
|
"once": "^1.4.0",
|
||||||
"p-any": "^2.1.0",
|
"p-any": "^2.1.0",
|
||||||
|
"p-fifo": "^1.0.0",
|
||||||
"p-map": "^3.0.0",
|
"p-map": "^3.0.0",
|
||||||
"p-queue": "^6.1.1",
|
"p-queue": "^6.1.1",
|
||||||
"p-settle": "^3.1.0",
|
"p-settle": "^3.1.0",
|
||||||
|
@ -39,8 +39,6 @@ class Dialer {
|
|||||||
this.timeout = timeout
|
this.timeout = timeout
|
||||||
this.perPeerLimit = perPeerLimit
|
this.perPeerLimit = perPeerLimit
|
||||||
this.tokens = [...new Array(concurrency)].map((_, index) => index)
|
this.tokens = [...new Array(concurrency)].map((_, index) => index)
|
||||||
|
|
||||||
this.releaseToken = this.releaseToken.bind(this)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -68,7 +66,10 @@ class Dialer {
|
|||||||
* @returns {Promise<Connection>}
|
* @returns {Promise<Connection>}
|
||||||
*/
|
*/
|
||||||
async connectToMultiaddrs (addrs, options = {}) {
|
async connectToMultiaddrs (addrs, options = {}) {
|
||||||
const dialAction = (addr, options) => this.transportManager.dial(addr, options)
|
const dialAction = (addr, options) => {
|
||||||
|
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
|
||||||
|
return this.transportManager.dial(addr, options)
|
||||||
|
}
|
||||||
const dialRequest = new DialRequest({
|
const dialRequest = new DialRequest({
|
||||||
addrs,
|
addrs,
|
||||||
dialAction,
|
dialAction,
|
||||||
|
@ -2,14 +2,13 @@
|
|||||||
|
|
||||||
const AbortController = require('abort-controller')
|
const AbortController = require('abort-controller')
|
||||||
const AggregateError = require('aggregate-error')
|
const AggregateError = require('aggregate-error')
|
||||||
const pDefer = require('p-defer')
|
|
||||||
const anySignal = require('any-signal')
|
const anySignal = require('any-signal')
|
||||||
const debug = require('debug')
|
const debug = require('debug')
|
||||||
|
const errCode = require('err-code')
|
||||||
const log = debug('libp2p:dialer:request')
|
const log = debug('libp2p:dialer:request')
|
||||||
log.error = debug('libp2p:dialer:request:error')
|
log.error = debug('libp2p:dialer:request:error')
|
||||||
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
|
const FIFO = require('p-fifo')
|
||||||
|
const pAny = require('p-any')
|
||||||
const { TokenHolder } = require('./token-holder')
|
|
||||||
|
|
||||||
class DialRequest {
|
class DialRequest {
|
||||||
/**
|
/**
|
||||||
@ -37,161 +36,36 @@ class DialRequest {
|
|||||||
* @returns {Connection}
|
* @returns {Connection}
|
||||||
*/
|
*/
|
||||||
async run (options) {
|
async run (options) {
|
||||||
// Determine how many tokens we need
|
const tokens = this.dialer.getTokens(this.addrs.length)
|
||||||
const tokensWanted = Math.min(this.addrs.length, this.dialer.perPeerLimit)
|
|
||||||
// Get the tokens
|
|
||||||
const tokens = this.dialer.getTokens(tokensWanted)
|
|
||||||
// If no tokens are available, throw
|
// If no tokens are available, throw
|
||||||
if (tokens.length < 1) {
|
if (tokens.length < 1) {
|
||||||
throw Object.assign(new Error('No dial tokens available'), { code: 'ERR_NO_DIAL_TOKENS' })
|
throw errCode(new Error('No dial tokens available'), 'ERR_NO_DIAL_TOKENS')
|
||||||
}
|
}
|
||||||
|
|
||||||
// For every token, run a multiaddr dial
|
const th = new FIFO()
|
||||||
// If there are tokens left, release them
|
tokens.forEach(t => th.push(t))
|
||||||
// If there are multiaddrs left, wait for tokens to finish
|
const dialAbortControllers = this.addrs.map(() => new AbortController())
|
||||||
const th = new TokenHolder(tokens, t => this.dialer.releaseToken(t))
|
|
||||||
|
|
||||||
// Create the dial functions
|
|
||||||
const dials = this.addrs.map(addr => {
|
|
||||||
return () => this._abortableDial(addr, options)
|
|
||||||
})
|
|
||||||
|
|
||||||
const dialResolver = new DialResolver()
|
|
||||||
while (dials.length > 0) {
|
|
||||||
if (dialResolver.finished) break
|
|
||||||
// Wait for the next available token
|
|
||||||
const token = await th.getToken()
|
|
||||||
const dial = dials.shift()
|
|
||||||
dialResolver.add(dial, () => th.releaseToken(token))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start giving back the tokens
|
|
||||||
th.drain()
|
|
||||||
// Flush all the dials to get the final response
|
|
||||||
return dialResolver.flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @private
|
|
||||||
* @param {Multiaddr} addr
|
|
||||||
* @param {object} options
|
|
||||||
* @param {AbortSignal} options.signal An AbortController signal
|
|
||||||
* @param {number} options.timeout The max dial time for each request in ms
|
|
||||||
* @returns {{abort: function(), promise: Promise<Connection>}} An AbortableDial
|
|
||||||
*/
|
|
||||||
_abortableDial (addr, options) {
|
|
||||||
log('starting dial to %s', addr)
|
|
||||||
const controller = new AbortController()
|
|
||||||
const signals = [controller.signal]
|
|
||||||
options.signal && signals.push(options.signal)
|
|
||||||
const signal = anySignal([controller.signal, options.signal])
|
|
||||||
|
|
||||||
const promise = this.dialAction(addr, { signal, timeout: options.timeout })
|
|
||||||
return {
|
|
||||||
abort: () => controller.abort(),
|
|
||||||
promise
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class DialResolver {
|
|
||||||
constructor () {
|
|
||||||
this.dials = new Set()
|
|
||||||
this.errors = []
|
|
||||||
this.finished = false
|
|
||||||
this.didFlush = false
|
|
||||||
this._waiting = null
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a dial function to the resolver. The function will be immediately
|
|
||||||
* executed and its resolution tracked.
|
|
||||||
* @async
|
|
||||||
* @param {function()} dial A function that returns an AbortableDial
|
|
||||||
* @param {function()} [finallyHandler] Called when the dial resolves or rejects
|
|
||||||
*/
|
|
||||||
async add (dial, finallyHandler) {
|
|
||||||
if (this.finished) return
|
|
||||||
const abortableDial = dial()
|
|
||||||
this.dials.add(abortableDial)
|
|
||||||
try {
|
try {
|
||||||
this._onResolve(await abortableDial.promise)
|
return await pAny(this.addrs.map(async (addr, i) => {
|
||||||
} catch (err) {
|
const token = await th.shift() // get token
|
||||||
this._onReject(err)
|
let conn
|
||||||
|
try {
|
||||||
|
const signal = dialAbortControllers[i].signal
|
||||||
|
conn = await this.dialAction(addr, { ...options, signal: anySignal([signal, options.signal]) })
|
||||||
|
// Remove the successful AbortController so it is no aborted
|
||||||
|
dialAbortControllers.splice(i, 1)
|
||||||
|
} catch (err) {
|
||||||
|
th.push(token) // return to token holder on error so another ma can be attempted
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
return conn
|
||||||
|
}))
|
||||||
} finally {
|
} finally {
|
||||||
this._onFinally(abortableDial)
|
dialAbortControllers.map(c => c.abort()) // success/failure happened, abort everything else
|
||||||
finallyHandler && finallyHandler()
|
tokens.forEach(t => this.dialer.releaseToken(t)) // release tokens back to the dialer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when a dial resolves
|
|
||||||
* @param {Connection} result
|
|
||||||
*/
|
|
||||||
_onResolve (result) {
|
|
||||||
this.result = result
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when a dial rejects
|
|
||||||
* @param {Error} err
|
|
||||||
*/
|
|
||||||
_onReject (err) {
|
|
||||||
if (err.code === AbortError.code) return
|
|
||||||
this.errors.push(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_onFinally (dial) {
|
|
||||||
this.dials.delete(dial)
|
|
||||||
// If we have a result, or all dials have finished
|
|
||||||
if (this.result || (this._waiting && this.dials.size === 0)) {
|
|
||||||
this._onFinish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Called when dialing is completed, which means one of:
|
|
||||||
* 1. One dial succeeded
|
|
||||||
* 2. All dials failed
|
|
||||||
* 3. All dials were aborted
|
|
||||||
* @private
|
|
||||||
*/
|
|
||||||
_onFinish () {
|
|
||||||
this.finished = true
|
|
||||||
// Abort all remaining dials
|
|
||||||
for (const abortableDial of this.dials) {
|
|
||||||
abortableDial.abort()
|
|
||||||
}
|
|
||||||
this.dials.clear()
|
|
||||||
|
|
||||||
// Flush must be called
|
|
||||||
if (!this._waiting) return
|
|
||||||
// If we have a result, or an abort occurred (no errors and no result)
|
|
||||||
if (this.result || this.errors.length === 0) {
|
|
||||||
this._waiting.resolve(this.result)
|
|
||||||
} else {
|
|
||||||
this._waiting.reject(new AggregateError(this.errors))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flushes any remaining dials and resolves the first
|
|
||||||
* successful `Connection`. Flush should be called after all
|
|
||||||
* dials have been added.
|
|
||||||
* @returns {Promise<Connection>}
|
|
||||||
*/
|
|
||||||
flush () {
|
|
||||||
if (this.finished) {
|
|
||||||
if (this.result) {
|
|
||||||
return Promise.resolve(this.result)
|
|
||||||
} else {
|
|
||||||
return Promise.reject(new AggregateError(this.errors))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this._waiting = pDefer()
|
|
||||||
return this._waiting.promise
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports.DialResolver = DialResolver
|
|
||||||
module.exports.DialRequest = DialRequest
|
module.exports.DialRequest = DialRequest
|
||||||
|
@ -1,63 +0,0 @@
|
|||||||
'use strict'
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @class TokenHolder
|
|
||||||
* @example
|
|
||||||
* const th = new TokenHolder(tokens, dialer.releaseToken)
|
|
||||||
* for (const action of actions) {
|
|
||||||
* const token = await th.getToken()
|
|
||||||
* action(token).then(() => th.releaseToken(token))
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
* await th.drain()
|
|
||||||
*/
|
|
||||||
class TokenHolder {
|
|
||||||
/**
|
|
||||||
* @param {Array<*>} tokens Tokens to track
|
|
||||||
* @param {function(*)} release Called when releasing control of the tokens
|
|
||||||
*/
|
|
||||||
constructor (tokens, release) {
|
|
||||||
this.originalTokens = tokens
|
|
||||||
this.tokens = [...tokens]
|
|
||||||
this._release = release
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolves a token once once is available. Once the token is no
|
|
||||||
* longer needed it MUST be release with `releaseToken()`.
|
|
||||||
* @returns {Promise<*>}
|
|
||||||
*/
|
|
||||||
getToken () {
|
|
||||||
if (this.tokens.length) return Promise.resolve(this.tokens.shift())
|
|
||||||
return new Promise(resolve => {
|
|
||||||
const _push = this.tokens.push
|
|
||||||
this.tokens.push = (token) => {
|
|
||||||
this.tokens.push = _push
|
|
||||||
resolve(token)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Makes the token available via `getToken()`
|
|
||||||
* @param {*} token
|
|
||||||
*/
|
|
||||||
releaseToken (token) {
|
|
||||||
this.tokens.push(token)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Once tokens are no longer needed for a series of actions,
|
|
||||||
* drain will release them to the owner via `this._release()`
|
|
||||||
*/
|
|
||||||
async drain () {
|
|
||||||
let drained = 0
|
|
||||||
while (drained < this.originalTokens.length) {
|
|
||||||
this._release(await this.getToken())
|
|
||||||
// Remove the token
|
|
||||||
drained++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports.TokenHolder = TokenHolder
|
|
@ -1,83 +0,0 @@
|
|||||||
'use strict'
|
|
||||||
/* eslint-env mocha */
|
|
||||||
|
|
||||||
const chai = require('chai')
|
|
||||||
chai.use(require('dirty-chai'))
|
|
||||||
chai.use(require('chai-as-promised'))
|
|
||||||
const { expect } = chai
|
|
||||||
const sinon = require('sinon')
|
|
||||||
const pDefer = require('p-defer')
|
|
||||||
const pWaitFor = require('p-wait-for')
|
|
||||||
const AggregateError = require('aggregate-error')
|
|
||||||
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
|
|
||||||
|
|
||||||
const { DialResolver } = require('../../src/dialer/dial-request')
|
|
||||||
|
|
||||||
const mockAbortableDial = () => {
|
|
||||||
const deferred = pDefer()
|
|
||||||
function dial () {
|
|
||||||
return {
|
|
||||||
promise: deferred.promise,
|
|
||||||
abort: () => deferred.reject(new AbortError())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dial.reject = deferred.reject
|
|
||||||
dial.resolve = deferred.resolve
|
|
||||||
return dial
|
|
||||||
}
|
|
||||||
|
|
||||||
describe('DialResolver', () => {
|
|
||||||
it('should not run subsequent dials if finished', async () => {
|
|
||||||
const deferred = pDefer()
|
|
||||||
const dial = sinon.stub().callsFake(() => {
|
|
||||||
return deferred
|
|
||||||
})
|
|
||||||
const dialResolver = new DialResolver()
|
|
||||||
dialResolver.add(dial)
|
|
||||||
deferred.resolve(true)
|
|
||||||
|
|
||||||
await pWaitFor(() => dialResolver.finished === true)
|
|
||||||
|
|
||||||
dialResolver.add(dial)
|
|
||||||
expect(dial.callCount).to.equal(1)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('.flush should throw if all dials errored', async () => {
|
|
||||||
const dialResolver = new DialResolver()
|
|
||||||
const dials = [
|
|
||||||
mockAbortableDial(),
|
|
||||||
mockAbortableDial(),
|
|
||||||
mockAbortableDial()
|
|
||||||
]
|
|
||||||
for (const dial of dials) {
|
|
||||||
dialResolver.add(dial)
|
|
||||||
dial.reject(new Error('transport error'))
|
|
||||||
}
|
|
||||||
|
|
||||||
await expect(dialResolver.flush()).to.eventually.be.rejectedWith(AggregateError)
|
|
||||||
.and.to.have.nested.property('._errors.length', 3)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('.flush should resolve the successful dial', async () => {
|
|
||||||
const dialResolver = new DialResolver()
|
|
||||||
const mockConn = {}
|
|
||||||
const dials = [
|
|
||||||
mockAbortableDial(),
|
|
||||||
mockAbortableDial(),
|
|
||||||
mockAbortableDial()
|
|
||||||
]
|
|
||||||
|
|
||||||
// Make the first succeed
|
|
||||||
const successfulDial = dials.shift()
|
|
||||||
dialResolver.add(successfulDial)
|
|
||||||
successfulDial.resolve(mockConn)
|
|
||||||
|
|
||||||
// Error the rest
|
|
||||||
for (const dial of dials) {
|
|
||||||
dialResolver.add(dial)
|
|
||||||
dial.reject(new Error('transport error'))
|
|
||||||
}
|
|
||||||
|
|
||||||
await expect(dialResolver.flush()).to.eventually.be(mockConn)
|
|
||||||
})
|
|
||||||
})
|
|
@ -175,8 +175,9 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
|
|
||||||
// Let the call stack run
|
// Let the call stack run
|
||||||
await delay(0)
|
await delay(0)
|
||||||
// All dials should have executed
|
|
||||||
expect(localTM.dial.callCount).to.equal(3)
|
// Only two dials should be executed, as the first dial will succeed
|
||||||
|
expect(localTM.dial.callCount).to.equal(2)
|
||||||
expect(dialer.tokens).to.have.length(2)
|
expect(dialer.tokens).to.have.length(2)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
|
|
||||||
it('should limit the number of tokens it provides', () => {
|
it('should limit the number of tokens it provides', () => {
|
||||||
const dialer = new Dialer({ transportManager: localTM })
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
const maxPerPeer = Constants.PER_PEER_LIMIT
|
const maxPerPeer = Constants.MAX_PER_PEER_DIALS
|
||||||
expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS)
|
expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS)
|
||||||
const tokens = dialer.getTokens(maxPerPeer + 1)
|
const tokens = dialer.getTokens(maxPerPeer + 1)
|
||||||
expect(tokens).to.have.length(maxPerPeer)
|
expect(tokens).to.have.length(maxPerPeer)
|
||||||
@ -163,8 +163,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
|
|
||||||
// Let the call stack run
|
// Let the call stack run
|
||||||
await delay(0)
|
await delay(0)
|
||||||
// All dials should have executed
|
|
||||||
expect(localTM.dial.callCount).to.equal(3)
|
// Only two dials will be run, as the first two succeeded
|
||||||
|
expect(localTM.dial.callCount).to.equal(2)
|
||||||
expect(dialer.tokens).to.have.length(2)
|
expect(dialer.tokens).to.have.length(2)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user