js-libp2p/src/index.js

275 lines
7.5 KiB
JavaScript
Raw Normal View History

2016-03-23 15:30:14 +01:00
'use strict'
const EventEmitter = require('events').EventEmitter
const assert = require('assert')
const setImmediate = require('async/setImmediate')
const each = require('async/each')
const series = require('async/series')
2016-11-26 03:07:52 +01:00
const PeerBook = require('peer-book')
2018-02-07 07:48:37 +00:00
const Switch = require('libp2p-switch')
const Ping = require('libp2p-ping')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
const getPeerInfo = require('./get-peer-info')
2016-11-26 03:07:52 +01:00
// Alias the CommonJS `exports` shorthand to `module.exports`.
// NOTE(review): `module.exports` is reassigned to the Node class at the end
// of this file, so this alias looks unused — confirm before removing.
exports = module.exports
// Message handed to the `assert` guards in dial/dialProtocol/hangUp/ping,
// which throw when the node has not been started yet.
const NOT_STARTED_ERROR_MESSAGE = 'The libp2p node is not started yet'
2016-11-26 03:07:52 +01:00
2017-03-27 12:26:34 +01:00
/**
 * Normalize a module slot that may hold either one instance or a list.
 *
 * @param {*} entry - a single module or an array of modules
 * @returns {Array} `entry` itself when it is an array, otherwise `[entry]`
 */
function toModuleList (entry) {
  return Array.isArray(entry) ? entry : [entry]
}

/**
 * A libp2p node: wires a Switch together with the configured stream muxers,
 * crypto channels, discovery mechanisms and (optionally) a DHT.
 *
 * Events emitted by this class:
 * - 'peer:connect' (peerInfo)    when the switch reports 'peer-mux-established'
 * - 'peer:disconnect' (peerInfo) when the switch reports 'peer-mux-closed'
 * - 'peer:discovery' (peerInfo)  when a discovery module emits 'peer'
 * - 'start' / 'stop'             at the end of start() / stop()
 */
class Node extends EventEmitter {
  /**
   * @param {Object} _modules - feature modules; the slots read here are
   *   `transport`, `connection.muxer`, `connection.crypto`, `discovery`
   *   and `DHT`. Each of transport/muxer/crypto/discovery may be a single
   *   module or an array of modules.
   * @param {Object} _peerInfo - PeerInfo instance describing this node
   * @param {Object} [_peerBook] - PeerBook to use; a new one is created if omitted
   * @param {Object} [_options] - `relay` is passed to the switch circuit
   *   relay setup, `DHT.datastore` is passed to the DHT constructor
   */
  constructor (_modules, _peerInfo, _peerBook, _options) {
    super()
    assert(_modules, 'requires modules to equip libp2p with features')
    assert(_peerInfo, 'requires a PeerInfo instance')

    this.modules = _modules
    this.peerInfo = _peerInfo
    this.peerBook = _peerBook || new PeerBook()
    _options = _options || {}
    this._isStarted = false

    this.switch = new Switch(this.peerInfo, this.peerBook)

    const connection = this.modules.connection

    // Attach stream multiplexers
    if (connection && connection.muxer) {
      toModuleList(connection.muxer).forEach((muxer) => {
        this.switch.connection.addStreamMuxer(muxer)
      })

      // If muxer exists, we can use Identify
      this.switch.connection.reuse()

      // If muxer exists, we can use Relay for listening/dialing
      this.switch.connection.enableCircuitRelay(_options.relay)

      // Received incoming dial and muxer upgrade happened,
      // reuse this muxed connection
      this.switch.on('peer-mux-established', (peerInfo) => {
        this.emit('peer:connect', peerInfo)
        this.peerBook.put(peerInfo)
      })

      this.switch.on('peer-mux-closed', (peerInfo) => {
        this.emit('peer:disconnect', peerInfo)
      })
    }

    // Attach crypto channels
    if (connection && connection.crypto) {
      toModuleList(connection.crypto).forEach((crypto) => {
        this.switch.connection.crypto(crypto.tag, crypto.encrypt)
      })
    }

    // Attach discovery mechanisms
    if (this.modules.discovery) {
      toModuleList(this.modules.discovery).forEach((discovery) => {
        discovery.on('peer', (peerInfo) => this.emit('peer:discovery', peerInfo))
      })
    }

    // dht provided components (peerRouting, contentRouting, dht)
    if (this.modules.DHT) {
      this._dht = new this.modules.DHT(this.switch, {
        kBucketSize: 20,
        datastore: _options.DHT && _options.DHT.datastore
      })
    }

    this.peerRouting = peerRouting(this)
    this.contentRouting = contentRouting(this)
    this.dht = dht(this)

    this._getPeerInfo = getPeerInfo(this)

    // Mount default protocols
    Ping.mount(this.switch)
  }

  /**
   * Start the libp2p node
   * - create listeners on the multiaddrs the Peer wants to listen
   *
   * Steps, in order: register transports, start the switch, start the
   * discovery modules, start the DHT (if any), prune multiaddrs that no
   * transport can handle, emit 'start'.
   *
   * @param {function(Error=)} callback - called once all steps finished,
   *   or with an Error if no transport module is configured or a step fails
   * @returns {void}
   */
  start (callback) {
    if (!this.modules.transport) {
      return callback(new Error('no transports were present'))
    }

    let ws
    const transports = toModuleList(this.modules.transport)

    // so that we can have webrtc-star addrs without adding manually the id
    const maOld = []
    const maNew = []
    this.peerInfo.multiaddrs.toArray().forEach((ma) => {
      if (!ma.getPeerId()) {
        maOld.push(ma)
        maNew.push(ma.encapsulate(`/ipfs/${this.peerInfo.id.toB58String()}`))
      }
    })
    this.peerInfo.multiaddrs.replace(maOld, maNew)

    const multiaddrs = this.peerInfo.multiaddrs.toArray()
    transports.forEach((transport) => {
      if (transport.filter(multiaddrs).length > 0) {
        this.switch.transport.add(
          transport.tag || transport.constructor.name, transport)
      } else if (transport.constructor &&
                 transport.constructor.name === 'WebSockets') {
        // TODO find a cleaner way to signal that a transport is always
        // used for dialing, even if no listener
        ws = transport
      }
    })

    series([
      (cb) => this.switch.start(cb),
      (cb) => {
        if (ws) {
          // always add dialing on websockets
          this.switch.transport.add(ws.tag || ws.constructor.name, ws)
        }

        // all transports need to be setup before discover starts
        if (this.modules.discovery) {
          // `discovery` may be a single module or a list, same as in the
          // constructor, so normalize before iterating.
          const discoveries = toModuleList(this.modules.discovery)
          return each(discoveries, (d, done) => d.start(done), cb)
        }
        cb()
      },
      (cb) => {
        // TODO: chicken-and-egg problem:
        // have to set started here because DHT requires libp2p is already started
        this._isStarted = true
        if (this._dht) {
          return this._dht.start(cb)
        }
        cb()
      },
      (cb) => {
        // detect which multiaddrs we don't have a transport for and remove them;
        // circuit-relay addresses are always kept
        const circuitAddr = /\/p2p-circuit($|\/)/
        this.peerInfo.multiaddrs.toArray().forEach((multiaddr) => {
          if (circuitAddr.test(multiaddr.toString())) {
            return
          }
          const supported = transports.some(
            (transport) => transport.filter(multiaddr).length > 0)
          if (!supported) {
            this.peerInfo.multiaddrs.delete(multiaddr)
          }
        })
        cb()
      },
      (cb) => {
        this.emit('start')
        cb()
      }
    ], callback)
  }

  /**
   * Stop the libp2p node by closing its listeners and open connections
   *
   * Steps, in order: stop the DHT (if any), stop the switch, emit 'stop'.
   * Discovery modules are stopped on a later tick, outside that sequence.
   *
   * @param {function(Error=)} callback - called after the sequence ends;
   *   the node is marked as not started even when a step failed
   * @returns {void}
   */
  stop (callback) {
    if (this.modules.discovery) {
      // `discovery` may be a single module or a list; stopping is
      // fire-and-forget, results of discovery.stop are deliberately ignored.
      toModuleList(this.modules.discovery).forEach((discovery) => {
        setImmediate(() => discovery.stop(() => {}))
      })
    }

    series([
      (cb) => {
        if (this._dht) {
          return this._dht.stop(cb)
        }
        cb()
      },
      (cb) => this.switch.stop(cb),
      (cb) => {
        this.emit('stop')
        cb()
      }
    ], (err) => {
      this._isStarted = false
      callback(err)
    })
  }

  /**
   * @returns {boolean} the started flag set by start() and cleared by stop()
   */
  isStarted () {
    return this._isStarted
  }

  /**
   * Dial a peer through the switch without selecting a protocol.
   * Throws (via assert) when the node is not started.
   *
   * @param {*} peer - anything `_getPeerInfo` can resolve to a PeerInfo
   * @param {function(Error, Object=)} callback - receives the value the
   *   switch passes back on success; the peer is stored in the peer book first
   * @returns {void}
   */
  dial (peer, callback) {
    assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)

    this._getPeerInfo(peer, (err, peerInfo) => {
      if (err) { return callback(err) }

      this.switch.dial(peerInfo, (err, conn) => {
        if (err) { return callback(err) }
        this.peerBook.put(peerInfo)
        callback(null, conn)
      })
    })
  }

  /**
   * Dial a peer through the switch on a given protocol.
   * Throws (via assert) when the node is not started.
   *
   * @param {*} peer - anything `_getPeerInfo` can resolve to a PeerInfo
   * @param {string} [protocol] - may be omitted, in which case the second
   *   argument is taken as the callback
   * @param {function(Error, Object=)} callback - receives the value the
   *   switch passes back on success; the peer is stored in the peer book first
   * @returns {void}
   */
  dialProtocol (peer, protocol, callback) {
    assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)

    if (typeof protocol === 'function') {
      callback = protocol
      protocol = undefined
    }

    this._getPeerInfo(peer, (err, peerInfo) => {
      if (err) { return callback(err) }

      this.switch.dial(peerInfo, protocol, (err, conn) => {
        if (err) { return callback(err) }
        this.peerBook.put(peerInfo)
        callback(null, conn)
      })
    })
  }

  /**
   * Ask the switch to hang up on a peer.
   * Throws (via assert) when the node is not started.
   *
   * @param {*} peer - anything `_getPeerInfo` can resolve to a PeerInfo
   * @param {function(Error=)} callback - forwarded to `switch.hangUp`
   * @returns {void}
   */
  hangUp (peer, callback) {
    assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)

    this._getPeerInfo(peer, (err, peerInfo) => {
      if (err) { return callback(err) }

      this.switch.hangUp(peerInfo, callback)
    })
  }

  /**
   * Create a Ping instance targeting a peer.
   * Throws (via assert) when the node is not started.
   *
   * @param {*} peer - anything `_getPeerInfo` can resolve to a PeerInfo
   * @param {function(Error, Object=)} callback - receives the new Ping instance
   * @returns {void}
   */
  ping (peer, callback) {
    assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)

    this._getPeerInfo(peer, (err, peerInfo) => {
      if (err) { return callback(err) }

      callback(null, new Ping(this.switch, peerInfo))
    })
  }

  /**
   * Register a protocol handler on the switch.
   *
   * @param {string} protocol
   * @param {function} handlerFunc
   * @param {function} [matchFunc]
   * @returns {void}
   */
  handle (protocol, handlerFunc, matchFunc) {
    this.switch.handle(protocol, handlerFunc, matchFunc)
  }

  /**
   * Remove a protocol handler from the switch.
   *
   * @param {string} protocol
   * @returns {void}
   */
  unhandle (protocol) {
    this.switch.unhandle(protocol)
  }
}
// Export the Node class as the value of this module.
module.exports = Node