'use strict'
/* eslint-env mocha */

const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const Transport = require('libp2p-tcp')
const Muxer = require('libp2p-mplex')
const { NOISE: Crypto } = require('libp2p-noise')
const { Multiaddr } = require('multiaddr')
const PeerId = require('peer-id')
const delay = require('delay')
const pDefer = require('p-defer')
const pSettle = require('p-settle')
const pWaitFor = require('p-wait-for')
const pipe = require('it-pipe')
const pushable = require('it-pushable')
const AggregateError = require('aggregate-error')
const { Connection } = require('libp2p-interfaces/src/connection')
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const uint8ArrayFromString = require('uint8arrays/from-string')

const Libp2p = require('../../src')
const Dialer = require('../../src/dialer')
const AddressManager = require('../../src/address-manager')
const PeerStore = require('../../src/peer-store')
const TransportManager = require('../../src/transport-manager')
const { codes: ErrorCodes } = require('../../src/errors')
const Protector = require('../../src/pnet')

const swarmKeyBuffer = uint8ArrayFromString(require('../fixtures/swarm.key'))
const mockUpgrader = require('../utils/mockUpgrader')
const createMockConnection = require('../utils/mockConnection')
const Peers = require('../fixtures/peers')
const { createPeerId } = require('../utils/creators/peer')

// Port 0 is used for the listening address of every node started by this file.
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
// A websocket address: only the TCP transport is registered in these tests,
// so the tests below expect dials to it to be rejected as not dialable.
const unsupportedAddr = new Multiaddr('/ip4/127.0.0.1/tcp/9999/ws/p2p/QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN')

/**
 * Exercises the Dialer directly against a pair of TransportManagers (one
 * listening, one dialing) wired with the mock upgrader, then exercises the
 * dialer through full Libp2p nodes in the nested `libp2p.dialer` suite.
 */
describe('Dialing (direct, TCP)', () => {
  let remoteTM
  let localTM
  let peerStore
  let remoteAddr

  // Build a fresh listening (remote) and dialing (local) transport manager
  // before every test and record the remote address including its peer id.
  beforeEach(async () => {
    const [localPeerId, remotePeerId] = await Promise.all([
      PeerId.createFromJSON(Peers[0]),
      PeerId.createFromJSON(Peers[1])
    ])

    peerStore = new PeerStore({ peerId: remotePeerId })
    remoteTM = new TransportManager({
      libp2p: {
        addressManager: new AddressManager(remotePeerId, { listen: [listenAddr] }),
        peerId: remotePeerId,
        peerStore
      },
      upgrader: mockUpgrader
    })
    remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)

    localTM = new TransportManager({
      libp2p: {
        peerId: localPeerId,
        peerStore: new PeerStore({ peerId: localPeerId })
      },
      upgrader: mockUpgrader
    })
    localTM.add(Transport.prototype[Symbol.toStringTag], Transport)

    await remoteTM.listen([listenAddr])

    remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
  })

  afterEach(() => remoteTM.close())

  // Undo every spy/stub installed by a test.
  afterEach(() => {
    sinon.restore()
  })

  it('should be able to connect to a remote node via its multiaddr', async () => {
    const dialer = new Dialer({ transportManager: localTM, peerStore })

    const connection = await dialer.connectToPeer(remoteAddr)
    expect(connection).to.exist()
    await connection.close()
  })

  it('should be able to connect to a remote node via its stringified multiaddr', async () => {
    const dialer = new Dialer({ transportManager: localTM, peerStore })

    const connection = await dialer.connectToPeer(remoteAddr.toString())
    expect(connection).to.exist()
    await connection.close()
  })

  it('should fail to connect to an unsupported multiaddr', async () => {
    const dialer = new Dialer({ transportManager: localTM, peerStore })

    await expect(dialer.connectToPeer(unsupportedAddr))
      .to.eventually.be.rejectedWith(Error)
      .and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
  })

  it('should fail to connect if peer has no known addresses', async () => {
    const dialer = new Dialer({ transportManager: localTM, peerStore })
    const peerId = await PeerId.createFromJSON(Peers[1])

    await expect(dialer.connectToPeer(peerId))
      .to.eventually.be.rejectedWith(Error)
      .and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
  })

  it('should be able to connect to a given peer id', async () => {
    const peerId = await PeerId.createFromJSON(Peers[0])
    const peerStore = new PeerStore({ peerId })
    const dialer = new Dialer({
      transportManager: localTM,
      peerStore
    })

    // Seed the address book so the dialer can look the peer's addresses up.
    peerStore.addressBook.set(peerId, remoteTM.getAddrs())

    const connection = await dialer.connectToPeer(peerId)
    expect(connection).to.exist()
    await connection.close()
  })

  it('should fail to connect to a given peer with unsupported addresses', async () => {
    const dialer = new Dialer({
      transportManager: localTM,
      peerStore: {
        addressBook: {
          add: () => {},
          getMultiaddrsForPeer: () => [unsupportedAddr]
        }
      }
    })
    const peerId = await PeerId.createFromJSON(Peers[0])

    await expect(dialer.connectToPeer(peerId))
      .to.eventually.be.rejectedWith(Error)
      .and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
  })

  it('should only try to connect to addresses supported by the transports configured', async () => {
    const remoteAddrs = remoteTM.getAddrs()
    const dialer = new Dialer({
      transportManager: localTM,
      peerStore: {
        addressBook: {
          add: () => { },
          getMultiaddrsForPeer: () => [...remoteAddrs, unsupportedAddr]
        }
      }
    })
    const peerId = await PeerId.createFromJSON(Peers[0])

    sinon.spy(localTM, 'dial')
    const connection = await dialer.connectToPeer(peerId)
    // The unsupported address must not have reached the transport manager.
    expect(localTM.dial.callCount).to.equal(remoteAddrs.length)
    expect(connection).to.exist()
    await connection.close()
  })

  it('should abort dials on queue task timeout', async () => {
    const dialer = new Dialer({
      transportManager: localTM,
      peerStore,
      dialTimeout: 50
    })

    sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
      expect(options.signal).to.exist()
      expect(options.signal.aborted).to.equal(false)
      expect(addr.toString()).to.eql(remoteAddr.toString())
      // Outlast the 50ms dialTimeout; the signal is expected to be aborted by then.
      await delay(60)
      expect(options.signal.aborted).to.equal(true)
      throw new AbortError()
    })

    await expect(dialer.connectToPeer(remoteAddr))
      .to.eventually.be.rejectedWith(Error)
      .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
  })

  it('should dial to the max concurrency', async () => {
    const addrs = [
      new Multiaddr('/ip4/0.0.0.0/tcp/8000'),
      new Multiaddr('/ip4/0.0.0.0/tcp/8001'),
      new Multiaddr('/ip4/0.0.0.0/tcp/8002')
    ]
    const dialer = new Dialer({
      transportManager: localTM,
      maxParallelDials: 2,
      peerStore: {
        addressBook: {
          add: () => {},
          getMultiaddrsForPeer: () => addrs
        }
      }
    })

    expect(dialer.tokens).to.have.length(2)

    // Every stubbed dial stays pending until the deferred is resolved below.
    const deferredDial = pDefer()
    sinon.stub(localTM, 'dial').callsFake(() => deferredDial.promise)

    const [peerId] = await createPeerId()

    // Perform 3 multiaddr dials. Not awaited here: the stubbed dials are held
    // open by the deferred promise until it is resolved further down.
    dialer.connectToPeer(peerId)

    // Let the call stack run
    await delay(0)

    // We should have 2 in progress, and 1 waiting
    expect(dialer.tokens).to.have.length(0)

    deferredDial.resolve(await createMockConnection())

    // Let the call stack run
    await delay(0)

    // Only two dials should be executed, as the first dial will succeed
    expect(localTM.dial.callCount).to.equal(2)
    expect(dialer.tokens).to.have.length(2)
  })

  /**
   * Dialing through full Libp2p nodes: one remote node is started once for
   * the whole suite, and each test creates its own local node in `libp2p`,
   * which the `afterEach` hook stops.
   */
  describe('libp2p.dialer', () => {
    let peerId, remotePeerId
    let libp2p
    let remoteLibp2p
    let remoteAddr

    /**
     * Creates the local node used by a test, with the TCP transport, mplex
     * muxer and NOISE encryption that every test in this suite configures.
     *
     * @param {object} [extraModules] - additional entries merged into `modules`
     * @returns {Libp2p} a new node that has not been started
     */
    const createLocalLibp2p = (extraModules = {}) => new Libp2p({
      peerId,
      modules: {
        transport: [Transport],
        streamMuxer: [Muxer],
        connEncryption: [Crypto],
        ...extraModules
      }
    })

    before(async () => {
      [peerId, remotePeerId] = await Promise.all([
        PeerId.createFromJSON(Peers[0]),
        PeerId.createFromJSON(Peers[1])
      ])

      remoteLibp2p = new Libp2p({
        peerId: remotePeerId,
        addresses: {
          listen: [listenAddr]
        },
        modules: {
          transport: [Transport],
          streamMuxer: [Muxer],
          connEncryption: [Crypto]
        }
      })
      // Echo handler: pipes every incoming stream straight back to itself.
      remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))

      await remoteLibp2p.start()
      remoteAddr = remoteLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
    })

    afterEach(async () => {
      sinon.restore()
      // Not every test necessarily assigns a local node, hence the guard.
      if (libp2p) {
        await libp2p.stop()
      }
      libp2p = null
    })

    after(() => remoteLibp2p.stop())

    it('should fail if no peer id is provided', async () => {
      libp2p = createLocalLibp2p()

      sinon.spy(libp2p.dialer, 'connectToPeer')

      try {
        // This address is the bare transport address, without the /p2p/<id> suffix.
        await libp2p.dial(remoteLibp2p.transportManager.getAddrs()[0])
      } catch (err) {
        expect(err).to.have.property('code', ErrorCodes.ERR_INVALID_MULTIADDR)
        return
      }

      expect.fail('dial should have failed')
    })

    it('should use the dialer for connecting to a multiaddr', async () => {
      libp2p = createLocalLibp2p()

      sinon.spy(libp2p.dialer, 'connectToPeer')

      const connection = await libp2p.dial(remoteAddr)
      expect(connection).to.exist()
      const { stream, protocol } = await connection.newStream('/echo/1.0.0')
      expect(stream).to.exist()
      expect(protocol).to.equal('/echo/1.0.0')
      await connection.close()
      expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
    })

    it('should use the dialer for connecting to a peer', async () => {
      libp2p = createLocalLibp2p()

      sinon.spy(libp2p.dialer, 'connectToPeer')
      libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)

      const connection = await libp2p.dial(remotePeerId)
      expect(connection).to.exist()
      const { stream, protocol } = await connection.newStream('/echo/1.0.0')
      expect(stream).to.exist()
      expect(protocol).to.equal('/echo/1.0.0')
      await connection.close()
      expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
    })

    it('should close all streams when the connection closes', async () => {
      libp2p = createLocalLibp2p()

      // register some stream handlers to simulate several protocols
      libp2p.handle('/stream-count/1', ({ stream }) => pipe(stream, stream))
      libp2p.handle('/stream-count/2', ({ stream }) => pipe(stream, stream))
      remoteLibp2p.handle('/stream-count/3', ({ stream }) => pipe(stream, stream))
      remoteLibp2p.handle('/stream-count/4', ({ stream }) => pipe(stream, stream))

      libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
      const connection = await libp2p.dial(remotePeerId)

      // Create local to remote streams
      const { stream } = await connection.newStream('/echo/1.0.0')
      await connection.newStream('/stream-count/3')
      await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4')

      // Partially write to the echo stream
      const source = pushable()
      stream.sink(source)
      source.push('hello')

      // Create remote to local streams
      await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/1')
      await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/2')

      // Verify stream count
      const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId)
      expect(connection.streams).to.have.length(5)
      expect(remoteConn.streams).to.have.length(5)

      // Close the connection and verify all streams have been closed
      await connection.close()
      await pWaitFor(() => connection.streams.length === 0)
      await pWaitFor(() => remoteConn.streams.length === 0)
    })

    it('should throw when using dialProtocol with no protocols', async () => {
      libp2p = createLocalLibp2p()

      await expect(libp2p.dialProtocol(remotePeerId))
        .to.eventually.be.rejectedWith(Error)
        .and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)

      await expect(libp2p.dialProtocol(remotePeerId, []))
        .to.eventually.be.rejectedWith(Error)
        .and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
    })

    it('should be able to use hangup to close connections', async () => {
      libp2p = createLocalLibp2p()

      const connection = await libp2p.dial(remoteAddr)
      expect(connection).to.exist()
      expect(connection.stat.timeline.close).to.not.exist()
      await libp2p.hangUp(connection.remotePeer)
      expect(connection.stat.timeline.close).to.exist()
    })

    it('should be able to use hangup by address string to close connections', async () => {
      libp2p = createLocalLibp2p()

      const connection = await libp2p.dial(remoteAddr.toString())
      expect(connection).to.exist()
      expect(connection.stat.timeline.close).to.not.exist()
      await libp2p.hangUp(connection.remotePeer)
      expect(connection.stat.timeline.close).to.exist()
    })

    it('should use the protectors when provided for connecting', async () => {
      const protector = new Protector(swarmKeyBuffer)
      libp2p = createLocalLibp2p({ connProtector: protector })

      sinon.spy(libp2p.upgrader.protector, 'protect')
      // Give the remote node a protector built from the same swarm key for the
      // duration of this test; sinon.restore() in afterEach reverts the stub.
      sinon.stub(remoteLibp2p.upgrader, 'protector').value(new Protector(swarmKeyBuffer))

      const connection = await libp2p.dialer.connectToPeer(remoteAddr)
      expect(connection).to.exist()
      const { stream, protocol } = await connection.newStream('/echo/1.0.0')
      expect(stream).to.exist()
      expect(protocol).to.equal('/echo/1.0.0')
      await connection.close()
      expect(libp2p.upgrader.protector.protect.callCount).to.equal(1)
    })

    it('should coalesce parallel dials to the same peer (id in multiaddr)', async () => {
      libp2p = createLocalLibp2p()

      const dials = 10
      const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerId.toB58String()}`)

      libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
      // Alternate between dialing by peer id and dialing by full address.
      const dialResults = await Promise.all(Array.from({ length: dials }, (_, index) => {
        if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerId)
        return libp2p.dial(fullAddress)
      }))

      // All should succeed and we should have one result per dial
      expect(dialResults).to.have.length(dials)
      for (const connection of dialResults) {
        expect(Connection.isConnection(connection)).to.equal(true)
      }

      // 1 connection, because we know the peer in the multiaddr
      expect(libp2p.connectionManager.size).to.equal(1)
      expect(remoteLibp2p.connectionManager.size).to.equal(1)
    })

    it('should coalesce parallel dials to the same error on failure', async () => {
      libp2p = createLocalLibp2p()

      const dials = 10
      const error = new Error('Boom')
      // Make every transport-level dial fail with the same error instance.
      sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error))

      libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
      // Alternate between dialing by peer id and dialing by address.
      const dialResults = await pSettle(Array.from({ length: dials }, (_, index) => {
        if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerId)
        return libp2p.dial(remoteAddr)
      }))

      // All should be rejected and we should have one result per dial
      expect(dialResults).to.have.length(dials)
      for (const result of dialResults) {
        expect(result).to.have.property('isRejected', true)
        expect(result.reason).to.be.an.instanceof(AggregateError)
        // All errors should be the exact same as `error`
        for (const err of result.reason) {
          expect(err).to.equal(error)
        }
      }

      // No connection should exist on either side, because every dial failed
      expect(libp2p.connectionManager.size).to.equal(0)
      expect(remoteLibp2p.connectionManager.size).to.equal(0)
    })
  })
})