js-libp2p/src/index.js

356 lines
9.6 KiB
JavaScript
Raw Normal View History

2016-03-23 15:30:14 +01:00
'use strict'
const EventEmitter = require('events').EventEmitter
const assert = require('assert')
const each = require('async/each')
const series = require('async/series')
const parallel = require('async/parallel')
2016-11-26 03:07:52 +01:00
const PeerBook = require('peer-book')
2018-02-07 07:48:37 +00:00
const Switch = require('libp2p-switch')
const Ping = require('libp2p-ping')
2018-04-03 15:51:05 +01:00
const WebSockets = require('libp2p-websockets')
const ConnectionManager = require('libp2p-connection-manager')
2018-02-07 07:48:37 +00:00
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
2018-02-14 11:30:36 +01:00
const pubsub = require('./pubsub')
2018-02-07 07:48:37 +00:00
const getPeerInfo = require('./get-peer-info')
const validateConfig = require('./config').validate
2016-11-26 03:07:52 +01:00
// Re-point the local `exports` binding at `module.exports`.
// NOTE(review): `module.exports` is reassigned to the Node class at the end
// of this file, so nothing is ever attached through this alias here.
exports = module.exports
// Message reported when dial/dialProtocol/hangUp/ping are used before the
// node has been started.
const NOT_STARTED_ERROR_MESSAGE = 'The libp2p node is not started yet'
2016-11-26 03:07:52 +01:00
2017-03-27 12:26:34 +01:00
class Node extends EventEmitter {
constructor (_options) {
2017-03-27 12:26:34 +01:00
super()
// validateConfig will ensure the config is correct,
// and add default values where appropriate
_options = validateConfig(_options)
2016-11-26 03:07:52 +01:00
this.peerInfo = _options.peerInfo
this.peerBook = _options.peerBook || new PeerBook()
this._modules = _options.modules
this._config = _options.config
this._isStarted = false
this._transport = [] // Transport instances/references
this._discovery = [] // Discovery service instances/references
2016-11-26 03:07:52 +01:00
this._switch = new Switch(this.peerInfo, this.peerBook, _options.switch)
this.stats = this._switch.stats
this.connectionManager = new ConnectionManager(this, _options.connectionManager)
2016-11-26 03:07:52 +01:00
// Attach stream multiplexers
if (this._modules.streamMuxer) {
let muxers = this._modules.streamMuxer
muxers.forEach((muxer) => this._switch.connection.addStreamMuxer(muxer))
2016-11-26 03:07:52 +01:00
// If muxer exists
// we can use Identify
this._switch.connection.reuse()
// we can use Relay for listening/dialing
this._switch.connection.enableCircuitRelay(this._config.relay)
2016-11-26 03:07:52 +01:00
// Received incomming dial and muxer upgrade happened,
// reuse this muxed connection
this._switch.on('peer-mux-established', (peerInfo) => {
this.emit('peer:connect', peerInfo)
2016-11-26 03:07:52 +01:00
this.peerBook.put(peerInfo)
})
this._switch.on('peer-mux-closed', (peerInfo) => {
this.emit('peer:disconnect', peerInfo)
2016-11-26 03:07:52 +01:00
})
}
// Attach crypto channels
if (this._modules.connEncryption) {
let cryptos = this._modules.connEncryption
2016-11-26 03:07:52 +01:00
cryptos.forEach((crypto) => {
this._switch.connection.crypto(crypto.tag, crypto.encrypt)
2016-11-26 03:07:52 +01:00
})
}
// Attach private network protector
if (this._modules.connProtector) {
this._switch.protector = this._modules.connProtector
} else if (process.env.LIBP2P_FORCE_PNET) {
throw new Error('Private network is enforced, but no protector was provided')
}
// dht provided components (peerRouting, contentRouting, dht)
if (this._config.EXPERIMENTAL.dht) {
const DHT = this._modules.dht
const enabledDiscovery = this._config.dht.enabledDiscovery !== false
this._dht = new DHT(this._switch, {
kBucketSize: this._config.dht.kBucketSize || 20,
enabledDiscovery,
// TODO make datastore an option of libp2p itself so
// that other things can use it as well
datastore: dht.datastore
2016-11-26 03:07:52 +01:00
})
}
// enable/disable pubsub
if (this._config.EXPERIMENTAL.pubsub) {
this.pubsub = pubsub(this)
}
// Attach remaining APIs
2018-02-07 07:48:37 +00:00
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)
2018-02-07 07:48:37 +00:00
this._getPeerInfo = getPeerInfo(this)
2018-02-07 07:48:37 +00:00
// Mount default protocols
Ping.mount(this._switch)
2016-11-26 03:07:52 +01:00
}
/*
* Start the libp2p node
* - create listeners on the multiaddrs the Peer wants to listen
*/
start (callback) {
if (!this._modules.transport) {
2016-11-26 03:07:52 +01:00
return callback(new Error('no transports were present'))
}
let ws
// so that we can have webrtc-star addrs without adding manually the id
const maOld = []
const maNew = []
this.peerInfo.multiaddrs.toArray().forEach((ma) => {
if (!ma.getPeerId()) {
maOld.push(ma)
maNew.push(ma.encapsulate('/ipfs/' + this.peerInfo.id.toB58String()))
}
})
this.peerInfo.multiaddrs.replace(maOld, maNew)
const multiaddrs = this.peerInfo.multiaddrs.toArray()
this._modules.transport.forEach((Transport) => {
let t
if (typeof Transport === 'function') {
t = new Transport()
} else {
t = Transport
}
if (t.filter(multiaddrs).length > 0) {
this._switch.transport.add(t.tag || t.constructor.name, t)
} else if (WebSockets.isWebSockets(t)) {
// TODO find a cleaner way to signal that a transport is always used
// for dialing, even if no listener
ws = t
2016-11-26 03:07:52 +01:00
}
this._transport.push(t)
2016-11-26 03:07:52 +01:00
})
series([
(cb) => {
this.connectionManager.start()
this._switch.start(cb)
},
(cb) => {
if (ws) {
// always add dialing on websockets
this._switch.transport.add(ws.tag || ws.constructor.name, ws)
}
// all transports need to be setup before discover starts
if (this._modules.peerDiscovery) {
each(this._modules.peerDiscovery, (D, _cb) => {
let config = {}
if (D.tag &&
this._config.peerDiscovery &&
this._config.peerDiscovery[D.tag]) {
config = this._config.peerDiscovery[D.tag]
}
// If not configured to be enabled/disabled then enable by default
const enabled = config.enabled == null ? true : config.enabled
// If enabled then start it
if (enabled) {
let d
if (typeof D === 'function') {
d = new D(Object.assign({}, config, { peerInfo: this.peerInfo }))
} else {
d = D
}
d.on('peer', (peerInfo) => this.emit('peer:discovery', peerInfo))
this._discovery.push(d)
d.start(_cb)
} else {
_cb()
}
}, cb)
} else {
cb()
}
},
(cb) => {
2018-02-14 11:30:36 +01:00
// TODO: chicken-and-egg problem #1:
// have to set started here because DHT requires libp2p is already started
this._isStarted = true
if (this._dht) {
2018-02-14 11:30:36 +01:00
this._dht.start(cb)
} else {
cb()
}
},
2018-02-14 11:30:36 +01:00
(cb) => {
// TODO: chicken-and-egg problem #2:
// have to set started here because FloodSub requires libp2p is already started
if (this._floodSub) {
2018-02-14 11:30:36 +01:00
this._floodSub.start(cb)
} else {
cb()
}
},
(cb) => {
// detect which multiaddrs we don't have a transport for and remove them
const multiaddrs = this.peerInfo.multiaddrs.toArray()
2018-02-14 11:30:36 +01:00
multiaddrs.forEach((multiaddr) => {
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
!this._transport.find((transport) => transport.filter(multiaddr).length > 0)) {
this.peerInfo.multiaddrs.delete(multiaddr)
}
})
cb()
},
(cb) => {
this.emit('start')
cb()
}
], callback)
}
2016-11-26 03:07:52 +01:00
/*
* Stop the libp2p node by closing its listeners and open connections
*/
stop (callback) {
series([
2018-02-14 11:30:36 +01:00
(cb) => {
if (this._modules.peerDiscovery) {
// stop all discoveries before continuing with shutdown
return parallel(
this._discovery.map((d) => {
return (_cb) => d.stop(() => { _cb() })
}),
cb
)
2018-02-14 11:30:36 +01:00
}
cb()
},
(cb) => {
if (this._floodSub) {
return this._floodSub.stop(cb)
}
cb()
2018-02-14 11:30:36 +01:00
},
(cb) => {
if (this._dht) {
return this._dht.stop(cb)
}
cb()
},
(cb) => {
this.connectionManager.stop()
this._switch.stop(cb)
},
(cb) => {
this.emit('stop')
cb()
}
], (err) => {
this._isStarted = false
callback(err)
})
}
isStarted () {
return this._isStarted
2016-12-01 11:53:27 +00:00
}
2018-02-07 07:48:37 +00:00
dial (peer, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)
2018-02-07 07:48:37 +00:00
this._getPeerInfo(peer, (err, peerInfo) => {
2018-02-07 07:48:37 +00:00
if (err) { return callback(err) }
this._switch.dial(peerInfo, (err) => {
2018-02-07 07:48:37 +00:00
if (err) { return callback(err) }
this.peerBook.put(peerInfo)
2018-02-07 08:22:03 +00:00
callback()
2018-02-07 07:48:37 +00:00
})
})
2016-12-01 11:53:27 +00:00
}
2018-02-07 07:48:37 +00:00
dialProtocol (peer, protocol, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)
2016-11-26 03:07:52 +01:00
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
this._getPeerInfo(peer, (err, peerInfo) => {
2018-02-07 07:48:37 +00:00
if (err) { return callback(err) }
this._switch.dial(peerInfo, protocol, (err, conn) => {
2018-02-07 07:48:37 +00:00
if (err) { return callback(err) }
this.peerBook.put(peerInfo)
callback(null, conn)
})
2016-11-26 03:07:52 +01:00
})
}
2017-03-27 12:26:34 +01:00
hangUp (peer, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)
2016-11-26 03:07:52 +01:00
this._getPeerInfo(peer, (err, peerInfo) => {
2018-02-07 07:48:37 +00:00
if (err) { return callback(err) }
this._switch.hangUp(peerInfo, callback)
})
2016-11-26 03:07:52 +01:00
}
2018-02-07 07:48:37 +00:00
ping (peer, callback) {
if (!this.isStarted()) {
return callback(new Error(NOT_STARTED_ERROR_MESSAGE))
}
2018-02-07 07:48:37 +00:00
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) { return callback(err) }
callback(null, new Ping(this._switch, peerInfo))
2018-02-07 07:48:37 +00:00
})
}
2016-11-26 03:07:52 +01:00
handle (protocol, handlerFunc, matchFunc) {
this._switch.handle(protocol, handlerFunc, matchFunc)
2016-11-26 03:07:52 +01:00
}
unhandle (protocol) {
this._switch.unhandle(protocol)
2016-11-26 03:07:52 +01:00
}
}
// The Node class is this module's sole export.
module.exports = Node