feat: create base libp2p.Node class

This commit is contained in:
David Dias 2016-11-26 03:07:52 +01:00
parent 57c85fe346
commit 5d0b25cb00
2 changed files with 239 additions and 10 deletions

View File

@ -37,5 +37,13 @@
"chai": "^3.5.0",
"aegir": "^9.1.2",
"pre-commit": "^1.1.1"
},
"dependencies": {
"libp2p-swarm": "^0.26.3",
"mafmt": "^2.1.2",
"multiaddr": "^2.1.1",
"peer-book": "^0.3.0",
"peer-id": "^0.8.0",
"peer-info": "^0.8.1"
}
}

View File

@ -1,19 +1,240 @@
'use strict'
exports = module.exports = Libp2p
const Swarm = require('libp2p-swarm')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const PeerBook = require('peer-book')
const multiaddr = require('multiaddr')
const mafmt = require('mafmt')
const EE = require('events').EventEmitter
const assert = require('assert')
function Libp2p (swarm, peerRouting, recordStore) {
var self = this
exports = module.exports
if (!(self instanceof Libp2p)) {
throw new Error('Must be called with new')
const OFFLINE_ERROR_MESSAGE = 'The libp2p node is not started yet'
const IPFS_CODE = 421
class Node {
constructor (_modules, _peerInfo, _peerBook) {
assert(_modules, 'requires modules to equip libp2p with features')
assert(_peerInfo, 'requires a PeerInfo instance')
this.modules = _modules
this.peerInfo = _peerInfo
this.peerBook = _peerBook || new PeerBook()
this.isOnline = false
this.discovery = new EE()
this.swarm = new Swarm(this.peerInfo)
// Attach stream multiplexers
if (this.modules.connection.muxer) {
let muxers = this.modules.connection.muxer
muxers = Array.isArray(muxers) ? muxers : [muxers]
muxers.forEach((muxer) => {
this.swarm.connection.addStreamMuxer(muxer)
})
// If muxer exists, we can use Identify
this.swarm.connection.reuse()
// Received incommind dial and muxer upgrade happened, reuse this
// muxed connection
this.swarm.on('peer-mux-established', (peerInfo) => {
this.peerBook.put(peerInfo)
})
this.swarm.on('peer-mux-closed', (peerInfo) => {
this.peerBook.removeByB58String(peerInfo.id.toB58String())
})
}
// Attach crypto channels
if (this.modules.connection.crypto) {
let cryptos = this.modules.connection.crypto
cryptos = Array.isArray(cryptos) ? cryptos : [cryptos]
cryptos.forEach((crypto) => {
this.swarm.connection.crypto(crypto.tag, crypto.encrypt)
})
}
// Attach discovery mechanisms
if (this.discovery) {
let discoveries = this.modules.discovery
discoveries = Array.isArray(discoveries) ? discoveries : [discoveries]
discoveries.forEach((discovery) => {
discovery.on('peer', (peerInfo) => {
this.discovery.emit('peer', peerInfo)
})
})
}
// Not fully implemented in js-libp2p yet
this.routing = undefined
this.records = undefined
}
if (!swarm) {
throw new Error('a swarm must be passed')
/*
* Start the libp2p node
* - create listeners on the multiaddrs the Peer wants to listen
*/
start (callback) {
if (!this.modules.transport) {
return callback(new Error('no transports were present'))
}
let transports = this.modules.transport
transports = Array.isArray(transports) ? transports : [transports]
const multiaddrs = this.peerInfo.multiaddrs
transports.forEach((transport) => {
if (transport.filter(multiaddrs).length > 0) {
this.swarm.transport.add(
transport.tag || transport.constructor.name, transport)
}
})
this.swarm.listen((err) => {
if (err) {
return callback(err)
}
this.isOnline = true
callback()
})
}
self.swarm = swarm
self.routing = peerRouting
self.record = recordStore
/*
* Stop the libp2p node by closing its listeners and open connections
*/
stop (callback) {
this.isOnline = false
this.swarm.close(callback)
}
//
// Dialing methods
//
// TODO
dialById (id, protocol, callback) {
// NOTE: dialById only works if a previous dial was made. This will
// change once we have PeerRouting
assert(this.isOnline, OFFLINE_ERROR_MESSAGE)
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
callback(new Error('not implemented yet'))
}
dialByMultiaddr (maddr, protocol, callback) {
assert(this.isOnline, OFFLINE_ERROR_MESSAGE)
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
if (typeof maddr === 'string') {
maddr = multiaddr(maddr)
}
if (!mafmt.IPFS.matches(maddr.toString())) {
return callback(new Error('multiaddr not valid'))
}
const ipfsIdB58String = maddr.stringTuples().filter((tuple) => {
if (tuple[0] === IPFS_CODE) {
return true
}
})[0][1]
let peer
try {
peer = this.peerBook.getByB58String(ipfsIdB58String)
} catch (err) {
peer = new PeerInfo(PeerId.createFromB58String(ipfsIdB58String))
}
peer.multiaddr.add(maddr)
this.dialByPeerInfo(peer, protocol, callback)
}
dialByPeerInfo (peer, protocol, callback) {
assert(this.isOnline, OFFLINE_ERROR_MESSAGE)
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
this.swarm.dial(peer, protocol, (err, conn) => {
if (err) {
return callback(err)
}
this.peerBook.put(peer)
callback(null, conn)
})
}
//
// Disconnecting (hangUp) methods
//
hangUpById (id, callback) {
// TODO
callback(new Error('not implemented yet'))
}
hangUpByMultiaddr (maddr, callback) {
assert(this.isOnline, OFFLINE_ERROR_MESSAGE)
if (typeof maddr === 'string') {
maddr = multiaddr(maddr)
}
if (!mafmt.IPFS.matches(maddr.toString())) {
return callback(new Error('multiaddr not valid'))
}
const ipfsIdB58String = maddr.stringTuples().filter((tuple) => {
if (tuple[0] === IPFS_CODE) {
return true
}
})[0][1]
try {
const pi = this.peerBook.getByB58String(ipfsIdB58String)
this.hangUpByPeerInfo(pi, callback)
} catch (err) {
// already disconnected
callback()
}
}
hangUpByPeerInfo (peer, callback) {
assert(this.isOnline, OFFLINE_ERROR_MESSAGE)
this.peerBook.removeByB58String(peer.id.toB58String())
this.swarm.hangUp(peer, callback)
}
//
// Protocol multiplexing handling
//
handle (protocol, handlerFunc, matchFunc) {
this.swarm.handle(protocol, handlerFunc, matchFunc)
}
unhandle (protocol) {
this.swarm.unhandle(protocol)
}
}
module.exports = {
Node: Node
}