js-libp2p/test/upgrading/upgrader.spec.ts
Cayman 8880eefa8f
fix: add successful stream peer to protobook (#1341)
* fix: add successful stream peer to protobook

If a protocol stream has been successfully negotiated and is to be
passed to the application, the peerstore should ensure that the peer is
registered with that protocol.

* fix: upgrader test fix
2022-08-11 10:48:16 -05:00

728 lines
22 KiB
TypeScript

/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { Mplex } from '@libp2p/mplex'
import { Multiaddr } from '@multiformats/multiaddr'
import { pipe } from 'it-pipe'
import all from 'it-all'
import pSettle from 'p-settle'
import { WebSockets } from '@libp2p/websockets'
import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import swarmKey from '../fixtures/swarm.key.js'
import { DefaultUpgrader } from '../../src/upgrader.js'
import { codes } from '../../src/errors.js'
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-mocks'
import Peers from '../fixtures/peers.js'
import type { Upgrader } from '@libp2p/interface-transport'
import type { PeerId } from '@libp2p/interface-peer-id'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/components'
import { Plaintext } from '../../src/insecure/index.js'
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Stream } from '@libp2p/interface-connection'
import pDefer from 'p-defer'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { pEvent } from 'p-event'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import drain from 'it-drain'
import { Uint8ArrayList } from 'uint8arraylist'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'
// Two identical loopback TCP multiaddrs (port 0) passed to mockMultiaddrConnPair in the tests below
const addrs = [
  '/ip4/127.0.0.1/tcp/0',
  '/ip4/127.0.0.1/tcp/0'
].map(str => new Multiaddr(str))
describe('Upgrader', () => {
let localUpgrader: Upgrader
let localMuxerFactory: StreamMuxerFactory
let remoteUpgrader: Upgrader
let localPeer: PeerId
let remotePeer: PeerId
let localComponents: Components
let remoteComponents: Components
beforeEach(async () => {
  // Component container wired with a mock gater, a mock registrar, a
  // PersistentPeerStore and a MemoryDatastore
  const buildComponents = (peerId: PeerId): Components => new Components({
    peerId,
    connectionGater: mockConnectionGater(),
    registrar: mockRegistrar(),
    peerStore: new PersistentPeerStore(),
    datastore: new MemoryDatastore()
  })

  // Plaintext upgrader that offers exactly one stream muxer
  const buildUpgrader = (components: Components, muxer: StreamMuxerFactory): Upgrader => new DefaultUpgrader(components, {
    connectionEncryption: [new Plaintext()],
    muxers: [muxer],
    inboundUpgradeTimeout: 1000
  })

  // Registers an '/echo/1.0.0' handler that pipes each stream back into itself
  const registerEcho = async (components: Components): Promise<void> => {
    await components.getRegistrar().handle('/echo/1.0.0', ({ stream }) => {
      void pipe(stream, stream)
    }, {
      maxInboundStreams: 10,
      maxOutboundStreams: 10
    })
  }

  // Both fixture peer ids are loaded concurrently
  const [firstPeer, secondPeer] = await Promise.all([
    createFromJSON(Peers[0]),
    createFromJSON(Peers[1])
  ])
  localPeer = firstPeer
  remotePeer = secondPeer

  // The local muxer factory is kept in a suite variable so tests can spy on it
  localComponents = buildComponents(localPeer)
  localMuxerFactory = new Mplex()
  localUpgrader = buildUpgrader(localComponents, localMuxerFactory)

  remoteComponents = buildComponents(remotePeer)
  remoteUpgrader = buildUpgrader(remoteComponents, new Mplex())

  await registerEcho(localComponents)
  await registerEcho(remoteComponents)
})
// Undo every sinon spy/stub installed by the test that just ran
afterEach(function restoreSinon () {
  sinon.restore()
})
it('should upgrade with valid muxers and crypto', async () => {
  const pair = mockMultiaddrConnPair({ addrs, remotePeer })

  // Upgrade both ends of the mocked connection concurrently
  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(pair.outbound),
    remoteUpgrader.upgradeInbound(pair.inbound)
  ])
  expect(connections).to.have.length(2)

  // Open an echo stream from the outbound (dialing) side
  const echoStream = await connections[0].newStream('/echo/1.0.0')
  expect(echoStream).to.have.nested.property('stat.protocol', '/echo/1.0.0')

  // Whatever is written to the echo stream must come back unchanged
  const payload = uint8ArrayFromString('hello there!')
  const echoed = await pipe(
    [payload],
    echoStream,
    function toBuffer (source) {
      return (async function * () {
        for await (const chunk of source) {
          yield chunk.slice()
        }
      })()
    },
    async (source) => await all(source)
  )
  expect(echoed).to.eql([payload])
})
it('should upgrade with only crypto', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

  // Replace both upgraders with ones that offer no stream muxers at all
  const withoutMuxers = (components: Components): Upgrader => new DefaultUpgrader(components, {
    connectionEncryption: [new Plaintext()],
    muxers: [],
    inboundUpgradeTimeout: 1000
  })
  localUpgrader = withoutMuxers(localComponents)
  remoteUpgrader = withoutMuxers(remoteComponents)

  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])
  expect(connections).to.have.length(2)

  // Opening a stream on the muxer-less connection is expected to reject
  await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected()

  // Closing each connection must close its MultiaddrConnection exactly once
  const closeSpies = [
    sinon.spy(inbound, 'close'),
    sinon.spy(outbound, 'close')
  ]
  await Promise.all(connections.map(async conn => await conn.close()))
  for (const closeSpy of closeSpies) {
    expect(closeSpy.callCount).to.equal(1)
  }
})
it('should use a private connection protector when provided', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

  // A single pre-shared-key protector is installed on both sides so that
  // its `protect` calls can be counted with one spy
  const pskProtector = new PreSharedKeyConnectionProtector({
    psk: uint8ArrayFromString(swarmKey)
  })
  const protectSpy = sinon.spy(pskProtector, 'protect')
  for (const components of [localComponents, remoteComponents]) {
    components.setConnectionProtector(pskProtector)
  }

  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])
  expect(connections).to.have.length(2)

  const echoStream = await connections[0].newStream('/echo/1.0.0')
  expect(echoStream).to.have.nested.property('stat.protocol', '/echo/1.0.0')

  // Data must still round-trip through the echo handler
  const payload = uint8ArrayFromString('hello there!')
  const echoed = await pipe(
    [payload],
    echoStream,
    function toBuffer (source) {
      return (async function * () {
        for await (const chunk of source) {
          yield chunk.slice()
        }
      })()
    },
    async (source) => await all(source)
  )
  expect(echoed).to.eql([payload])

  // One `protect` call per upgraded side
  expect(protectSpy.callCount).to.eql(2)
})
it('should fail if crypto fails', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

  // Encrypter whose handshake always throws, in both directions
  class BoomCrypto implements ConnectionEncrypter {
    static protocol = '/insecure'
    public protocol = '/insecure'

    async secureInbound (): Promise<SecuredConnection> {
      throw new Error('Boom')
    }

    async secureOutbound (): Promise<SecuredConnection> {
      throw new Error('Boom')
    }
  }

  // Upgrader that only offers the failing encrypter and no muxers
  const failingUpgrader = (components: Components): Upgrader => new DefaultUpgrader(components, {
    connectionEncryption: [new BoomCrypto()],
    muxers: [],
    inboundUpgradeTimeout: 1000
  })
  localUpgrader = failingUpgrader(localComponents)
  remoteUpgrader = failingUpgrader(remoteComponents)

  // Settle both upgrades so each outcome can be inspected individually
  const outcomes = await pSettle([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])

  // Both sides must reject with the encryption failure code
  expect(outcomes).to.have.length(2)
  for (const outcome of outcomes) {
    expect(outcome).to.have.property('isRejected', true)
    expect(outcome).to.have.nested.property('reason.code', codes.ERR_ENCRYPTION_FAILED)
  }
})
it('should fail if muxers do not match', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

  // Stub muxer that advertises its own protocol id ('/muxer-local')
  class OtherMuxer implements StreamMuxer {
    protocol = '/muxer-local'
    streams = []
    source = []

    newStream (name?: string): Stream {
      throw new Error('Not implemented')
    }

    async sink () {}

    close () {}
  }

  // Factory handing out the stub muxer under the same protocol id
  class OtherMuxerFactory implements StreamMuxerFactory {
    protocol = '/muxer-local'

    createStreamMuxer (init?: StreamMuxerInit): StreamMuxer {
      return new OtherMuxer()
    }
  }

  // Plaintext upgrader offering a single muxer factory
  const upgraderWith = (components: Components, muxer: StreamMuxerFactory): Upgrader => new DefaultUpgrader(components, {
    connectionEncryption: [new Plaintext()],
    muxers: [muxer],
    inboundUpgradeTimeout: 1000
  })

  // Local side offers only the stub muxer, remote side offers only Mplex
  localUpgrader = upgraderWith(localComponents, new OtherMuxerFactory())
  remoteUpgrader = upgraderWith(remoteComponents, new Mplex())

  // Settle both upgrades so each outcome can be inspected individually
  const outcomes = await pSettle([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])

  // Both sides must reject because no muxer could be agreed on
  expect(outcomes).to.have.length(2)
  for (const outcome of outcomes) {
    expect(outcome).to.have.property('isRejected', true)
    expect(outcome).to.have.nested.property('reason.code', codes.ERR_MUXER_UNAVAILABLE)
  }
})
it('should map getStreams and close methods', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])
  expect(connections).to.have.length(2)

  // Open three echo streams one after another, at least one from each side
  for (const opener of [connections[0], connections[1], connections[0]]) {
    await opener.newStream('/echo/1.0.0')
  }

  // Each connection must report all three streams
  for (const conn of connections) {
    expect(conn.streams).to.have.length(3)
  }

  // Closing each connection must close its MultiaddrConnection exactly once
  const inboundClose = sinon.spy(inbound, 'close')
  const outboundClose = sinon.spy(outbound, 'close')
  await Promise.all(connections.map(async conn => await conn.close()))
  expect(inboundClose.callCount).to.equal(1)
  expect(outboundClose.callCount).to.equal(1)
})
it('should call connection handlers', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

  // One deferred per (side, event) pair; each is resolved by the matching listener
  const seen = {
    localOpen: pDefer(),
    localEnd: pDefer(),
    remoteOpen: pDefer(),
    remoteEnd: pDefer()
  }

  localUpgrader.addEventListener('connection', () => {
    seen.localOpen.resolve()
  })
  localUpgrader.addEventListener('connectionEnd', () => {
    seen.localEnd.resolve()
  })
  remoteUpgrader.addEventListener('connection', () => {
    seen.remoteOpen.resolve()
  })
  remoteUpgrader.addEventListener('connectionEnd', () => {
    seen.remoteEnd.resolve()
  })

  // Upgrading must make both upgraders emit a 'connection' event
  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])
  expect(connections).to.have.length(2)
  await Promise.all([
    seen.localOpen.promise,
    seen.remoteOpen.promise
  ])

  // Closing must make both upgraders emit a 'connectionEnd' event
  await Promise.all(connections.map(async conn => await conn.close()))
  await Promise.all([
    seen.localEnd.promise,
    seen.remoteEnd.promise
  ])
})
it('should fail to create a stream for an unsupported protocol', async () => {
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])
  expect(connections).to.have.length(2)

  // Try to open the unknown protocol from both ends at the same time
  const attempts = await pSettle(
    connections.map(async conn => await conn.newStream('/unsupported/1.0.0'))
  )

  // Each attempt must reject with the unsupported-protocol code
  expect(attempts).to.have.length(2)
  for (const attempt of attempts) {
    expect(attempt).to.have.property('isRejected', true)
    expect(attempt).to.have.nested.property('reason.code', codes.ERR_UNSUPPORTED_PROTOCOL)
  }
})
// Verifies that opening a stream is aborted via the caller's signal when the
// underlying muxed stream produces no data before the signal fires.
it('should abort protocol selection for slow streams', async () => {
// spy on the local factory so the muxer created during the upgrade can be retrieved
const createStreamMuxerSpy = sinon.spy(localMuxerFactory, 'createStreamMuxer')
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
// 10 ms timeout; the controller is created here, only synchronous code runs before it is used
const timeoutController = new TimeoutController(10)
// should have created muxer for connection
expect(createStreamMuxerSpy).to.have.property('callCount', 1)
// replace the muxer's newStream with a mock stream that stays silent for longer than the timeout
const muxer = createStreamMuxerSpy.getCall(0).returnValue
muxer.newStream = () => {
return mockStream({
source: (async function * () {
// longer than the timeout
await delay(1000)
yield new Uint8ArrayList()
}()),
sink: drain
})
}
// the abort signal is expected to reject the pending newStream call with ABORT_ERR
await expect(connections[0].newStream('/echo/1.0.0', {
signal: timeoutController.signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
})
// Verifies that a stream whose protocol negotiation fails is not left
// registered on either side of the connection.
it('should close streams when protocol negotiation fails', async () => {
  // Remove the echo handler on the remote so negotiation of '/echo/1.0.0' fails
  await remoteComponents.getRegistrar().unhandle('/echo/1.0.0')

  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
  const connections = await Promise.all([
    localUpgrader.upgradeOutbound(outbound),
    remoteUpgrader.upgradeInbound(inbound)
  ])

  // No streams exist before the attempt
  expect(connections[0].streams).to.have.lengthOf(0)
  expect(connections[1].streams).to.have.lengthOf(0)

  // Use the shared error-code constant, matching the other
  // unsupported-protocol assertion in this suite
  await expect(connections[0].newStream('/echo/1.0.0'))
    .to.eventually.be.rejected.with.property('code', codes.ERR_UNSUPPORTED_PROTOCOL)

  // wait for remote to close
  await delay(100)

  // The failed stream must have been removed from both connections
  expect(connections[0].streams).to.have.lengthOf(0)
  expect(connections[1].streams).to.have.lengthOf(0)
})
})
describe('libp2p.upgrader', () => {
let peers: PeerId[]
let libp2p: Libp2pNode
let remoteLibp2p: Libp2pNode
// Load the two fixture peer ids once for the whole suite
before(async () => {
  const fixtures = [Peers[0], Peers[1]]
  peers = await Promise.all(
    fixtures.map(async json => await createFromJSON(json))
  )
})
// Restores sinon spies and stops whichever nodes the finished test created.
afterEach(async () => {
  sinon.restore()

  // Stop the remote node even when stopping the local one rejects, so a
  // failed shutdown cannot leave the other node running into the next test.
  try {
    if (libp2p != null) {
      await libp2p.stop()
    }
  } finally {
    if (remoteLibp2p != null) {
      await remoteLibp2p.stop()
    }
  }
})
it('should create an Upgrader', async () => {
  const psk = uint8ArrayFromString(swarmKey)

  // The node is only constructed here, never started
  libp2p = await createLibp2pNode({
    peerId: peers[0],
    transports: [new WebSockets()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [new Plaintext()],
    connectionProtector: new PreSharedKeyConnectionProtector({ psk })
  })

  // Both the upgrader and the configured protector must be present in the components
  const { components } = libp2p
  expect(components.getUpgrader()).to.exist()
  expect(components.getConnectionProtector()).to.exist()
})
it('should return muxed streams', async () => {
  const remotePeer = peers[1]
  const streamKeys = ['id', 'close', 'reset', 'stat']

  // Both nodes use the same WebSockets / Mplex / Plaintext configuration
  const createNode = async (peerId: PeerId): Promise<Libp2pNode> => await createLibp2pNode({
    peerId,
    transports: [new WebSockets()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [new Plaintext()]
  })

  // No-op handler: the test only inspects the stream objects
  const echoHandler = () => {}

  libp2p = await createNode(peers[0])
  await libp2p.start()
  await libp2p.handle(['/echo/1.0.0'], echoHandler)

  remoteLibp2p = await createNode(remotePeer)
  await remoteLibp2p.start()
  await remoteLibp2p.handle('/echo/1.0.0', echoHandler)

  // Upgrade a mocked connection pair through each node's own upgrader
  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
  const [localConnection] = await Promise.all([
    libp2p.components.getUpgrader().upgradeOutbound(outbound),
    remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
  ])

  // Spy on the remote upgrader's stream callback to capture the inbound stream
  const onStreamSpy = sinon.spy(remoteLibp2p.components.getUpgrader() as DefaultUpgrader, '_onStream')

  // The outbound stream must expose the stream API
  const outboundStream = await localConnection.newStream(['/echo/1.0.0'])
  expect(outboundStream).to.include.keys(streamKeys)

  // ...and so must the stream handed to the remote upgrader
  const [firstArg] = onStreamSpy.getCall(0).args
  expect(firstArg.stream).to.include.keys(streamKeys)
})
// Verifies that the local connection manager dispatches exactly one
// 'peer:connect' event on upgrade and one 'peer:disconnect' event on close,
// each carrying the remote peer in its detail.
it('should emit connect and disconnect events', async () => {
const remotePeer = peers[1]
// local node
libp2p = await createLibp2pNode({
peerId: peers[0],
transports: [
new WebSockets()
],
streamMuxers: [
new Mplex()
],
connectionEncryption: [
new Plaintext()
]
})
await libp2p.start()
// remote node, configured identically apart from its peer id
remoteLibp2p = await createLibp2pNode({
peerId: remotePeer,
transports: [
new WebSockets()
],
streamMuxers: [
new Mplex()
],
connectionEncryption: [
new Plaintext()
]
})
await remoteLibp2p.start()
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
// Spy on dispatchEvent of the local connection manager for easy verification
const connectionManagerDispatchEventSpy = sinon.spy(libp2p.components.getConnectionManager(), 'dispatchEvent')
// Upgrade and check the connect event; the listener is attached before upgrading so the event is not missed
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
const connections = await Promise.all([
libp2p.components.getUpgrader().upgradeOutbound(outbound),
remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
])
await connectionPromise
// exactly one event so far, and it is the connect event for the remote peer
expect(connectionManagerDispatchEventSpy.callCount).to.equal(1)
let [event] = connectionManagerDispatchEventSpy.getCall(0).args
expect(event).to.have.property('type', 'peer:connect')
// @ts-expect-error detail is only on CustomEvent type
expect(remotePeer.equals(event.detail.remotePeer)).to.equal(true)
// Close and check the disconnect event
await Promise.all(connections.map(async conn => await conn.close()))
// the second dispatched event is the disconnect event for the same peer
expect(connectionManagerDispatchEventSpy.callCount).to.equal(2)
;([event] = connectionManagerDispatchEventSpy.getCall(1).args)
expect(event).to.have.property('type', 'peer:disconnect')
// @ts-expect-error detail is only on CustomEvent type
expect(remotePeer.equals(event.detail.remotePeer)).to.equal(true)
})
it('should limit the number of incoming streams that can be opened using a protocol', async () => {
  const protocol = '/a-test-protocol/1.0.0'
  const remotePeer = peers[1]

  // Both nodes use the same WebSockets / Mplex / Plaintext configuration
  const createNode = async (peerId: PeerId): Promise<Libp2pNode> => await createLibp2pNode({
    peerId,
    transports: [new WebSockets()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [new Plaintext()]
  })

  libp2p = await createNode(peers[0])
  await libp2p.start()

  remoteLibp2p = await createNode(remotePeer)
  await remoteLibp2p.start()

  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
  const [localToRemote] = await Promise.all([
    libp2p.components.getUpgrader().upgradeOutbound(outbound),
    remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
  ])

  // Counts how many streams reached the remote handler
  let streamCount = 0

  // The local side has generous limits, the remote side accepts a single inbound stream
  await libp2p.handle(protocol, () => {}, {
    maxInboundStreams: 10,
    maxOutboundStreams: 10
  })
  await remoteLibp2p.handle(protocol, () => {
    streamCount++
  }, {
    maxInboundStreams: 1,
    maxOutboundStreams: 1
  })

  // The first stream is accepted by the remote handler
  expect(streamCount).to.equal(0)
  await localToRemote.newStream(protocol)
  expect(streamCount).to.equal(1)

  // The second stream exceeds the remote inbound limit and is expected to be reset
  await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
    .with.property('code', 'ERR_STREAM_RESET')
})
it('should limit the number of outgoing streams that can be opened using a protocol', async () => {
  const protocol = '/a-test-protocol/1.0.0'
  const remotePeer = peers[1]

  // Both nodes use the same WebSockets / Mplex / Plaintext configuration
  const createNode = async (peerId: PeerId): Promise<Libp2pNode> => await createLibp2pNode({
    peerId,
    transports: [new WebSockets()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [new Plaintext()]
  })

  libp2p = await createNode(peers[0])
  await libp2p.start()

  remoteLibp2p = await createNode(remotePeer)
  await remoteLibp2p.start()

  const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
  const [localToRemote] = await Promise.all([
    libp2p.components.getUpgrader().upgradeOutbound(outbound),
    remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
  ])

  // Counts how many streams reached the remote handler
  let streamCount = 0

  // The local side allows a single outbound stream, the remote side has generous limits
  await libp2p.handle(protocol, () => {}, {
    maxInboundStreams: 1,
    maxOutboundStreams: 1
  })
  await remoteLibp2p.handle(protocol, () => {
    streamCount++
  }, {
    maxInboundStreams: 10,
    maxOutboundStreams: 10
  })

  // The first stream is opened and reaches the remote handler
  expect(streamCount).to.equal(0)
  await localToRemote.newStream(protocol)
  expect(streamCount).to.equal(1)

  // The second stream exceeds the local outbound limit and must be refused
  await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
    .with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
})
})