js-libp2p/test/transports/transport-manager.node.js
Alex Potsides 978eb3676f
feat: async peerstore backed by datastores (#1058)
We have a peerstore that keeps all data for all observed peers in memory with no eviction.

This is fine when you don't discover many peers, but when using the DHT you encounter a significant number of peers, so our peer storage grows and grows over time.

We have a persistent peer store, but it just periodically writes peers into the datastore to be read at startup, still keeping them in memory.

It also means a restart doesn't give you any temporary reprieve from the memory leak as the previously observed peer data is read into memory at startup.

This change refactors the peerstore to use a datastore by default, reading and writing peer info as it arrives.  It can be configured with a MemoryDatastore if desired.
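
For illustration, a minimal sketch of opting into purely in-memory storage, mirroring how the test file below wires up its peerstore (the `localPeer` PeerId is assumed to have been created elsewhere):

const { MemoryDatastore } = require('datastore-core/memory')
const PeerStore = require('../../src/peer-store') // path as used in the test file below

// Keep peer data in memory only (e.g. for tests or short-lived nodes);
// swap MemoryDatastore for a persistent datastore to survive restarts.
const peerStore = new PeerStore({
  peerId: localPeer,
  datastore: new MemoryDatastore()
})

When running a full node, the same effect comes from passing a datastore in the libp2p configuration rather than constructing the peerstore directly.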

It was necessary to change the peerstore and *book (address, key, proto and metadata book) interfaces to be asynchronous, since the datastore API is asynchronous.

BREAKING CHANGE: `libp2p.handle`, `libp2p.registrar.register` and the peerstore methods have become async
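
For example, call sites that previously used these APIs synchronously now need to await them. A rough sketch (inside an async function, assuming `libp2p` is a started node and `remotePeerId` is a known PeerId; the signatures are otherwise unchanged apart from becoming async):

const { Multiaddr } = require('multiaddr')

// Registering a protocol handler is now async
await libp2p.handle('/echo/1.0.0', ({ stream }) => {
  // ... handle the incoming stream
})

// Peerstore reads and writes now return promises
await libp2p.peerStore.addressBook.set(remotePeerId, [new Multiaddr('/ip4/127.0.0.1/tcp/4001')])
const addresses = await libp2p.peerStore.addressBook.get(remotePeerId)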
2022-01-20 12:03:35 +00:00


'use strict'
/* eslint-env mocha */

const { expect } = require('aegir/utils/chai')
const { MemoryDatastore } = require('datastore-core/memory')
const AddressManager = require('../../src/address-manager')
const TransportManager = require('../../src/transport-manager')
const PeerStore = require('../../src/peer-store')
const PeerRecord = require('../../src/record/peer-record')
const Transport = require('libp2p-tcp')
const PeerId = require('peer-id')
const { Multiaddr } = require('multiaddr')
const mockUpgrader = require('../utils/mockUpgrader')
const sinon = require('sinon')
const Peers = require('../fixtures/peers')
const pWaitFor = require('p-wait-for')

const addrs = [
  new Multiaddr('/ip4/127.0.0.1/tcp/0'),
  new Multiaddr('/ip4/127.0.0.1/tcp/0')
]

describe('Transport Manager (TCP)', () => {
  let tm
  let localPeer

  before(async () => {
    localPeer = await PeerId.createFromJSON(Peers[0])
  })

  beforeEach(() => {
    tm = new TransportManager({
      libp2p: {
        peerId: localPeer,
        multiaddrs: addrs,
        addressManager: new AddressManager({ listen: addrs }),
        peerStore: new PeerStore({
          peerId: localPeer,
          datastore: new MemoryDatastore()
        })
      },
      upgrader: mockUpgrader,
      onConnection: () => {}
    })
  })

  afterEach(async () => {
    await tm.removeAll()
    expect(tm._transports.size).to.equal(0)
  })

  it('should be able to add and remove a transport', async () => {
    tm.add(Transport.prototype[Symbol.toStringTag], Transport)
    expect(tm._transports.size).to.equal(1)
    await tm.remove(Transport.prototype[Symbol.toStringTag])
  })

  it('should be able to listen', async () => {
    tm.add(Transport.prototype[Symbol.toStringTag], Transport, { listenerOptions: { listen: 'carefully' } })
    const transport = tm._transports.get(Transport.prototype[Symbol.toStringTag])
    const spyListener = sinon.spy(transport, 'createListener')
    await tm.listen(addrs)
    expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag])
    expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length)

    // Ephemeral ip addresses may result in multiple listeners
    expect(tm.getAddrs().length).to.equal(addrs.length)
    await tm.close()
    expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0)
    expect(spyListener.firstCall.firstArg).to.deep.equal({ listen: 'carefully' })
  })

  it('should create self signed peer record on listen', async () => {
    let signedPeerRecord = await tm.libp2p.peerStore.addressBook.getRawEnvelope(localPeer)
    expect(signedPeerRecord).to.not.exist()

    tm.add(Transport.prototype[Symbol.toStringTag], Transport)
    await tm.listen(addrs)

    // Should create a self peer record on the new listen address, but it is done async
    // with no event, so we have to wait a bit
    await pWaitFor(async () => {
      signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)

      return signedPeerRecord != null
    }, { interval: 100, timeout: 2000 })

    const record = PeerRecord.createFromProtobuf(signedPeerRecord.payload)
    expect(record).to.exist()
    expect(record.multiaddrs.length).to.equal(addrs.length)
    addrs.forEach((a, i) => {
      expect(record.multiaddrs[i].equals(a)).to.be.true()
    })
  })

  it('should be able to dial', async () => {
    tm.add(Transport.prototype[Symbol.toStringTag], Transport)
    await tm.listen(addrs)
    const addr = tm.getAddrs().shift()
    const connection = await tm.dial(addr)
    expect(connection).to.exist()
    await connection.close()
  })
})