js-libp2p/test/upgrading/upgrader.spec.js

445 lines
15 KiB
JavaScript
Raw Permalink Normal View History

'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')
const Muxer = require('libp2p-mplex')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const pipe = require('it-pipe')
const { collect } = require('streaming-iterables')
const pSettle = require('p-settle')
const Transport = require('libp2p-websockets')
const Crypto = require('libp2p-secio')
const Protector = require('../../src/pnet')
const swarmKeyBuffer = Buffer.from(require('../fixtures/swarm.key'))
const Libp2p = require('../../src')
const Upgrader = require('../../src/upgrader')
const { codes } = require('../../src/errors')
const mockMultiaddrConnPair = require('../utils/mockMultiaddrConn')
const Peers = require('../fixtures/peers')
const addrs = [
multiaddr('/ip4/127.0.0.1/tcp/0'),
multiaddr('/ip4/127.0.0.1/tcp/0')
]
describe('Upgrader', () => {
let localUpgrader
let remoteUpgrader
let localPeer
let remotePeer
before(async () => {
([
localPeer,
remotePeer
] = await Promise.all([
PeerId.createFromJSON(Peers[0]),
PeerId.createFromJSON(Peers[1])
]))
localUpgrader = new Upgrader({
localPeer
})
remoteUpgrader = new Upgrader({
localPeer: remotePeer
})
localUpgrader.protocols.set('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
remoteUpgrader.protocols.set('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
})
afterEach(() => {
sinon.restore()
})
it('should ignore a missing remote peer id', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
// Remove the peer id from the remote address
outbound.remoteAddr = outbound.remoteAddr.decapsulateCode(421)
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
})
it('should upgrade with valid muxers and crypto', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
const { stream, protocol } = await connections[0].newStream('/echo/1.0.0')
expect(protocol).to.equal('/echo/1.0.0')
const hello = Buffer.from('hello there!')
const result = await pipe(
[hello],
stream,
function toBuffer (source) {
return (async function * () {
for await (const val of source) yield val.slice()
})()
},
collect
)
expect(result).to.eql([hello])
})
it('should upgrade with only crypto', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
// No available muxers
const muxers = new Map()
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected()
// Verify the MultiaddrConnection close method is called
sinon.spy(inbound, 'close')
sinon.spy(outbound, 'close')
await Promise.all(connections.map(conn => conn.close()))
expect(inbound.close.callCount).to.equal(1)
expect(outbound.close.callCount).to.equal(1)
})
it('should use a private connection protector when provided', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
const protector = new Protector(swarmKeyBuffer)
sinon.stub(localUpgrader, 'protector').value(protector)
sinon.stub(remoteUpgrader, 'protector').value(protector)
sinon.spy(protector, 'protect')
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
const { stream, protocol } = await connections[0].newStream('/echo/1.0.0')
expect(protocol).to.equal('/echo/1.0.0')
const hello = Buffer.from('hello there!')
const result = await pipe(
[hello],
stream,
function toBuffer (source) {
return (async function * () {
for await (const val of source) yield val.slice()
})()
},
collect
)
expect(result).to.eql([hello])
expect(protector.protect.callCount).to.eql(2)
})
it('should fail if crypto fails', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const crypto = {
tag: '/insecure',
secureInbound: () => { throw new Error('Boom') },
secureOutbound: () => { throw new Error('Boom') }
}
const cryptos = new Map([[crypto.tag, crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
// Wait for the results of each side of the connection
const results = await pSettle([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
// Ensure both sides fail
expect(results).to.have.length(2)
results.forEach(result => {
expect(result.isRejected).to.equal(true)
expect(result.reason.code).to.equal(codes.ERR_ENCRYPTION_FAILED)
})
})
it('should fail if muxers do not match', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxersLocal = new Map([['/muxer-local', Muxer]])
const muxersRemote = new Map([['/muxer-remote', Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxersLocal)
sinon.stub(remoteUpgrader, 'muxers').value(muxersRemote)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
// Wait for the results of each side of the connection
const results = await pSettle([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
// Ensure both sides fail
expect(results).to.have.length(2)
results.forEach(result => {
expect(result.isRejected).to.equal(true)
expect(result.reason.code).to.equal(codes.ERR_MUXER_UNAVAILABLE)
})
})
it('should map getStreams and close methods', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
// Create a few streams, at least 1 in each direction
await connections[0].newStream('/echo/1.0.0')
await connections[1].newStream('/echo/1.0.0')
await connections[0].newStream('/echo/1.0.0')
connections.forEach(conn => {
expect(conn.streams).to.have.length(3)
})
// Verify the MultiaddrConnection close method is called
sinon.spy(inbound, 'close')
sinon.spy(outbound, 'close')
await Promise.all(connections.map(conn => conn.close()))
expect(inbound.close.callCount).to.equal(1)
expect(outbound.close.callCount).to.equal(1)
})
it('should call connection handlers', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
// Verify onConnection is called with the connection
sinon.spy(localUpgrader, 'onConnection')
sinon.spy(remoteUpgrader, 'onConnection')
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
expect(localUpgrader.onConnection.callCount).to.equal(1)
expect(localUpgrader.onConnection.getCall(0).args).to.eql([connections[0]])
expect(remoteUpgrader.onConnection.callCount).to.equal(1)
expect(remoteUpgrader.onConnection.getCall(0).args).to.eql([connections[1]])
// Verify onConnectionEnd is called with the connection
sinon.spy(localUpgrader, 'onConnectionEnd')
sinon.spy(remoteUpgrader, 'onConnectionEnd')
await Promise.all(connections.map(conn => conn.close()))
expect(localUpgrader.onConnectionEnd.callCount).to.equal(1)
expect(localUpgrader.onConnectionEnd.getCall(0).args).to.eql([connections[0]])
expect(remoteUpgrader.onConnectionEnd.callCount).to.equal(1)
expect(remoteUpgrader.onConnectionEnd.getCall(0).args).to.eql([connections[1]])
})
it('should fail to create a stream for an unsupported protocol', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const muxers = new Map([[Muxer.multicodec, Muxer]])
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
const results = await pSettle([
connections[0].newStream('/unsupported/1.0.0'),
connections[1].newStream('/unsupported/1.0.0')
])
expect(results).to.have.length(2)
results.forEach(result => {
expect(result.isRejected).to.equal(true)
expect(result.reason.code).to.equal(codes.ERR_UNSUPPORTED_PROTOCOL)
})
})
})
describe('libp2p.upgrader', () => {
let peers
let libp2p
before(async () => {
const ids = await Promise.all([
PeerId.createFromJSON(Peers[0]),
PeerId.createFromJSON(Peers[1])
])
peers = ids.map(peerId => new PeerInfo(peerId))
})
afterEach(async () => {
sinon.restore()
libp2p && await libp2p.stop()
libp2p = null
})
it('should create an Upgrader', () => {
const protector = new Protector(swarmKeyBuffer)
libp2p = new Libp2p({
peerInfo: peers[0],
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto],
connProtector: protector
}
})
expect(libp2p.upgrader).to.exist()
expect(libp2p.upgrader.muxers).to.eql(new Map([[Muxer.multicodec, Muxer]]))
expect(libp2p.upgrader.cryptos).to.eql(new Map([[Crypto.protocol, Crypto]]))
expect(libp2p.upgrader.protector).to.equal(protector)
// Ensure the transport manager also has the upgrader
expect(libp2p.upgrader).to.equal(libp2p.transportManager.upgrader)
})
it('should be able to register and unregister a handler', () => {
libp2p = new Libp2p({
peerInfo: peers[0],
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
expect(libp2p.upgrader.protocols).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1'])
const echoHandler = () => {}
libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler)
expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler)
expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler)
libp2p.unhandle(['/echo/1.0.0'])
expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined)
expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler)
})
it('should emit connect and disconnect events', async () => {
const remotePeer = peers[1]
libp2p = new Libp2p({
peerInfo: peers[0],
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
const remoteUpgrader = new Upgrader({
localPeer: remotePeer.id,
muxers: new Map([[Muxer.multicodec, Muxer]]),
cryptos: new Map([[Crypto.protocol, Crypto]])
})
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer: remotePeer.id })
// Spy on emit for easy verification
sinon.spy(libp2p, 'emit')
// Upgrade and check the connect event
const connections = await Promise.all([
libp2p.upgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(libp2p.emit.callCount).to.equal(1)
let [event, peerInfo] = libp2p.emit.getCall(0).args
expect(event).to.equal('peer:connect')
expect(peerInfo.id.isEqual(remotePeer.id)).to.equal(true)
// Close and check the disconnect event
await Promise.all(connections.map(conn => conn.close()))
expect(libp2p.emit.callCount).to.equal(2)
;([event, peerInfo] = libp2p.emit.getCall(1).args)
expect(event).to.equal('peer:disconnect')
expect(peerInfo.id.isEqual(remotePeer.id)).to.equal(true)
})
})