mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-03-16 07:30:51 +00:00
[WIP] Awesome DHT (#86)
* feat: integrate dht * better interfaces * docs: add documentation for peerRouting, contentRouting, dht * fix: take in passed datastore * fix: update usage of _getPeerInfo * fix: getPeerInfo * docs: update docs * moar
This commit is contained in:
parent
babb90fe17
commit
8aa932a491
35
README.md
35
README.md
@ -75,6 +75,7 @@ const WS = require('libp2p-websockets')
|
||||
const spdy = require('libp2p-spdy')
|
||||
const secio = require('libp2p-secio')
|
||||
const MulticastDNS = require('libp2p-mdns')
|
||||
const DHT = require('libp2p-kad-dht')
|
||||
|
||||
class Node extends libp2p {
|
||||
constructor (peerInfo, peerBook, options) {
|
||||
@ -95,7 +96,9 @@ class Node extends libp2p {
|
||||
},
|
||||
discovery: [
|
||||
new MulticastDNS(peerInfo, 'your-identifier')
|
||||
]
|
||||
],
|
||||
// DHT is passed as its own enabling PeerRouting, ContentRouting and DHT itself components
|
||||
dht: DHT
|
||||
}
|
||||
|
||||
super(modules, peerInfo, peerBook, options)
|
||||
@ -144,6 +147,36 @@ class Node extends libp2p {
|
||||
|
||||
`callback` is a function with the following `function (err) {}` signature, where `err` is an Error in case stopping the node fails.
|
||||
|
||||
#### `libp2p.peerRouting.findPeer(id, callback)`
|
||||
|
||||
> Looks up for multiaddrs of a peer in the DHT
|
||||
|
||||
- `id`: instance of [PeerId][]
|
||||
|
||||
#### `libp2p.contentRouting.findProviders(key, timeout, callback)`
|
||||
|
||||
- `key`: Buffer
|
||||
- `timeout`: Number miliseconds
|
||||
|
||||
#### `libp2p.contentRouting.provide(key, timeout, callback)`
|
||||
|
||||
- `key`: Buffer
|
||||
- `timeout`: Number miliseconds
|
||||
|
||||
#### `libp2p.dht.put(key, value, callback)`
|
||||
|
||||
- `key`: Buffer
|
||||
- `value`: Buffer
|
||||
|
||||
#### `libp2p.dht.get(key, callback)`
|
||||
|
||||
- `key`: Buffer
|
||||
|
||||
#### `libp2p.dht.getMany(key, nVals, callback)`
|
||||
|
||||
- `key`: Buffer
|
||||
- `nVals`: Number
|
||||
|
||||
#### `libp2p.handle(protocol, handlerFunc [, matchFunc])`
|
||||
|
||||
> Handle new protocol
|
||||
|
159
src/index.js
159
src/index.js
@ -1,15 +1,19 @@
|
||||
'use strict'
|
||||
|
||||
const EventEmitter = require('events').EventEmitter
|
||||
const assert = require('assert')
|
||||
|
||||
const setImmediate = require('async/setImmediate')
|
||||
const each = require('async/each')
|
||||
const series = require('async/series')
|
||||
|
||||
const Ping = require('libp2p-ping')
|
||||
const Swarm = require('libp2p-swarm')
|
||||
const PeerId = require('peer-id')
|
||||
const PeerInfo = require('peer-info')
|
||||
const mafmt = require('mafmt')
|
||||
const PeerBook = require('peer-book')
|
||||
const mafmt = require('mafmt')
|
||||
const multiaddr = require('multiaddr')
|
||||
const EventEmitter = require('events').EventEmitter
|
||||
const assert = require('assert')
|
||||
const Ping = require('libp2p-ping')
|
||||
const setImmediate = require('async/setImmediate')
|
||||
|
||||
exports = module.exports
|
||||
|
||||
@ -32,9 +36,7 @@ class Node extends EventEmitter {
|
||||
if (this.modules.connection.muxer) {
|
||||
let muxers = this.modules.connection.muxer
|
||||
muxers = Array.isArray(muxers) ? muxers : [muxers]
|
||||
muxers.forEach((muxer) => {
|
||||
this.swarm.connection.addStreamMuxer(muxer)
|
||||
})
|
||||
muxers.forEach((muxer) => this.swarm.connection.addStreamMuxer(muxer))
|
||||
|
||||
// If muxer exists, we can use Identify
|
||||
this.swarm.connection.reuse()
|
||||
@ -73,9 +75,49 @@ class Node extends EventEmitter {
|
||||
// Mount default protocols
|
||||
Ping.mount(this.swarm)
|
||||
|
||||
// Not fully implemented in js-libp2p yet
|
||||
this.routing = undefined
|
||||
this.records = undefined
|
||||
// dht provided components (peerRouting, contentRouting, dht)
|
||||
if (_modules.DHT) {
|
||||
this._dht = new this.modules.DHT(this, 20, _options.DHT && _options.DHT.datastore)
|
||||
}
|
||||
|
||||
this.peerRouting = {
|
||||
findPeer: (id, callback) => {
|
||||
assert(this._dht, 'DHT is not available')
|
||||
|
||||
this._dht.findPeer(id, callback)
|
||||
}
|
||||
}
|
||||
|
||||
this.contentRouting = {
|
||||
findProviders: (key, timeout, callback) => {
|
||||
assert(this._dht, 'DHT is not available')
|
||||
|
||||
this._dht.findProviders(key, timeout, callback)
|
||||
},
|
||||
provide: (key, callback) => {
|
||||
assert(this._dht, 'DHT is not available')
|
||||
|
||||
this._dht.provide(key, callback)
|
||||
}
|
||||
}
|
||||
|
||||
this.dht = {
|
||||
put: (key, value, callback) => {
|
||||
assert(this._dht, 'DHT is not available')
|
||||
|
||||
this._dht.put(key, value, callback)
|
||||
},
|
||||
get: (key, callback) => {
|
||||
assert(this._dht, 'DHT is not available')
|
||||
|
||||
this._dht.get(key, callback)
|
||||
},
|
||||
getMany (key, nVals, callback) {
|
||||
assert(this._dht, 'DHT is not available')
|
||||
|
||||
this._dht.getMany(key, nVals, callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -117,24 +159,30 @@ class Node extends EventEmitter {
|
||||
}
|
||||
})
|
||||
|
||||
this.swarm.listen((err) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
if (ws) {
|
||||
this.swarm.transport.add(ws.tag || ws.constructor.name, ws)
|
||||
}
|
||||
series([
|
||||
(cb) => this.swarm.listen(cb),
|
||||
(cb) => {
|
||||
// listeners on, libp2p is on
|
||||
this.isOnline = true
|
||||
|
||||
this.isOnline = true
|
||||
if (ws) {
|
||||
// always add dialing on websockets
|
||||
this.swarm.transport.add(ws.tag || ws.constructor.name, ws)
|
||||
}
|
||||
|
||||
if (this.modules.discovery) {
|
||||
this.modules.discovery.forEach((discovery) => {
|
||||
setImmediate(() => discovery.start(() => {}))
|
||||
})
|
||||
// all transports need to be setup before discover starts
|
||||
if (this.modules.discovery) {
|
||||
return each(this.modules.discovery, (d, cb) => d.start(cb), cb)
|
||||
}
|
||||
cb()
|
||||
},
|
||||
(cb) => {
|
||||
if (this._dht) {
|
||||
return this._dht.start(cb)
|
||||
}
|
||||
cb()
|
||||
}
|
||||
|
||||
callback()
|
||||
})
|
||||
], callback)
|
||||
}
|
||||
|
||||
/*
|
||||
@ -149,7 +197,15 @@ class Node extends EventEmitter {
|
||||
})
|
||||
}
|
||||
|
||||
this.swarm.close(callback)
|
||||
series([
|
||||
(cb) => {
|
||||
if (this._dht) {
|
||||
return this._dht.stop(cb)
|
||||
}
|
||||
cb()
|
||||
},
|
||||
(cb) => this.swarm.close(cb)
|
||||
], callback)
|
||||
}
|
||||
|
||||
isOn () {
|
||||
@ -158,8 +214,13 @@ class Node extends EventEmitter {
|
||||
|
||||
ping (peer, callback) {
|
||||
assert(this.isOn(), OFFLINE_ERROR_MESSAGE)
|
||||
const peerInfo = this._getPeerInfo(peer)
|
||||
callback(null, new Ping(this.swarm, peerInfo))
|
||||
this._getPeerInfo(peer, (err, peerInfo) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
callback(null, new Ping(this.swarm, peerInfo))
|
||||
})
|
||||
}
|
||||
|
||||
dial (peer, protocol, callback) {
|
||||
@ -170,27 +231,31 @@ class Node extends EventEmitter {
|
||||
protocol = undefined
|
||||
}
|
||||
|
||||
let peerInfo
|
||||
try {
|
||||
peerInfo = this._getPeerInfo(peer)
|
||||
} catch (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
this.swarm.dial(peerInfo, protocol, (err, conn) => {
|
||||
this._getPeerInfo(peer, (err, peerInfo) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
this.peerBook.put(peerInfo)
|
||||
callback(null, conn)
|
||||
|
||||
this.swarm.dial(peerInfo, protocol, (err, conn) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
this.peerBook.put(peerInfo)
|
||||
callback(null, conn)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
hangUp (peer, callback) {
|
||||
assert(this.isOn(), OFFLINE_ERROR_MESSAGE)
|
||||
const peerInfo = this._getPeerInfo(peer)
|
||||
|
||||
this.swarm.hangUp(peerInfo, callback)
|
||||
this._getPeerInfo(peer, (err, peerInfo) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
this.swarm.hangUp(peerInfo, callback)
|
||||
})
|
||||
}
|
||||
|
||||
handle (protocol, handlerFunc, matchFunc) {
|
||||
@ -204,10 +269,12 @@ class Node extends EventEmitter {
|
||||
/*
|
||||
* Helper method to check the data type of peer and convert it to PeerInfo
|
||||
*/
|
||||
_getPeerInfo (peer) {
|
||||
_getPeerInfo (peer, callback) {
|
||||
let p
|
||||
// PeerInfo
|
||||
if (PeerInfo.isPeerInfo(peer)) {
|
||||
p = peer
|
||||
// Multiaddr instance (not string)
|
||||
} else if (multiaddr.isMultiaddr(peer)) {
|
||||
const peerIdB58Str = peer.getPeerId()
|
||||
try {
|
||||
@ -216,19 +283,19 @@ class Node extends EventEmitter {
|
||||
p = new PeerInfo(PeerId.createFromB58String(peerIdB58Str))
|
||||
}
|
||||
p.multiaddrs.add(peer)
|
||||
// PeerId
|
||||
} else if (PeerId.isPeerId(peer)) {
|
||||
const peerIdB58Str = peer.toB58String()
|
||||
try {
|
||||
p = this.peerBook.get(peerIdB58Str)
|
||||
} catch (err) {
|
||||
// TODO this is where PeerRouting comes into place
|
||||
throw new Error('No knowledge about: ' + peerIdB58Str)
|
||||
return this.peerRouting.findPeer(peer, callback)
|
||||
}
|
||||
} else {
|
||||
throw new Error('peer type not recognized')
|
||||
return setImmediate(() => callback(new Error('peer type not recognized')))
|
||||
}
|
||||
|
||||
return p
|
||||
setImmediate(() => callback(null, p))
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user