Alex Potsides 978eb3676f
feat: async peerstore backed by datastores (#1058)
We have a peerstore that keeps all data for all observed peers in memory with no eviction.

This is fine when you don't discover many peers, but when using the DHT you encounter a significant number of peers, so our peer storage grows and grows over time.

We have a persistent peer store, but it just periodically writes peers into the datastore to be read at startup, still keeping them in memory.

It also means a restart doesn't give you any temporary reprieve from the memory leak as the previously observed peer data is read into memory at startup.

This change refactors the peerstore to use a datastore by default, reading and writing peer info as it arrives.  It can be configured with a MemoryDatastore if desired.

It was necessary to change the peerstore and *book interfaces to be asynchronous since the datastore api is asynchronous.

BREAKING CHANGE: `libp2p.handle`, `libp2p.registrar.register` and the peerstore methods have become async
2022-01-20 12:03:35 +00:00

417 lines
12 KiB

'use strict'
/* eslint-env mocha */
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const { MemoryDatastore } = require('datastore-core/memory')
const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const PeerStore = require('../../src/peer-store')
const peerUtils = require('../utils/creators/peer')
const {
} = require('../../src/errors')
/**
 * @typedef {import('../../src/peer-store/types').PeerStore} PeerStore
 * @typedef {import('../../src/peer-store/types').ProtoBook} ProtoBook
 * @typedef {import('peer-id')} PeerId
 */
/**
 * Checks whether two arrays contain the same items, ignoring order.
 *
 * Both inputs are copied before sorting, so the caller's arrays are never
 * reordered and the result does not depend on which argument is pre-sorted.
 *
 * @param {Array<string>} a - first array to compare
 * @param {Array<string>} b - second array to compare
 * @returns {boolean} true when both arrays have the same length and items
 */
const arraysAreEqual = (a, b) => {
  if (a.length !== b.length) {
    return false
  }

  // Sort copies: Array.prototype.sort works in place and would otherwise
  // reorder arrays owned by the caller (e.g. event payloads).
  const sortedA = [...a].sort()
  const sortedB = [...b].sort()

  return sortedA.every((item, index) => sortedB[index] === item)
}
// Top-level suite for the peer store's protocol book (`peerStore.protoBook`).
describe('protoBook', () => {
// Peer id shared by the nested suites; it is assigned once in the `before` hook.
/** @type {PeerId} */
let peerId
before(async () => {
// createPeerId() is awaited and the first element of its result is used.
[peerId] = await peerUtils.createPeerId()
// Tests for `protoBook.set`.
describe('protoBook.set', () => {
/** @type {PeerStore} */
let peerStore
/** @type {ProtoBook} */
let pb
// Every test starts from a new PeerStore backed by a new MemoryDatastore.
beforeEach(() => {
peerStore = new PeerStore({
datastore: new MemoryDatastore()
pb = peerStore.protoBook
afterEach(() => {
// A string in place of a PeerId must reject with code ERR_INVALID_PARAMETERS.
it('throws invalid parameters error if invalid PeerId is provided', async () => {
await expect(pb.set('invalid peerId')).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Calling set without a protocols argument must reject with the same code.
it('throws invalid parameters error if no protocols provided', async () => {
await expect(pb.set(peerId)).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Registers a one-shot 'change:protocols' listener, performs a set followed by
// a get, then waits on the deferred promise.
it('replaces the stored content by default and emit change event', async () => {
const defer = pDefer()
const supportedProtocols = ['protocol1', 'protocol2']
peerStore.once('change:protocols', ({ peerId, protocols }) => {
await pb.set(peerId, supportedProtocols)
const protocols = await pb.get(peerId)
await defer.promise
// Two sets with different protocol lists; the listener only acts once
// changeCounter is greater than 1.
it('emits on set if not storing the exact same content', async () => {
const defer = pDefer()
const supportedProtocolsA = ['protocol1', 'protocol2']
const supportedProtocolsB = ['protocol2']
let changeCounter = 0
peerStore.on('change:protocols', () => {
if (changeCounter > 1) {
// set 1
await pb.set(peerId, supportedProtocolsA)
// set 2 (different content)
await pb.set(peerId, supportedProtocolsB)
const protocols = await pb.get(peerId)
await defer.promise
// Two sets with the identical protocol list, followed by a 50ms timer that
// gives an unexpected second event time to arrive.
it('does not emit on set if it is storing the exact same content', async () => {
const defer = pDefer()
const supportedProtocols = ['protocol1', 'protocol2']
let changeCounter = 0
peerStore.on('change:protocols', () => {
if (changeCounter > 1) {
// set 1
await pb.set(peerId, supportedProtocols)
// set 2 (same content)
await pb.set(peerId, supportedProtocols)
// Wait 50ms for incorrect second event
setTimeout(() => {
}, 50)
return defer.promise
// Tests for `protoBook.add`.
describe('protoBook.add', () => {
/** @type {PeerStore} */
let peerStore
/** @type {ProtoBook} */
let pb
// Every test starts from a new PeerStore backed by a new MemoryDatastore.
beforeEach(() => {
peerStore = new PeerStore({
datastore: new MemoryDatastore()
pb = peerStore.protoBook
afterEach(() => {
// A string in place of a PeerId must reject with code ERR_INVALID_PARAMETERS.
it('throws invalid parameters error if invalid PeerId is provided', async () => {
await expect(pb.add('invalid peerId')).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Calling add without a protocols argument must reject with the same code.
it('throws invalid parameters error if no protocols provided', async () => {
await expect(pb.add(peerId)).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Sets list A, then adds list B. The listener condition requires changeTrigger
// to be 0 and the emitted protocols to match A and B combined.
it('adds the new content and emits change event', async () => {
const defer = pDefer()
const supportedProtocolsA = ['protocol1', 'protocol2']
const supportedProtocolsB = ['protocol3']
const finalProtocols = supportedProtocolsA.concat(supportedProtocolsB)
let changeTrigger = 2
peerStore.on('change:protocols', ({ protocols }) => {
if (changeTrigger === 0 && arraysAreEqual(protocols, finalProtocols)) {
// Replace
await pb.set(peerId, supportedProtocolsA)
let protocols = await pb.get(peerId)
// Add
await pb.add(peerId, supportedProtocolsB)
protocols = await pb.get(peerId)
return defer.promise
// Sets ['protocol1'], then adds ['protocol2'], which is not stored yet; the
// listener only acts once changeCounter is greater than 1.
it('emits on add if the content to add not exists', async () => {
const defer = pDefer()
const supportedProtocolsA = ['protocol1']
const supportedProtocolsB = ['protocol2']
const finalProtocols = supportedProtocolsA.concat(supportedProtocolsB)
let changeCounter = 0
peerStore.on('change:protocols', () => {
if (changeCounter > 1) {
// set 1
await pb.set(peerId, supportedProtocolsA)
// add (content not stored yet)
await pb.add(peerId, supportedProtocolsB)
const protocols = await pb.get(peerId)
return defer.promise
// Sets ['protocol1', 'protocol2'], then adds ['protocol2'], which is already
// stored; a 50ms timer gives an unexpected second event time to arrive.
it('does not emit on add if the content to add already exists', async () => {
const defer = pDefer()
const supportedProtocolsA = ['protocol1', 'protocol2']
const supportedProtocolsB = ['protocol2']
let changeCounter = 0
peerStore.on('change:protocols', () => {
if (changeCounter > 1) {
// set 1
await pb.set(peerId, supportedProtocolsA)
// set 2 (content already existing)
await pb.add(peerId, supportedProtocolsB)
// Wait 50ms for incorrect second event
setTimeout(() => {
}, 50)
return defer.promise
// Tests for `protoBook.remove`.
describe('protoBook.remove', () => {
/** @type {PeerStore} */
let peerStore
/** @type {ProtoBook} */
let pb
// Every test starts from a new PeerStore backed by a new MemoryDatastore.
beforeEach(() => {
peerStore = new PeerStore({
datastore: new MemoryDatastore()
pb = peerStore.protoBook
afterEach(() => {
// A string in place of a PeerId must reject with code ERR_INVALID_PARAMETERS.
it('throws invalid parameters error if invalid PeerId is provided', async () => {
await expect(pb.remove('invalid peerId')).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Calling remove without a protocols argument must reject with the same code.
it('throws invalid parameters error if no protocols provided', async () => {
await expect(pb.remove(peerId)).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Records 'change:protocols' events with a sinon spy, sets two protocols,
// removes one of them, then waits until the spy has been called twice.
it('removes the given protocol and emits change event', async () => {
const spy = sinon.spy()
const supportedProtocols = ['protocol1', 'protocol2']
const removedProtocols = ['protocol1']
const finalProtocols = supportedProtocols.filter(p => !removedProtocols.includes(p))
peerStore.on('change:protocols', spy)
// Replace
await pb.set(peerId, supportedProtocols)
let protocols = await pb.get(peerId)
// Remove
await pb.remove(peerId, removedProtocols)
protocols = await pb.get(peerId)
await pWaitFor(() => spy.callCount === 2)
const [firstCallArgs] = spy.firstCall.args
const [secondCallArgs] = spy.secondCall.args
// NOTE(review): the two expect() calls below have no chained assertion
// (e.g. `.to.equal(true)`), so as written they do not verify anything.
expect(arraysAreEqual(firstCallArgs.protocols, supportedProtocols))
expect(arraysAreEqual(secondCallArgs.protocols, finalProtocols))
// Sets two protocols, removes one that is stored, and resolves once the spy
// has seen two 'change:protocols' events.
it('emits on remove if the content changes', async () => {
const spy = sinon.spy()
const supportedProtocols = ['protocol1', 'protocol2']
const removedProtocols = ['protocol2']
const finalProtocols = supportedProtocols.filter(p => !removedProtocols.includes(p))
peerStore.on('change:protocols', spy)
// set
await pb.set(peerId, supportedProtocols)
// remove (content already existing)
await pb.remove(peerId, removedProtocols)
const protocols = await pb.get(peerId)
return pWaitFor(() => spy.callCount === 2)
// Sets two protocols, then removes 'protocol3', which was never stored.
it('does not emit on remove if the content does not change', async () => {
const spy = sinon.spy()
const supportedProtocols = ['protocol1', 'protocol2']
const removedProtocols = ['protocol3']
peerStore.on('change:protocols', spy)
// set
await pb.set(peerId, supportedProtocols)
// remove
await pb.remove(peerId, removedProtocols)
// Only one event
// Tests for `protoBook.get`.
describe('protoBook.get', () => {
/** @type {PeerStore} */
let peerStore
/** @type {ProtoBook} */
let pb
// Every test starts from a new PeerStore backed by a new MemoryDatastore.
beforeEach(() => {
peerStore = new PeerStore({
datastore: new MemoryDatastore()
pb = peerStore.protoBook
// A string in place of a PeerId must reject with code ERR_INVALID_PARAMETERS.
it('throws invalid parameters error if invalid PeerId is provided', async () => {
await expect(pb.get('invalid peerId')).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Reads protocols for a peer that nothing has been stored for in this test.
it('returns empty if no protocols are known for the provided peer', async () => {
const protocols = await pb.get(peerId)
// Stores two protocols with set, then reads them back with get.
it('returns the protocols stored', async () => {
const supportedProtocols = ['protocol1', 'protocol2']
await pb.set(peerId, supportedProtocols)
const protocols = await pb.get(peerId)
// Tests for `protoBook.delete`.
describe('protoBook.delete', () => {
/** @type {PeerStore} */
let peerStore
/** @type {ProtoBook} */
let pb
// Every test starts from a new PeerStore backed by a new MemoryDatastore.
beforeEach(() => {
peerStore = new PeerStore({
datastore: new MemoryDatastore()
pb = peerStore.protoBook
// A string in place of a PeerId must reject with code ERR_INVALID_PARAMETERS.
it('throws invalid parameters error if invalid PeerId is provided', async () => {
await expect(pb.delete('invalid peerId')).to.eventually.be.rejected().with.property('code', ERR_INVALID_PARAMETERS)
// Deletes a peer with nothing stored while listening for 'change:protocols';
// a 50ms timer gives an unexpected event time to arrive.
it('should not emit event if no records exist for the peer', async () => {
const defer = pDefer()
peerStore.on('change:protocols', () => {
await pb.delete(peerId)
// Wait 50ms for incorrect invalid event
setTimeout(() => {
}, 50)
await defer.promise
// Stores protocols first, attaches the listener afterwards so the set itself
// is not observed, then deletes the peer and waits on the deferred promise.
it('should emit event if a record exists for the peer', async () => {
const defer = pDefer()
const supportedProtocols = ['protocol1', 'protocol2']
await pb.set(peerId, supportedProtocols)
// Listen after set
peerStore.on('change:protocols', ({ protocols }) => {
await pb.delete(peerId)
await defer.promise