We have a peerstore that keeps all data for all observed peers in memory with no eviction. This is fine when you don't discover many peers, but when using the DHT you encounter a significant number of peers, so our peer storage grows and grows over time.

We have a persistent peer store, but it just periodically writes peers into the datastore to be read at startup, still keeping them in memory. It also means a restart doesn't give you any temporary reprieve from the memory leak, as the previously observed peer data is read into memory at startup.

This change refactors the peerstore to use a datastore by default, reading and writing peer info as it arrives. It can be configured with a MemoryDatastore if desired.

It was necessary to change the peerstore and *book interfaces to be asynchronous since the datastore API is asynchronous.

BREAKING CHANGE: `libp2p.handle`, `libp2p.registrar.register` and the peerstore methods have become async
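To illustrate why the interfaces had to become asynchronous, here is a minimal sketch of an address book that reads and writes through an async key-value store instead of keeping its own in-memory map. `MemoryStore` and `AddressBook` are hypothetical names used only for illustration; they are not the libp2p peerstore or the datastore API.

```javascript
// Hypothetical stand-in for an async datastore (not the real datastore API).
class MemoryStore {
  constructor () {
    this.data = new Map()
  }

  async put (key, value) {
    this.data.set(key, value)
  }

  async get (key) {
    return this.data.get(key)
  }
}

// Hypothetical address book that holds no peer data of its own:
// every read and write goes through the store, so every method is async.
class AddressBook {
  constructor (store) {
    this.store = store
  }

  async set (peerId, multiaddrs) {
    await this.store.put(`/peers/${peerId}`, JSON.stringify(multiaddrs))
  }

  async get (peerId) {
    const raw = await this.store.get(`/peers/${peerId}`)
    return raw === undefined ? [] : JSON.parse(raw)
  }
}
```

Because the store is asynchronous, callers have to `await` both `set` and `get`, which is the kind of change the breaking-change note describes.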
Filter Messages
To prevent undesired data from being propagated on the network, we can apply a filter to Gossipsub. Messages that fail validation in the filter will not be re-shared.
1. Setting up a PubSub network with three nodes
First, let's update our libp2p configuration with a pubsub implementation.
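const Libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')
const { NOISE } = require('@chainsafe/libp2p-noise')
const Gossipsub = require('libp2p-gossipsub')

// Create and start a node that uses Gossipsub for pubsub
const createNode = async () => {
  const node = await Libp2p.create({
    addresses: {
      listen: ['/ip4/0.0.0.0/tcp/0']
    },
    modules: {
      transport: [TCP],
      streamMuxer: [Mplex],
      connEncryption: [NOISE],
      pubsub: Gossipsub
    }
  })

  await node.start()
  return node
}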
Then, create three nodes and connect them together. In this example, we will connect the nodes in series: node 1 connects to node 2, and node 2 connects to node 3.
const [node1, node2, node3] = await Promise.all([
  createNode(),
  createNode(),
  createNode()
])
await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
await node1.dial(node2.peerId)
await node2.peerStore.addressBook.set(node3.peerId, node3.multiaddrs)
await node2.dial(node3.peerId)
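The same wiring can be written for a chain of any length. The sketch below assumes only the two calls used above, `peerStore.addressBook.set()` and `dial()`; `connectInSeries` is a hypothetical helper name, not part of libp2p.

```javascript
// Hypothetical helper: connect nodes[0] -> nodes[1] -> nodes[2] -> ...
// It relies only on addressBook.set() and dial(), as used above.
async function connectInSeries (nodes) {
  for (let i = 0; i < nodes.length - 1; i++) {
    const from = nodes[i]
    const to = nodes[i + 1]
    // The dialer needs the target's addresses before it can dial by peer id.
    await from.peerStore.addressBook.set(to.peerId, to.multiaddrs)
    await from.dial(to.peerId)
  }
}
```

With the three nodes from this example, `await connectInSeries([node1, node2, node3])` would perform the same four calls in the same order.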
Now we can subscribe to the fruit topic and log incoming messages.
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')

const topic = 'fruit'

node1.pubsub.on(topic, (msg) => {
  console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
await node1.pubsub.subscribe(topic)

node2.pubsub.on(topic, (msg) => {
  console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
await node2.pubsub.subscribe(topic)

node3.pubsub.on(topic, (msg) => {
  console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
})
await node3.pubsub.subscribe(topic)
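Since the three subscriptions differ only in the node and its label, they could be folded into one function. This is a sketch under assumptions: `subscribeAndLog` is a hypothetical helper, the built-in `TextDecoder` stands in for `uint8ArrayToString`, and the `log` parameter exists only so the output can be redirected.

```javascript
// Hypothetical helper: register a logging handler, then subscribe,
// using the same pubsub.on() and pubsub.subscribe() calls as above.
async function subscribeAndLog (node, name, topic, log = console.log) {
  const decoder = new TextDecoder()
  node.pubsub.on(topic, (msg) => {
    log(`${name} received: ${decoder.decode(msg.data)}`)
  })
  await node.pubsub.subscribe(topic)
}
```

Usage would look like `await subscribeAndLog(node1, 'node1', topic)`, repeated for each node.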
Finally, let's define the filter for the fruit topic: a validator that rejects any message whose payload is not a known fruit.
const validateFruit = (msgTopic, msg) => {
  const fruit = uint8ArrayToString(msg.data)
  const validFruit = ['banana', 'apple', 'orange']

  if (!validFruit.includes(fruit)) {
    throw new Error('no valid fruit received')
  }
}

node1.pubsub.topicValidators.set(topic, validateFruit)
node2.pubsub.topicValidators.set(topic, validateFruit)
node3.pubsub.topicValidators.set(topic, validateFruit)
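A validator is a plain function, so it can be exercised without starting any nodes. The sketch below repeats the same check, with the built-in `TextDecoder` standing in for `uint8ArrayToString`; `passes` is a hypothetical helper added only for illustration.

```javascript
// Same check as the validator above, using TextDecoder to read the payload.
const validateFruit = (msgTopic, msg) => {
  const fruit = new TextDecoder().decode(msg.data)
  const validFruit = ['banana', 'apple', 'orange']

  if (!validFruit.includes(fruit)) {
    throw new Error('no valid fruit received')
  }
}

// Hypothetical helper: report whether a payload would pass the validator.
const passes = (text) => {
  try {
    validateFruit('fruit', { data: new TextEncoder().encode(text) })
    return true
  } catch (err) {
    return false
  }
}

console.log(passes('banana')) // true
console.log(passes('car')) // false
```

A validator signals rejection by throwing, which is why the helper wraps the call in `try`/`catch`.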
In this example, node 1 runs an outdated version of the system or is a malicious node. When it publishes a fruit, the message passes validation and is re-shared, so all the nodes receive it. When it publishes a vehicle, the message fails validation and is not re-shared.
let count = 0
const myFruits = ['banana', 'apple', 'car', 'orange']

// Publish the next entry every five seconds, cycling through the list
setInterval(() => {
  console.log('############## fruit ' + myFruits[count] + ' ##############')
  node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count]))
  count++
  if (count === myFruits.length) {
    count = 0
  }
}, 5000)
Result
> node 1.js
############## fruit banana ##############
node2 received: banana
node3 received: banana
############## fruit apple ##############
node2 received: apple
node3 received: apple
############## fruit car ##############
############## fruit orange ##############
node1 received: orange
node2 received: orange
node3 received: orange
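The reason `car` reaches neither node2 nor node3 can be shown with a toy model of the chain. This is not Gossipsub and makes no network calls; `propagate` and `isFruit` are hypothetical names. It assumes that each node validates a message before delivering it locally and passing it on.

```javascript
// Toy model, not Gossipsub: chain[0] publishes, and each later node
// delivers and forwards the message only if the validator accepts it.
function propagate (chain, validate, text) {
  const received = []
  for (let i = 1; i < chain.length; i++) {
    try {
      validate(text)
    } catch (err) {
      break // rejected here: not delivered and not forwarded any further
    }
    received.push(chain[i])
  }
  return received
}

const isFruit = (text) => {
  if (!['banana', 'apple', 'orange'].includes(text)) {
    throw new Error('no valid fruit received')
  }
}

console.log(propagate(['node1', 'node2', 'node3'], isFruit, 'banana')) // [ 'node2', 'node3' ]
console.log(propagate(['node1', 'node2', 'node3'], isFruit, 'car')) // []
```

This mirrors the banana and car rounds in the output above: a message rejected at node2 is never handed on to node3.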