1
0
mirror of https://github.com/fluencelabs/js-libp2p synced 2025-03-31 06:41:08 +00:00
Alex Potsides de30c2cec7
feat!: limit protocol streams per-connection ()
* feat: limit protocol streams per-connection

Uses the `maxInboundStreams` and `maxOutboundStreams` of the `registrar.handle`
opts to limit the number of concurrent streams open on each connection
on a per-protocol basis.

Both values default to 1 so some tuning will be necessary to set
appropriate values for some protocols.

* chore: make error codes consistent

* chore: fix up examples
2022-06-17 15:46:31 +02:00

578 lines
18 KiB
TypeScript

/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise'
import { Multiaddr } from '@multiformats/multiaddr'
import delay from 'delay'
import pDefer from 'p-defer'
import pSettle, { PromiseResult } from 'p-settle'
import pWaitFor from 'p-wait-for'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import { Connection, isConnection } from '@libp2p/interface-connection'
import { AbortError } from '@libp2p/interfaces/errors'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MemoryDatastore } from 'datastore-core/memory'
import { Dialer } from '../../src/connection-manager/dialer/index.js'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-mocks'
import Peers from '../fixtures/peers.js'
import { Components } from '@libp2p/components'
import { createFromJSON } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interface-peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js'
import swarmKey from '../fixtures/swarm.key.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
// Listen address handed to the remote side in both suites. The TCP port is 0;
// the tests read the actual bound address back via getAddrs() after listening.
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
// A websocket address. Only the TCP transport is registered in this file, and the
// tests below expect dials that can only use this address to be rejected.
const unsupportedAddr = new Multiaddr('/ip4/127.0.0.1/tcp/9999/ws/p2p/QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN')
// Swarm key fixture converted to bytes; passed as `psk` to PreSharedKeyConnectionProtector.
const swarmKeyBuffer = uint8ArrayFromString(swarmKey)
describe('Dialing (direct, TCP)', () => {
let remoteTM: DefaultTransportManager
let localTM: DefaultTransportManager
let remoteAddr: Multiaddr
let remoteComponents: Components
let localComponents: Components
// Rebuilds both sides before every test: a "remote" set of components listening
// on TCP, and a "local" set of components that each test's Dialer is initialised with.
beforeEach(async () => {
  const [dialerPeerId, listenerPeerId] = await Promise.all([
    createFromJSON(Peers[0]),
    createFromJSON(Peers[1])
  ])

  // Remote side: components, an address manager and a TCP-only transport manager
  remoteComponents = new Components({
    peerId: listenerPeerId,
    datastore: new MemoryDatastore(),
    upgrader: mockUpgrader(),
    connectionGater: mockConnectionGater(),
    peerStore: new PersistentPeerStore()
  })
  const remoteAddressManager = new DefaultAddressManager(remoteComponents, {
    listen: [listenAddr.toString()]
  })
  remoteComponents.setAddressManager(remoteAddressManager)
  remoteTM = new DefaultTransportManager(remoteComponents)
  remoteTM.add(new TCP())

  // Local side: components, peer store, connection manager and a TCP-only transport manager
  localComponents = new Components({
    peerId: dialerPeerId,
    datastore: new MemoryDatastore(),
    upgrader: mockUpgrader(),
    connectionGater: mockConnectionGater()
  })
  localComponents.setPeerStore(new PersistentPeerStore())
  const localConnectionManager = new DefaultConnectionManager({
    maxConnections: 100,
    minConnections: 50,
    autoDialInterval: 1000
  })
  localComponents.setConnectionManager(localConnectionManager)
  localTM = new DefaultTransportManager(localComponents)
  localTM.add(new TCP())
  localComponents.setTransportManager(localTM)

  // Start the remote listener, then remember its first address with the remote peer id appended
  await remoteTM.listen([listenAddr])
  const remotePeerSuffix = `/p2p/${listenerPeerId.toString()}`
  remoteAddr = remoteTM.getAddrs()[0].encapsulate(remotePeerSuffix)
})
// Stop the remote transport manager that beforeEach started listening.
afterEach(async () => {
  return await remoteTM.stop()
})
// Restore everything sinon spied on or stubbed during the test.
afterEach(() => {
  sinon.restore()
})
it('should be able to connect to a remote node via its multiaddr', async () => {
  // Dialer with default options, bound to the local components from beforeEach
  const directDialer = new Dialer()
  directDialer.init(localComponents)

  const conn = await directDialer.dial(remoteAddr)
  expect(conn).to.exist()

  await conn.close()
})
it('should fail to connect to an unsupported multiaddr', async () => {
  const directDialer = new Dialer()
  directDialer.init(localComponents)

  // The local transport manager only has TCP registered; this address is a websocket one
  const pendingDial = directDialer.dial(unsupportedAddr)

  await expect(pendingDial)
    .to.eventually.be.rejectedWith(Error)
    .and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
})
it('should fail to connect if peer has no known addresses', async () => {
  const directDialer = new Dialer()
  directDialer.init(localComponents)

  // Nothing in this test adds addresses for this peer to the local peer store
  const unknownPeer = await createFromJSON(Peers[1])
  const pendingDial = directDialer.dial(unknownPeer)

  await expect(pendingDial)
    .to.eventually.be.rejectedWith(Error)
    .and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
})
it('should be able to connect to a given peer id', async () => {
  // Seed the local address book with the remote listener's addresses
  const localAddressBook = localComponents.getPeerStore().addressBook
  await localAddressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs())

  const directDialer = new Dialer()
  directDialer.init(localComponents)

  // Dial by peer id only; the addresses come from the address book entry above
  const conn = await directDialer.dial(remoteComponents.getPeerId())
  expect(conn).to.exist()

  await conn.close()
})
it('should fail to connect to a given peer with unsupported addresses', async () => {
  // The only address recorded for the remote peer is the websocket one
  const localAddressBook = localComponents.getPeerStore().addressBook
  await localAddressBook.add(remoteComponents.getPeerId(), [unsupportedAddr])

  const directDialer = new Dialer()
  directDialer.init(localComponents)

  const pendingDial = directDialer.dial(remoteComponents.getPeerId())

  await expect(pendingDial)
    .to.eventually.be.rejectedWith(Error)
    .and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
})
it('should only try to connect to addresses supported by the transports configured', async () => {
  const dialableAddrs = remoteTM.getAddrs()
  const targetPeer = await createFromJSON(Peers[1])

  // Store the listener's real addresses plus one address no configured transport handles
  const localAddressBook = localComponents.getPeerStore().addressBook
  await localAddressBook.add(targetPeer, dialableAddrs.concat(unsupportedAddr))

  const directDialer = new Dialer()
  directDialer.init(localComponents)

  const dialSpy = sinon.spy(localTM, 'dial')
  const conn = await directDialer.dial(targetPeer)

  // One transport dial per dialable address; the unsupported one must not be attempted
  expect(dialSpy).to.have.property('callCount', dialableAddrs.length)
  expect(conn).to.exist()

  await conn.close()
})
it('should abort dials on queue task timeout', async () => {
  const directDialer = new Dialer({ dialTimeout: 50 })
  directDialer.init(localComponents)

  // Replace the transport dial with one that outlasts the 50ms dial timeout
  sinon.stub(localTM, 'dial').callsFake(async (target, dialOptions = {}) => {
    const { signal } = dialOptions

    // On entry a signal must have been supplied and must not be aborted yet
    expect(signal).to.exist()
    expect(signal?.aborted).to.equal(false)
    expect(target.toString()).to.eql(remoteAddr.toString())

    await delay(60)

    // After waiting longer than the timeout the signal is expected to be aborted
    expect(signal?.aborted).to.equal(true)
    throw new AbortError()
  })

  const pendingDial = directDialer.dial(remoteAddr)

  await expect(pendingDial)
    .to.eventually.be.rejectedWith(Error)
    .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
})
it('should dial to the max concurrency', async () => {
  // Three addresses for one peer: one more than the parallel dial limit set below
  const candidateAddrs = [
    new Multiaddr('/ip4/0.0.0.0/tcp/8000'),
    new Multiaddr('/ip4/0.0.0.0/tcp/8001'),
    new Multiaddr('/ip4/0.0.0.0/tcp/8002')
  ]
  const targetPeer = await createFromJSON(Peers[1])
  await localComponents.getPeerStore().addressBook.add(targetPeer, candidateAddrs)

  const directDialer = new Dialer({ maxParallelDials: 2 })
  directDialer.init(localComponents)
  expect(directDialer.tokens).to.have.lengthOf(2)

  // Every transport dial waits on the same deferred until the test resolves it
  const pendingConnection = pDefer<Connection>()
  sinon.stub(localTM, 'dial').callsFake(async () => {
    return await pendingConnection.promise
  })

  // Start a single peer dial (three addresses to try) without awaiting it
  void directDialer.dial(targetPeer)

  // Yield so the dialer can start its work
  await delay(0)

  // Both dial tokens should now be in use, leaving one address waiting
  expect(directDialer.tokens).to.have.lengthOf(0)

  const duplex = mockDuplex()
  const maConn = mockMultiaddrConnection(duplex, targetPeer)
  pendingConnection.resolve(mockConnection(maConn))

  // Yield again so the resolved dial can complete
  await delay(0)

  // The third address is never dialled because the first dial succeeded
  expect(localTM.dial).to.have.property('callCount', 2)
  expect(directDialer.tokens).to.have.lengthOf(2)
})
})
describe('libp2p.dialer (direct, TCP)', () => {
let peerId: PeerId, remotePeerId: PeerId
let libp2p: Libp2pNode
let remoteLibp2p: Libp2pNode
let remoteAddr: Multiaddr
// Creates both peer ids and starts a remote libp2p node (TCP + Mplex + NOISE)
// with an echo handler before every test. The local node is created per test.
beforeEach(async () => {
  const [localId, remoteId] = await Promise.all([
    createFromJSON(Peers[0]),
    createFromJSON(Peers[1])
  ])
  peerId = localId
  remotePeerId = remoteId

  remoteLibp2p = await createLibp2pNode({
    peerId: remotePeerId,
    addresses: {
      listen: [listenAddr.toString()]
    },
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })

  // Echo protocol: pipe each incoming stream back into itself
  await remoteLibp2p.handle('/echo/1.0.0', (incoming) => {
    void pipe(incoming.stream, incoming.stream)
  })
  await remoteLibp2p.start()

  // First listening address of the remote node, with its peer id appended
  const remoteListenAddrs = remoteLibp2p.components.getTransportManager().getAddrs()
  remoteAddr = remoteListenAddrs[0].encapsulate(`/p2p/${remotePeerId.toString()}`)
})
// Restores sinon fakes, then stops the local node followed by the remote node
// (each only if it has been created).
afterEach(async () => {
  sinon.restore()

  for (const node of [libp2p, remoteLibp2p]) {
    if (node != null) {
      await node.stop()
    }
  }
})
it('should fail if no peer id is provided', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })
  await libp2p.start()

  // The transport manager's address is used as-is, i.e. without the `/p2p/<id>`
  // suffix that beforeEach appends when it builds `remoteAddr`
  const addrWithoutPeerId = remoteLibp2p.components.getTransportManager().getAddrs()[0]
  const pendingDial = libp2p.dial(addrWithoutPeerId)

  await expect(pendingDial).to.eventually.be.rejected()
    .with.property('code', ErrorCodes.ERR_INVALID_MULTIADDR)
})
it('should use the dialer for connecting to a multiaddr', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })
  await libp2p.start()

  // Observe calls to the connection manager's openConnection
  const connectionManager = libp2p.components.getConnectionManager()
  const openConnectionSpy = sinon.spy(connectionManager, 'openConnection')

  const conn = await libp2p.dial(remoteAddr)
  expect(conn).to.exist()

  // Open a stream for the echo protocol registered on the remote node
  const echoStream = await conn.newStream(['/echo/1.0.0'])
  expect(echoStream).to.exist()
  expect(echoStream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
  expect(openConnectionSpy.callCount).to.be.greaterThan(0)

  await conn.close()
})
it('should use the dialer for connecting to a peer', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })
  await libp2p.start()

  // Observe calls to the connection manager's openConnection
  const connectionManager = libp2p.components.getConnectionManager()
  const openConnectionSpy = sinon.spy(connectionManager, 'openConnection')

  // Record the remote node's addresses so it can be dialled by peer id alone
  const localAddressBook = libp2p.components.getPeerStore().addressBook
  await localAddressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())

  const conn = await libp2p.dial(remotePeerId)
  expect(conn).to.exist()

  const echoStream = await conn.newStream('/echo/1.0.0')
  expect(echoStream).to.exist()
  expect(echoStream).to.have.nested.property('stat.protocol', '/echo/1.0.0')

  await conn.close()
  expect(openConnectionSpy.callCount).to.be.greaterThan(0)
})
it('should close all streams when the connection closes', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })
  await libp2p.start()

  // Register echo handlers for two protocols on each node so that streams
  // can be opened in both directions
  for (const protocol of ['/stream-count/1', '/stream-count/2']) {
    await libp2p.handle(protocol, (incoming) => {
      void pipe(incoming.stream, incoming.stream)
    })
  }
  for (const protocol of ['/stream-count/3', '/stream-count/4']) {
    await remoteLibp2p.handle(protocol, (incoming) => {
      void pipe(incoming.stream, incoming.stream)
    })
  }

  const localAddressBook = libp2p.components.getPeerStore().addressBook
  await localAddressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())
  const localConn = await libp2p.dial(remotePeerId)

  // Three streams opened from the local side
  const echoStream = await localConn.newStream('/echo/1.0.0')
  await localConn.newStream('/stream-count/3')
  await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4')

  // Push one chunk into the echo stream and leave the source open
  const outgoing = pushable()
  void echoStream.sink(outgoing)
  outgoing.push(uint8ArrayFromString('hello'))

  // Two streams opened from the remote side
  await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/1')
  await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/2')

  // Both ends should see one connection carrying all five streams
  const remoteConns = remoteLibp2p.getConnections(libp2p.peerId)
  if (remoteConns == null) {
    throw new Error('No remote connection found')
  }

  expect(localConn.streams).to.have.length(5)
  expect(remoteConns).to.have.lengthOf(1)
  expect(remoteConns).to.have.nested.property('[0].streams').with.lengthOf(5)

  // After closing locally, wait until both ends report zero streams
  await localConn.close()
  await pWaitFor(() => localConn.streams.length === 0)
  await pWaitFor(() => remoteConns[0].streams.length === 0)
})
it('should throw when using dialProtocol with no protocols', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })
  await libp2p.start()

  // Case 1: the protocols argument is left out entirely
  // @ts-expect-error invalid params
  await expect(libp2p.dialProtocol(remoteAddr))
    .to.eventually.be.rejectedWith(Error)
    .and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)

  // Case 2: an empty protocol list is passed
  const emptyProtocolsDial = libp2p.dialProtocol(remoteAddr, [])
  await expect(emptyProtocolsDial)
    .to.eventually.be.rejectedWith(Error)
    .and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
})
it('should be able to use hangup to close connections', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE]
  })
  await libp2p.start()

  const conn = await libp2p.dial(remoteAddr)
  expect(conn).to.exist()

  // No close time is recorded on the connection until hangUp has run
  expect(conn.stat.timeline.close).to.not.exist()
  await libp2p.hangUp(conn.remotePeer)
  expect(conn.stat.timeline.close).to.exist()
})
it('should use the protectors when provided for connecting', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [NOISE],
    connectionProtector: new PreSharedKeyConnectionProtector({
      psk: swarmKeyBuffer
    })
  })

  // The local node must expose the protector it was configured with
  const localProtector = libp2p.components.getConnectionProtector()
  if (localProtector == null) {
    throw new Error('No protector was configured')
  }
  const protectSpy = sinon.spy(localProtector, 'protect')

  // Give the remote node a protector built from the same swarm key
  const remoteProtector = new PreSharedKeyConnectionProtector({ enabled: true, psk: swarmKeyBuffer })
  remoteLibp2p.components.setConnectionProtector(remoteProtector)

  await libp2p.start()

  const conn = await libp2p.dial(remoteAddr)
  expect(conn).to.exist()

  const echoStream = await conn.newStream('/echo/1.0.0')
  expect(echoStream).to.exist()
  expect(echoStream).to.have.nested.property('stat.protocol', '/echo/1.0.0')

  await conn.close()

  // Exactly one call to the local protector's `protect` is expected for the single dial
  expect(protectSpy.callCount).to.equal(1)
})
it('should coalesce parallel dials to the same peer (id in multiaddr)', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [
      new TCP()
    ],
    streamMuxers: [
      new Mplex()
    ],
    connectionEncryption: [
      NOISE
    ]
  })
  await libp2p.start()

  const dials = 10

  // `remoteAddr` is built in beforeEach with `/p2p/<remote peer id>` already
  // appended, so it is dialled as-is. Encapsulating the peer id again would
  // produce an address with the `/p2p/...` component duplicated.
  await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())

  // Alternate between dialling by peer id and by multiaddr, all in parallel
  const dialResults = await Promise.all(Array.from({ length: dials }, async (_, index) => {
    if (index % 2 === 0) return await libp2p.dial(remoteLibp2p.peerId)
    return await libp2p.dial(remoteAddr)
  }))

  // All should succeed and we should have one result per dial
  expect(dialResults).to.have.length(dials)
  for (const connection of dialResults) {
    expect(isConnection(connection)).to.equal(true)
  }

  // 1 connection, because we know the peer in the multiaddr
  expect(libp2p.getConnections()).to.have.lengthOf(1)
  expect(remoteLibp2p.getConnections()).to.have.lengthOf(1)
})
it('should coalesce parallel dials to the same error on failure', async () => {
  libp2p = await createLibp2pNode({
    peerId,
    transports: [
      new TCP()
    ],
    streamMuxers: [
      new Mplex()
    ],
    connectionEncryption: [
      NOISE
    ]
  })
  await libp2p.start()

  const dials = 10
  const error = new Error('Boom')

  // Make every transport-level dial reject with the same error instance
  sinon.stub(libp2p.components.getTransportManager(), 'dial').callsFake(async () => await Promise.reject(error))
  await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())

  // Alternate between dialling by peer id and by multiaddr, all in parallel;
  // pSettle collects each outcome instead of throwing on the first rejection
  const dialResults: Array<PromiseResult<Connection>> = await pSettle(Array.from({ length: dials }, async (_, index) => {
    if (index % 2 === 0) return await libp2p.dial(remoteLibp2p.peerId)
    return await libp2p.dial(remoteAddr)
  }))

  // All should fail and we should have one result per dial
  expect(dialResults).to.have.length(dials)
  for (const result of dialResults) {
    expect(result).to.have.property('isRejected', true)
    expect(result).to.have.property('reason').that.has.property('name', 'AggregateError')
    // All errors should be the exact same as `error`
    // @ts-expect-error reason is any
    for (const err of result.reason.errors) {
      expect(err).to.equal(error)
    }
  }

  // No connections on either side, because every transport dial was rejected
  expect(libp2p.getConnections()).to.have.lengthOf(0)
  expect(remoteLibp2p.getConnections()).to.have.lengthOf(0)
})
})