feat: gossipsub 1.1 (#733)

* feat: gossipsub 1.1

BREAKING CHANGE: pubsub implementation is now directly exposed and its API was updated according to the new pubsub interface in js-libp2p-interfaces repo

* chore: use gossipsub branch with src added

* fix: add pubsub handlers adapter

* chore: fix deps

* chore: update pubsub docs and examples

* chore: apply suggestions from code review

Co-authored-by: Jacob Heun <jacobheun@gmail.com>

* chore: use new floodsub

* chore: change validator doc set

Co-authored-by: Jacob Heun <jacobheun@gmail.com>

* chore: add new gossipsub src

Co-authored-by: Jacob Heun <jacobheun@gmail.com>
This commit is contained in:
Vasco Santos 2020-08-25 16:48:04 +02:00 committed by Jacob Heun
parent 1e869717ff
commit 55c9bfac44
12 changed files with 248 additions and 175 deletions

View File

@ -45,7 +45,7 @@ const after = async () => {
}
module.exports = {
bundlesize: { maxSize: '205kB' },
bundlesize: { maxSize: '225kB' },
hooks: {
pre: before,
post: after

View File

@ -46,6 +46,8 @@
* [`pubsub.publish`](#pubsubpublish)
* [`pubsub.subscribe`](#pubsubsubscribe)
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
* [`pubsub.on`](#pubsubon)
* [`pubsub.removeListener`](#pubsubremovelistener)
* [`connectionManager.get`](#connectionmanagerget)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`connectionManager.size`](#connectionmanagersize)
@ -1327,15 +1329,75 @@ await libp2p.pubsub.publish(topic, data)
### pubsub.subscribe
Subscribes the given handler to a pubsub topic.
Subscribes to a pubsub topic.
`libp2p.pubsub.subscribe(topic, handler)`
`libp2p.pubsub.subscribe(topic)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| topic | `string` | topic to subscribe |
#### Returns
| Type | Description |
|------|-------------|
| `void` | |
#### Example
```js
const topic = 'topic'
const handler = (msg) => {
// msg.data - pubsub data received
}
libp2p.pubsub.on(topic, handler)
libp2p.pubsub.subscribe(topic)
```
### pubsub.unsubscribe
Unsubscribes from a pubsub topic.
`libp2p.pubsub.unsubscribe(topic)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| topic | `string` | topic to unsubscribe |
#### Returns
| Type | Description |
|------|-------------|
| `void` | |
#### Example
```js
const topic = 'topic'
const handler = (msg) => {
// msg.data - pubsub data received
}
libp2p.pubsub.removeListener(topic handler)
libp2p.pubsub.unsubscribe(topic)
```
## pubsub.on
A Pubsub router is an [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter) and uses its events for pubsub message handlers.
`libp2p.pubsub.on(topic, handler)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| topic | `string` | topic to listen |
| handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array<string>, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic |
#### Returns
@ -1352,21 +1414,22 @@ const handler = (msg) => {
// msg.data - pubsub data received
}
libp2p.pubsub.subscribe(topic, handler)
libp2p.pubsub.on(topic, handler)
libp2p.pubsub.subscribe(topic)
```
### pubsub.unsubscribe
## pubsub.removeListener
Unsubscribes the given handler from a pubsub topic. If no handler is provided, all handlers for the topic are removed.
A Pubsub router is an [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter) and uses its events for pubsub message handlers.
`libp2p.pubsub.unsubscribe(topic, handler)`
`libp2p.pubsub.removeListener(topic, handler)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| topic | `string` | topic to unsubscribe |
| handler | `function(<object>)` | handler subscribed |
| topic | `string` | topic to remove listener |
| handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array<string>, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic |
#### Returns
@ -1382,7 +1445,67 @@ const handler = (msg) => {
// msg.data - pubsub data received
}
libp2p.pubsub.unsubscribe(topic, handler)
libp2p.pubsub.removeListener(topic handler)
libp2p.pubsub.unsubscribe(topic)
```
## pubsub.topicValidators.set
Pubsub routers support message validators per topic, which will validate the message before its propagations. Set is used to specify a validator for a topic.
`libp2p.pubsub.topicValidators.set(topic, validator)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| topic | `string` | topic to bind a validator |
| handler | `function({ topic: string, msg: RPC })` | validator for new data on topic |
#### Returns
| Type | Description |
|------|-------------|
| `Map<string, function(string, RPC)>` | The `Map` object |
#### Example
```js
const topic = 'topic'
const validateMessage = (msgTopic, msg) => {
const input = uint8ArrayToString(msg.data)
const validInputs = ['a', 'b', 'c']
if (!validInputs.includes(input)) {
throw new Error('no valid input received')
}
}
libp2p.pubsub.topicValidators.set(topic, validateMessage)
```
## pubsub.topicValidators.delete
Pubsub routers support message validators per topic, which will validate the message before its propagations. Delete is used to remove a validator for a topic.
`libp2p.pubsub.topicValidators.delete(topic)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| topic | `string` | topic to remove a validator |
#### Returns
| Type | Description |
|------|-------------|
| `boolean` | `true` if an element in the Map object existed and has been removed, or `false` if the element does not exist. |
#### Example
```js
const topic = 'topic'
libp2p.pubsub.topicValidators.delete(topic)
```
### connectionManager.get

View File

@ -8,6 +8,7 @@ const { NOISE } = require('libp2p-noise')
const SECIO = require('libp2p-secio')
const Gossipsub = require('libp2p-gossipsub')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const createNode = async () => {
const node = await Libp2p.create({
@ -38,13 +39,15 @@ const createNode = async () => {
node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
await node1.dial(node2.peerId)
await node1.pubsub.subscribe(topic, (msg) => {
console.log(`node1 received: ${msg.data.toString()}`)
node1.pubsub.on(topic, (msg) => {
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
await node1.pubsub.subscribe(topic)
await node2.pubsub.subscribe(topic, (msg) => {
console.log(`node2 received: ${msg.data.toString()}`)
node2.pubsub.on(topic, (msg) => {
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
await node2.pubsub.subscribe(topic)
// node2 publishes "news" every second
setInterval(() => {

View File

@ -47,13 +47,15 @@ node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
await node1.dial(node2.peerId)
await node1.pubsub.subscribe(topic, (msg) => {
console.log(`node1 received: ${msg.data.toString()}`)
node1.pubsub.on(topic, (msg) => {
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
await node1.pubsub.subscribe(topic)
await node2.pubsub.subscribe(topic, (msg) => {
console.log(`node2 received: ${msg.data.toString()}`)
node2.pubsub.on(topic, (msg) => {
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
await node2.pubsub.subscribe(topic)
// node2 publishes "news" every second
setInterval(() => {

View File

@ -8,6 +8,7 @@ const { NOISE } = require('libp2p-noise')
const SECIO = require('libp2p-secio')
const Gossipsub = require('libp2p-gossipsub')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const createNode = async () => {
const node = await Libp2p.create({
@ -43,29 +44,34 @@ const createNode = async () => {
await node2.dial(node3.peerId)
//subscribe
await node1.pubsub.subscribe(topic, (msg) => {
console.log(`node1 received: ${msg.data.toString()}`)
node1.pubsub.on(topic, (msg) => {
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
await node1.pubsub.subscribe(topic)
await node2.pubsub.subscribe(topic, (msg) => {
console.log(`node2 received: ${msg.data.toString()}`)
node2.pubsub.on(topic, (msg) => {
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
await node2.pubsub.subscribe(topic)
await node3.pubsub.subscribe(topic, (msg) => {
console.log(`node3 received: ${msg.data.toString()}`)
node3.pubsub.on(topic, (msg) => {
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
})
await node3.pubsub.subscribe(topic)
const validateFruit = (msgTopic, peer, msg) => {
const fruit = msg.data.toString();
const validateFruit = (msgTopic, msg) => {
const fruit = uint8ArrayToString(msg.data)
const validFruit = ['banana', 'apple', 'orange']
const valid = validFruit.includes(fruit);
return valid;
if (!validFruit.includes(fruit)) {
throw new Error('no valid fruit received')
}
}
//validate fruit
node1.pubsub._pubsub.topicValidators.set(topic, validateFruit);
node2.pubsub._pubsub.topicValidators.set(topic, validateFruit);
node3.pubsub._pubsub.topicValidators.set(topic, validateFruit);
node1.pubsub.topicValidators.set(topic, validateFruit)
node2.pubsub.topicValidators.set(topic, validateFruit)
node3.pubsub.topicValidators.set(topic, validateFruit)
// node1 publishes "fruits" every five seconds
var count = 0;

View File

@ -44,31 +44,36 @@ Now we' can subscribe to the fruit topic and log incoming messages.
```JavaScript
const topic = 'fruit'
await node1.pubsub.subscribe(topic, (msg) => {
console.log(`node1 received: ${msg.data.toString()}`)
node1.pubsub.on(topic, (msg) => {
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
await node1.pubsub.subscribe(topic)
await node2.pubsub.subscribe(topic, (msg) => {
console.log(`node2 received: ${msg.data.toString()}`)
node2.pubsub.on(topic, (msg) => {
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
await node2.pubsub.subscribe(topic)
await node3.pubsub.subscribe(topic, (msg) => {
console.log(`node3 received: ${msg.data.toString()}`)
node3.pubsub.on(topic, (msg) => {
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
})
await node3.pubsub.subscribe(topic)
```
Finally, let's define the additional filter in the fruit topic.
```JavaScript
const validateFruit = (msgTopic, peer, msg) => {
const fruit = msg.data.toString();
const validateFruit = (msgTopic, msg) => {
const fruit = uint8ArrayToString(msg.data)
const validFruit = ['banana', 'apple', 'orange']
const valid = validFruit.includes(fruit);
return valid;
if (!validFruit.includes(fruit)) {
throw new Error('no valid fruit received')
}
}
node1.pubsub._pubsub.topicValidators.set(topic, validateFruit);
node2.pubsub._pubsub.topicValidators.set(topic, validateFruit);
node3.pubsub._pubsub.topicValidators.set(topic, validateFruit);
node1.pubsub.topicValidators.set(topic, validateFruit)
node2.pubsub.topicValidators.set(topic, validateFruit)
node3.pubsub.topicValidators.set(topic, validateFruit)
```
In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared.

View File

@ -59,7 +59,7 @@
"it-pipe": "^1.1.0",
"it-protocol-buffers": "^0.2.0",
"libp2p-crypto": "^0.18.0",
"libp2p-interfaces": "^0.5.0",
"libp2p-interfaces": "^0.5.1",
"libp2p-utils": "^0.2.0",
"mafmt": "^8.0.0",
"merge-options": "^2.0.0",
@ -92,16 +92,17 @@
"cids": "^1.0.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"interop-libp2p": "^0.2.0",
"interop-libp2p": "libp2p/interop#chore/gossipsub-1.1",
"ipfs-http-client": "^46.0.0",
"it-concat": "^1.0.0",
"it-pair": "^1.0.0",
"it-pushable": "^1.4.0",
"libp2p": ".",
"libp2p-bootstrap": "^0.12.0",
"libp2p-delegated-content-routing": "^0.6.0",
"libp2p-delegated-peer-routing": "^0.6.0",
"libp2p-floodsub": "^0.22.0",
"libp2p-gossipsub": "^0.5.0",
"libp2p-floodsub": "^0.23.0",
"libp2p-gossipsub": "ChainSafe/js-libp2p-gossipsub#chore/use-libp2p-interfaces0.5.1-with-src",
"libp2p-kad-dht": "^0.20.0",
"libp2p-mdns": "^0.15.0",
"libp2p-mplex": "^0.10.0",

View File

@ -11,7 +11,6 @@ const PeerId = require('peer-id')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const pubsub = require('./pubsub')
const getPeer = require('./get-peer')
const { validate: validateConfig } = require('./config')
const { codes, messages } = require('./errors')
@ -25,6 +24,7 @@ const Metrics = require('./metrics')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
const PubsubAdapter = require('./pubsub-adapter')
const PersistentPeerStore = require('./peer-store/persistent')
const Registrar = require('./registrar')
const ping = require('./ping')
@ -185,9 +185,11 @@ class Libp2p extends EventEmitter {
})
}
// start pubsub
// Create pubsub if provided
if (this._modules.pubsub) {
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
const Pubsub = this._modules.pubsub
// using pubsub adapter with *DEPRECATED* handlers functionality
this.pubsub = PubsubAdapter(Pubsub, this, this._config.pubsub)
}
// Attach remaining APIs

40
src/pubsub-adapter.js Normal file
View File

@ -0,0 +1,40 @@
'use strict'
// Pubsub adapter to keep API with handlers while not removed.
module.exports = (PubsubRouter, libp2p, options) => {
class Pubsub extends PubsubRouter {
/**
* Subscribes to a given topic.
* @override
* @param {string} topic
* @param {function(msg: InMessage)} [handler]
* @returns {void}
*/
subscribe (topic, handler) {
// Bind provided handler
handler && this.on(topic, handler)
super.subscribe(topic)
}
/**
* Unsubscribe from the given topic.
* @override
* @param {string} topic
* @param {function(msg: InMessage)} [handler]
* @returns {void}
*/
unsubscribe (topic, handler) {
if (!handler) {
this.removeAllListeners(topic)
} else {
this.removeListener(topic, handler)
}
if (this.listenerCount(topic) === 0) {
super.unsubscribe(topic)
}
}
}
return new Pubsub(libp2p, options)
}

View File

@ -1,109 +0,0 @@
'use strict'
const errCode = require('err-code')
const { messages, codes } = require('./errors')
const uint8ArrayFromString = require('uint8arrays/from-string')
module.exports = (node, Pubsub, config) => {
const pubsub = new Pubsub(node.peerId, node.registrar, config)
return {
/**
* Subscribe the given handler to a pubsub topic
* @param {string} topic
* @param {function} handler The handler to subscribe
* @returns {void}
*/
subscribe: (topic, handler) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
}
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}
pubsub.on(topic, handler)
},
/**
* Unsubscribes from a pubsub topic
* @param {string} topic
* @param {function} [handler] The handler to unsubscribe from
*/
unsubscribe: (topic, handler) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
}
if (!handler) {
pubsub.removeAllListeners(topic)
} else {
pubsub.removeListener(topic, handler)
}
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
}
},
/**
* Publish messages to the given topics.
* @param {Array<string>|string} topic
* @param {Uint8Array} data
* @returns {Promise<void>}
*/
publish: (topic, data) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
}
if (typeof data === 'string' || data instanceof String) {
data = uint8ArrayFromString(data)
}
try {
data = Uint8Array.from(data)
} catch (err) {
throw errCode(new Error('data must be convertible to a Uint8Array'), 'ERR_DATA_IS_NOT_VALID')
}
return pubsub.publish(topic, data)
},
/**
* Get a list of topics the node is subscribed to.
* @returns {Array<String>} topics
*/
getTopics: () => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
}
return pubsub.getTopics()
},
/**
* Get a list of the peer-ids that are subscribed to one topic.
* @param {string} topic
* @returns {Array<string>}
*/
getSubscribers: (topic) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
}
return pubsub.getSubscribers(topic)
},
setMaxListeners (n) {
return pubsub.setMaxListeners(n)
},
_pubsub: pubsub,
start: () => pubsub.start(),
stop: () => pubsub.stop()
}
}

View File

@ -42,13 +42,13 @@ describe('Pubsub subsystem is configurable', () => {
})
libp2p = await create(customOptions)
expect(libp2p.pubsub._pubsub.started).to.equal(false)
expect(libp2p.pubsub.started).to.equal(false)
await libp2p.start()
expect(libp2p.pubsub._pubsub.started).to.equal(true)
expect(libp2p.pubsub.started).to.equal(true)
await libp2p.stop()
expect(libp2p.pubsub._pubsub.started).to.equal(false)
expect(libp2p.pubsub.started).to.equal(false)
})
it('should not start if disabled once libp2p starts', async () => {
@ -67,10 +67,10 @@ describe('Pubsub subsystem is configurable', () => {
})
libp2p = await create(customOptions)
expect(libp2p.pubsub._pubsub.started).to.equal(false)
expect(libp2p.pubsub.started).to.equal(false)
await libp2p.start()
expect(libp2p.pubsub._pubsub.started).to.equal(false)
expect(libp2p.pubsub.started).to.equal(false)
})
it('should allow a manual start', async () => {
@ -90,9 +90,9 @@ describe('Pubsub subsystem is configurable', () => {
libp2p = await create(customOptions)
await libp2p.start()
expect(libp2p.pubsub._pubsub.started).to.equal(false)
expect(libp2p.pubsub.started).to.equal(false)
await libp2p.pubsub.start()
expect(libp2p.pubsub._pubsub.started).to.equal(true)
expect(libp2p.pubsub.started).to.equal(true)
})
})

View File

@ -66,8 +66,8 @@ describe('Pubsub subsystem operates correctly', () => {
expect(connection).to.exist()
return Promise.all([
pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1)
pWaitFor(() => libp2p.pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1)
])
})
@ -141,14 +141,14 @@ describe('Pubsub subsystem operates correctly', () => {
const connection = await libp2p.dial(remotePeerId)
expect(connection).to.exist()
expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0)
expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(0)
expect(libp2p.pubsub.peers.size).to.be.eql(0)
expect(remoteLibp2p.pubsub.peers.size).to.be.eql(0)
remoteLibp2p.pubsub.start()
return Promise.all([
pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1)
pWaitFor(() => libp2p.pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1)
])
})
@ -164,8 +164,8 @@ describe('Pubsub subsystem operates correctly', () => {
remoteLibp2p.pubsub.start()
await Promise.all([
pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1)
pWaitFor(() => libp2p.pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1)
])
let subscribedTopics = libp2p.pubsub.getTopics()