Updates all deps needed to support passing lists of byte arrays where they have been created from multiple input buffers. When reading multiplexed data, all messages arrive in length-prefixed buffers, which means the first few bytes tell the consumer how many bytes long the next chunk will be. One length-prefixed chunk can be delivered in several payloads from the underlying network transport. The first payload can also include the length prefix and some or all of the data, so we stitch these together in a `Uint8ArrayList` to avoid having to concatenate `Uint8Array`s together. Previously, once we'd received enough bytes to satisfy the length prefix we'd concatenate the bytes together, but this is a potentially expensive operation where transports have small message sizes. Instead, we now just pass the `Uint8ArrayList` to the consumer and let them decide whether to concatenate or not, as some consumers will be smart enough to operate on lists of `Uint8Array`s instead of always requiring a contiguous block of memory. BREAKING CHANGE: Streams are now `Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>`
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex'
import { Plaintext } from '../../src/insecure/index.js'
import { Multiaddr } from '@multiformats/multiaddr'
import delay from 'delay'
import pDefer from 'p-defer'
import pSettle, { PromiseResult } from 'p-settle'
import pWaitFor from 'p-wait-for'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import { Connection, isConnection } from '@libp2p/interface-connection'
import { AbortError } from '@libp2p/interfaces/errors'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MemoryDatastore } from 'datastore-core/memory'
import { Dialer } from '../../src/connection-manager/dialer/index.js'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-mocks'
import Peers from '../fixtures/peers.js'
import { Components } from '@libp2p/components'
import { createFromJSON } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interface-peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js'
import swarmKey from '../fixtures/swarm.key.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
// Swarm key fixture converted to bytes; passed as the `psk` to the connection protectors below
const swarmKeyBuffer = uint8ArrayFromString(swarmKey)
// NOTE(review): both multiaddr literals below are just '/ip4/' - the address/port
// portions appear to be missing in this copy; confirm against the original file.
// Address the remote transport manager / remote node is asked to listen on
const listenAddr = new Multiaddr('/ip4/')
// Address used by the tests that expect dialing to fail with ERR_NO_VALID_ADDRESSES
const unsupportedAddr = new Multiaddr('/ip4/')
// Suite that drives the Dialer class directly on top of TCP transport managers
describe('Dialing (direct, TCP)', () => {
// Transport managers for the listening (remote) and dialing (local) sides
let remoteTM: DefaultTransportManager
let localTM: DefaultTransportManager
// First listening address of the remote side with its peer id appended (assigned in beforeEach)
let remoteAddr: Multiaddr
// Component containers for each side; reassigned in beforeEach
let remoteComponents: Components
let localComponents: Components
// Builds fresh remote and local Components with TCP transport managers, starts the
// remote side listening, and records the address the tests dial.
beforeEach(async () => {
// Both peer ids are resolved concurrently via Promise.all
const [localPeerId, remotePeerId] = await Promise.all([
// Remote side: in-memory datastore, mock upgrader/connection gater and a persistent peer store
remoteComponents = new Components({
peerId: remotePeerId,
datastore: new MemoryDatastore(),
upgrader: mockUpgrader(),
connectionGater: mockConnectionGater(),
peerStore: new PersistentPeerStore()
// Address manager for the remote side, configured with a `listen` list
remoteComponents.setAddressManager(new DefaultAddressManager(remoteComponents, {
listen: [
// Remote transport manager with a TCP transport registered
remoteTM = new DefaultTransportManager(remoteComponents)
remoteTM.add(new TCP())
// Local side: same datastore and mocks; the peer store is attached through the setter below
localComponents = new Components({
peerId: localPeerId,
datastore: new MemoryDatastore(),
upgrader: mockUpgrader(),
connectionGater: mockConnectionGater()
localComponents.setPeerStore(new PersistentPeerStore())
// Connection manager settings for the local side
localComponents.setConnectionManager(new DefaultConnectionManager({
maxConnections: 100,
minConnections: 50,
autoDialInterval: 1000,
inboundUpgradeTimeout: 1000
// Local transport manager with a TCP transport registered
localTM = new DefaultTransportManager(localComponents)
localTM.add(new TCP())
// Start the remote side listening on listenAddr
await remoteTM.listen([listenAddr])
// Dial target: the remote's first listening address with `/p2p/<remotePeerId>` appended
remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toString()}`)
// Stop the remote transport manager after every test
afterEach(async () => await remoteTM.stop())
// NOTE(review): the body of this second hook is not visible in this copy - confirm what it cleans up
afterEach(() => {
// Dials remoteAddr (the remote's address including its peer id) and closes the resulting connection
it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new Dialer()
const connection = await dialer.dial(remoteAddr)
await connection.close()
// Dialing unsupportedAddr directly is expected to yield an error whose `code` is ERR_NO_VALID_ADDRESSES
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer()
await expect(dialer.dial(unsupportedAddr))
.and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
// Dials a peer id for which this test stores no addresses; expects `code` ERR_NO_VALID_ADDRESSES
it('should fail to connect if peer has no known addresses', async () => {
const dialer = new Dialer()
// Peer id built from the second entry of the peers fixture
const peerId = await createFromJSON(Peers[1])
await expect(dialer.dial(peerId))
.and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
// Dials by peer id after the remote's addresses have been placed in the local address book
it('should be able to connect to a given peer id', async () => {
// Record the remote's listening addresses under its peer id in the local peer store
await localComponents.getPeerStore().addressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs())
const dialer = new Dialer()
const connection = await dialer.dial(remoteComponents.getPeerId())
await connection.close()
// The only address this test adds for the remote peer is unsupportedAddr, so dialing by
// peer id is expected to yield `code` ERR_NO_VALID_ADDRESSES
it('should fail to connect to a given peer with unsupported addresses', async () => {
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), [unsupportedAddr])
const dialer = new Dialer()
await expect(dialer.dial(remoteComponents.getPeerId()))
.and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
// Stores the remote's real addresses plus unsupportedAddr for a peer and checks that the
// transport manager is asked to dial only as many times as there are real addresses
it('should only try to connect to addresses supported by the transports configured', async () => {
const remoteAddrs = remoteTM.getAddrs()
const peerId = await createFromJSON(Peers[1])
await localComponents.getPeerStore().addressBook.add(peerId, [...remoteAddrs, unsupportedAddr])
const dialer = new Dialer()
// Spy (not stub) so real dials still happen while calls are counted
sinon.spy(localTM, 'dial')
const connection = await dialer.dial(peerId)
// One transport dial per remote address; none for unsupportedAddr
expect(localTM.dial).to.have.property('callCount', remoteAddrs.length)
await connection.close()
// Uses a dialTimeout (50) shorter than the stubbed transport dial's delay (60) and
// expects the dial to fail with `code` ERR_TIMEOUT
it('should abort dials on queue task timeout', async () => {
const dialer = new Dialer({
dialTimeout: 50
// Replace the transport dial with one that waits past the timeout and then throws AbortError
sinon.stub(localTM, 'dial').callsFake(async (addr, options = {}) => {
await delay(60)
throw new AbortError()
await expect(dialer.dial(remoteAddr))
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
// With three addresses for one peer and maxParallelDials set to 2, checks that the
// transport manager's dial is called only twice
it('should dial to the max concurrency', async () => {
// NOTE(review): the three literals are all '/ip4/' in this copy - presumably distinct addresses originally
const addrs = [
new Multiaddr('/ip4/'),
new Multiaddr('/ip4/'),
new Multiaddr('/ip4/')
const peerId = await createFromJSON(Peers[1])
await localComponents.getPeerStore().addressBook.add(peerId, addrs)
const dialer = new Dialer({
maxParallelDials: 2
// Every stubbed transport dial waits on the same deferred, so dials stay pending until resolved below
const deferredDial = pDefer<Connection>()
sinon.stub(localTM, 'dial').callsFake(async () => await deferredDial.promise)
// Perform 3 multiaddr dials
void dialer.dial(peerId)
// Let the call stack run
await delay(0)
// We should have 2 in progress, and 1 waiting
// Resolve the pending dials with a mock connection built on a mock duplex
deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId)))
// Let the call stack run
await delay(0)
// Only two dials should be executed, as the first dial will succeed
expect(localTM.dial).to.have.property('callCount', 2)
// Suite that exercises dialing through full libp2p nodes created with createLibp2pNode
describe('libp2p.dialer (direct, TCP)', () => {
// Peer ids for the local and remote nodes (assigned in beforeEach)
let peerId: PeerId, remotePeerId: PeerId
// Local node; created inside each test and stopped in afterEach
let libp2p: Libp2pNode
// Remote node; created and started in beforeEach
let remoteLibp2p: Libp2pNode
// Remote node's first transport address with its peer id appended
let remoteAddr: Multiaddr
// Creates and starts the remote node (TCP, Mplex, Plaintext) with an echo handler,
// then records the address the tests dial.
beforeEach(async () => {
// Both peer ids are resolved concurrently via Promise.all
[peerId, remotePeerId] = await Promise.all([
remoteLibp2p = await createLibp2pNode({
peerId: remotePeerId,
addresses: {
listen: [listenAddr.toString()]
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
// Echo protocol: the stream is piped back into itself
await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => {
void pipe(stream, stream)
await remoteLibp2p.start()
// Dial target: first transport address of the remote node with `/p2p/<remotePeerId>` appended
remoteAddr = remoteLibp2p.components.getTransportManager().getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toString()}`)
// Stops whichever of the two nodes were created; the null checks cover nodes that were never assigned
afterEach(async () => {
if (libp2p != null) {
await libp2p.stop()
if (remoteLibp2p != null) {
await remoteLibp2p.stop()
// Dials the remote's raw transport address (not remoteAddr, which has the `/p2p/...`
// part appended in beforeEach) and expects rejection with `code` ERR_INVALID_MULTIADDR
it('should fail if no peer id is provided', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
await expect(libp2p.dial(remoteLibp2p.components.getTransportManager().getAddrs()[0])).to.eventually.be.rejected()
.with.property('code', ErrorCodes.ERR_INVALID_MULTIADDR)
// Dials remoteAddr through the node, opens an echo stream and checks its negotiated protocol
it('should use the dialer for connecting to a multiaddr', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
// Spy on the connection manager's openConnection
// NOTE(review): no assertion on this spy is visible in this copy - confirm it is checked
const dialerDialSpy = sinon.spy(libp2p.components.getConnectionManager(), 'openConnection')
const connection = await libp2p.dial(remoteAddr)
// Protocol is passed as a one-element array here
const stream = await connection.newStream(['/echo/1.0.0'])
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
await connection.close()
// Dials the remote by peer id (after storing its multiaddrs in the address book),
// opens an echo stream and checks its negotiated protocol
it('should use the dialer for connecting to a peer', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
// Spy on the connection manager's openConnection
// NOTE(review): no assertion on this spy is visible in this copy - confirm it is checked
const dialerDialSpy = sinon.spy(libp2p.components.getConnectionManager(), 'openConnection')
// Make the remote's addresses known to the local node
await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())
const connection = await libp2p.dial(remotePeerId)
const stream = await connection.newStream('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
await connection.close()
// Opens streams in both directions over one connection, closes the connection, and waits
// until both ends report zero streams
it('should close all streams when the connection closes', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
// register some stream handlers to simulate several protocols
// (/stream-count/1 and /2 on the local node, /3 and /4 on the remote; each echoes its stream)
await libp2p.handle('/stream-count/1', ({ stream }) => {
void pipe(stream, stream)
await libp2p.handle('/stream-count/2', ({ stream }) => {
void pipe(stream, stream)
await remoteLibp2p.handle('/stream-count/3', ({ stream }) => {
void pipe(stream, stream)
await remoteLibp2p.handle('/stream-count/4', ({ stream }) => {
void pipe(stream, stream)
await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())
const connection = await libp2p.dial(remotePeerId)
// Create local to remote streams
const stream = await connection.newStream('/echo/1.0.0')
await connection.newStream('/stream-count/3')
await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4')
// Partially write to the echo stream
// (the pushable source is handed to the sink; no end() call is visible here, so the write side stays open)
const source = pushable()
void stream.sink(source)
// Create remote to local streams
await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/1')
await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/2')
// Verify stream count
// Connections the remote node holds to the local peer; the first one is inspected below
const remoteConn = remoteLibp2p.getConnections(libp2p.peerId)
if (remoteConn == null) {
throw new Error('No remote connection found')
// Close the connection and verify all streams have been closed
await connection.close()
await pWaitFor(() => connection.streams.length === 0)
await pWaitFor(() => remoteConn[0].streams.length === 0)
// Calls dialProtocol once with no protocols argument and once with an empty list; both
// are expected to produce `code` ERR_INVALID_PROTOCOLS_FOR_STREAM
it('should throw when using dialProtocol with no protocols', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
// @ts-expect-error invalid params
await expect(libp2p.dialProtocol(remoteAddr))
.and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
// Same expectation when the protocol list is present but empty
await expect(libp2p.dialProtocol(remoteAddr, []))
.and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
// Dials the remote and then hangs up using the connection's remote peer id
it('should be able to use hangup to close connections', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
const connection = await libp2p.dial(remoteAddr)
// NOTE(review): no post-hangUp assertion is visible in this copy
await libp2p.hangUp(connection.remotePeer)
// Configures a pre-shared-key connection protector on both nodes (same swarm key bytes),
// then dials and opens an echo stream
it('should use the protectors when provided for connecting', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
connectionProtector: new PreSharedKeyConnectionProtector({
psk: swarmKeyBuffer
// Fetch the protector back from the node's components; fail fast if it was not installed
const protector = libp2p.components.getConnectionProtector()
if (protector == null) {
throw new Error('No protector was configured')
// Spy on the local protector's protect method
// NOTE(review): no assertion on this spy is visible in this copy - confirm it is checked
const protectorProtectSpy = sinon.spy(protector, 'protect')
// Install a protector with the same key on the already-running remote node
remoteLibp2p.components.setConnectionProtector(new PreSharedKeyConnectionProtector({ enabled: true, psk: swarmKeyBuffer }))
await libp2p.start()
const connection = await libp2p.dial(remoteAddr)
const stream = await connection.newStream('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
await connection.close()
// Fires ten dials at once, alternating between dialing by peer id and by a multiaddr
// that contains the peer id, and inspects the resulting connections
it('should coalesce parallel dials to the same peer (id in multiaddr)', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
const dials = 10
// NOTE(review): remoteAddr already has `/p2p/<remotePeerId>` appended in beforeEach, so this appends it a second time - confirm intended
const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerId.toString()}`)
await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())
// Even indexes dial by peer id, odd indexes dial by multiaddr
const dialResults = await Promise.all([...new Array(dials)].map(async (_, index) => {
if (index % 2 === 0) return await libp2p.dial(remoteLibp2p.peerId)
return await libp2p.dial(fullAddress)
// All should succeed and we should have ten results
for (const connection of dialResults) {
// 1 connection, because we know the peer in the multiaddr
// Stubs the transport manager's dial to always reject with one Error instance, fires ten
// dials at once (alternating peer id / multiaddr) and checks every result is a rejection
// carrying an AggregateError
it('should coalesce parallel dials to the same error on failure', async () => {
libp2p = await createLibp2pNode({
transports: [
new TCP()
streamMuxers: [
new Mplex()
connectionEncryption: [
new Plaintext()
await libp2p.start()
const dials = 10
// Single shared error instance that every stubbed transport dial rejects with
const error = new Error('Boom')
sinon.stub(libp2p.components.getTransportManager(), 'dial').callsFake(async () => await Promise.reject(error))
await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())
// pSettle collects every outcome instead of short-circuiting on the first rejection
const dialResults: Array<PromiseResult<Connection>> = await pSettle([...new Array(dials)].map(async (_, index) => {
if (index % 2 === 0) return await libp2p.dial(remoteLibp2p.peerId)
return await libp2p.dial(remoteAddr)
// All dials should be rejected and we should have ten results
for (const result of dialResults) {
expect(result).to.have.property('isRejected', true)
expect(result).to.have.property('reason').that.has.property('name', 'AggregateError')
// All errors should be the exact same as `error`
// @ts-expect-error reason is any
for (const err of result.reason.errors) {
// 1 connection, because we know the peer in the multiaddr