Jacob Heun 5e1dbc21a2 refactor(async): add dialer and upgrader (#462)
* chore(deps): update connection and multistream

* feat: add basic dial support for addresses and peers

* test: automatically require all node test files

* fix: don't catch and log in the wrong place

* test: add direct spec test

fix: improve dial error consistency

* feat: add dial timeouts and concurrency

Queue timeouts will result in aborts of the dials

* chore: fix linting

* test: verify dialer defaults

* feat: add initial upgrader

* fix: add more test coverage and fix bugs

* feat: libp2p creates the upgrader

* feat: hook up handle to the upgrader

* feat: hook up the dialer to libp2p

test: add node dialer libp2p tests

* feat: add connection listeners to upgrader

* feat: emit connect and disconnect events

* chore: use libp2p-interfaces

* fix: address review feedback

* fix: correct import

* refactor: dedupe connection creation code
2020-01-24 14:40:40 +01:00

110 lines
2.6 KiB
JavaScript

'use strict'
const { Connection } = require('libp2p-interfaces/src/connection')
const pull = require('pull-stream/pull')
const empty = require('pull-stream/sources/empty')
const timeout = require('async/timeout')
const queue = require('async/queue')
const debug = require('debug')
const once = require('once')
// Namespaced `debug` logger for the dial queue; errors are written through a
// second logger under the ':error' namespace, attached as `log.error`.
const log = debug('libp2p:switch:dialer:queue')
log.error = debug('libp2p:switch:dialer:queue:error')
/**
 * Queue up the amount of dials to a given peer.
 *
 * Wraps an `async/queue` whose worker dials one transport/address pair per
 * task. The queue's concurrency is set to `limit`.
 */
class DialQueue {
  /**
   * Create a new dial queue.
   *
   * @param {number} limit Concurrency passed to `async/queue`
   * @param {number} dialTimeout Timeout passed to `async/timeout` for every dial
   */
  constructor (limit, dialTimeout) {
    this.dialTimeout = dialTimeout

    this.queue = queue((task, cb) => {
      this._doWork(task.transport, task.addr, task.token, cb)
    }, limit)
  }

  /**
   * The actual work done by the queue.
   *
   * Dials `addr` over `transport`. The first dial to succeed for a given
   * `token` marks the token as cancelled and yields its connection; any dial
   * that completes after that is cleaned up and yields no result.
   *
   * @param {SwarmTransport} transport
   * @param {Multiaddr} addr
   * @param {CancelToken} token Shared object; its `cancel` flag is read and set here
   * @param {function(Error, {multiaddr: Multiaddr, conn: Connection}=)} callback
   *   Called with the error on failure, with `null` only when the dial was
   *   cancelled, or with `null` and the dialed address/connection on success
   * @returns {void}
   * @private
   */
  _doWork (transport, addr, token, callback) {
    // Make sure the queue's callback can never fire twice for one task
    callback = once(callback)
    log('work:start')

    this._dialWithTimeout(transport, addr, (err, conn) => {
      if (err) {
        log.error(`${transport.constructor.name}:work`, err)
        return callback(err)
      }

      if (token.cancel) {
        log('work:cancel')
        // clean up already done dials
        pull(empty(), conn)
        // If we can close the connection, do it
        if (typeof conn.close === 'function') {
          return conn.close((_) => callback(null))
        }
        return callback(null)
      }

      // one is enough
      token.cancel = true

      log('work:success')
      // The raw transport connection is handed back unwrapped
      callback(null, { multiaddr: addr, conn })
    })
  }

  /**
   * Dial the given transport, timing out with the set timeout.
   *
   * @param {SwarmTransport} transport
   * @param {Multiaddr} addr
   * @param {function(Error, Connection)} callback
   * @returns {void}
   *
   * @private
   */
  _dialWithTimeout (transport, addr, callback) {
    timeout((cb) => {
      // `conn` is only readable once `dial` has returned, so the transport is
      // expected to invoke its callback asynchronously
      const conn = transport.dial(addr, (err) => {
        if (err) {
          return cb(err)
        }

        cb(null, conn)
      })
    }, this.dialTimeout)(callback)
  }

  /**
   * Add new work to the queue.
   *
   * @param {SwarmTransport} transport
   * @param {Multiaddr} addr
   * @param {CancelToken} token
   * @param {function(Error, {multiaddr: Multiaddr, conn: Connection}=)} callback
   *   Receives whatever `_doWork` reports for this task
   * @returns {void}
   */
  push (transport, addr, token, callback) {
    this.queue.push({ transport, addr, token }, callback)
  }
}
module.exports = DialQueue