From 750ed9c35f095aa6e136a801ccd792f2190f38a1 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 14 Jul 2022 13:35:12 +0000 Subject: [PATCH] fix: add timeout for incoming connections and build-in protocols (#1292) Ensure that we don't wait forever for upgrading an inbound connection to occur. Note that transports should return an AbortableSource when passed an AbortSignal so outbound connections to not need the same fix. Also adds default timeouts for the ping, fetch, and identify protocols. --- examples/webrtc-direct/test.js | 4 +- src/config.ts | 10 +- src/connection-manager/dialer/index.ts | 20 +-- src/connection-manager/index.ts | 62 +++++++--- src/constants.ts | 5 + src/fetch/index.ts | 29 ++++- src/identify/index.ts | 23 ++-- src/libp2p.ts | 3 +- src/ping/index.ts | 32 +++-- src/upgrader.ts | 163 +++++++++++++++---------- test/connection-manager/index.node.ts | 6 +- test/connection-manager/index.spec.ts | 5 + test/dialing/direct.node.ts | 3 +- test/dialing/direct.spec.ts | 3 +- test/fetch/index.spec.ts | 6 +- test/identify/index.spec.ts | 6 +- test/identify/push.spec.ts | 6 +- test/ping/index.spec.ts | 6 +- test/registrar/registrar.spec.ts | 3 +- test/upgrading/upgrader.spec.ts | 24 ++-- 20 files changed, 271 insertions(+), 148 deletions(-) diff --git a/examples/webrtc-direct/test.js b/examples/webrtc-direct/test.js index a658e841..513a21e9 100644 --- a/examples/webrtc-direct/test.js +++ b/examples/webrtc-direct/test.js @@ -60,8 +60,8 @@ export async function test () { selector => { const text = document.querySelector(selector).innerText return text.includes('libp2p id is') && - text.includes('Found peer') && - text.includes('Connected to') + text.includes('Found peer 12D3KooWCuo3MdXfMgaqpLC5Houi1TRoFqgK9aoxok4NK5udMu8m') && + text.includes('Connected to 12D3KooWCuo3MdXfMgaqpLC5Houi1TRoFqgK9aoxok4NK5udMu8m') }, '#output', { timeout: 10000 } diff --git a/src/config.ts b/src/config.ts index f2619142..c82b9d1d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -26,6 +26,7 @@ const DefaultConfig: Partial = { maxParallelDials: Constants.MAX_PARALLEL_DIALS, maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS, dialTimeout: Constants.DIAL_TIMEOUT, + inboundUpgradeTimeout: Constants.INBOUND_UPGRADE_TIMEOUT, resolvers: { dnsaddr: dnsaddrResolver }, @@ -79,7 +80,8 @@ const DefaultConfig: Partial = { host: { agentVersion: AGENT_VERSION }, - timeout: 30000, + // https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48 + timeout: 60000, maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, @@ -88,12 +90,14 @@ const DefaultConfig: Partial = { ping: { protocolPrefix: 'ipfs', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 10000 }, fetch: { protocolPrefix: 'libp2p', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 10000 } } diff --git a/src/connection-manager/dialer/index.ts b/src/connection-manager/dialer/index.ts index 30cfff51..bcdc15a4 100644 --- a/src/connection-manager/dialer/index.ts +++ b/src/connection-manager/dialer/index.ts @@ -177,7 +177,7 @@ export class Dialer implements Startable, Initializable { log('creating dial target for %p', id) - const dialTarget = await this._createCancellableDialTarget(id) + const dialTarget = await this._createCancellableDialTarget(id, options) if (dialTarget.addrs.length === 0) { throw errCode(new Error('The dial request has no valid addresses'), codes.ERR_NO_VALID_ADDRESSES) @@ -207,7 +207,7 @@ export class Dialer implements Startable, Initializable { * The dial to the first address that is successfully able to upgrade a connection * will be used. */ - async _createCancellableDialTarget (peer: PeerId): Promise { + async _createCancellableDialTarget (peer: PeerId, options: AbortOptions): Promise { // Make dial target promise cancellable const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString()}${Date.now()}` const cancellablePromise = new Promise((resolve, reject) => { @@ -216,7 +216,7 @@ export class Dialer implements Startable, Initializable { try { const dialTarget = await Promise.race([ - this._createDialTarget(peer), + this._createDialTarget(peer, options), cancellablePromise ]) @@ -232,7 +232,7 @@ export class Dialer implements Startable, Initializable { * If a multiaddr is received it should be the first address attempted. * Multiaddrs not supported by the available transports will be filtered out. */ - async _createDialTarget (peer: PeerId): Promise { + async _createDialTarget (peer: PeerId, options: AbortOptions): Promise { const knownAddrs = await pipe( await this.components.getPeerStore().addressBook.get(peer), (source) => filter(source, async (address) => { @@ -253,7 +253,7 @@ export class Dialer implements Startable, Initializable { const addrs: Multiaddr[] = [] for (const a of knownAddrs) { - const resolvedAddrs = await this._resolve(a) + const resolvedAddrs = await this._resolve(a, options) resolvedAddrs.forEach(ra => addrs.push(ra)) } @@ -341,7 +341,7 @@ export class Dialer implements Startable, Initializable { /** * Resolve multiaddr recursively */ - async _resolve (ma: Multiaddr): Promise { + async _resolve (ma: Multiaddr, options: AbortOptions): Promise { // TODO: recursive logic should live in multiaddr once dns4/dns6 support is in place // Now only supporting resolve for dnsaddr const resolvableProto = ma.protoNames().includes('dnsaddr') @@ -351,9 +351,9 @@ export class Dialer implements Startable, Initializable { return [ma] } - const resolvedMultiaddrs = await this._resolveRecord(ma) + const resolvedMultiaddrs = await this._resolveRecord(ma, options) const recursiveMultiaddrs = await Promise.all(resolvedMultiaddrs.map(async (nm) => { - return await this._resolve(nm) + return await this._resolve(nm, options) })) const addrs = recursiveMultiaddrs.flat() @@ -368,10 +368,10 @@ export class Dialer implements Startable, Initializable { /** * Resolve a given multiaddr. If this fails, an empty array will be returned */ - async _resolveRecord (ma: Multiaddr): Promise { + async _resolveRecord (ma: Multiaddr, options: AbortOptions): Promise { try { ma = new Multiaddr(ma.toString()) // Use current multiaddr module - const multiaddrs = await ma.resolve() + const multiaddrs = await ma.resolve(options) return multiaddrs } catch (err) { log.error(`multiaddr ${ma.toString()} could not be resolved`, err) diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 0b32c02e..ff635839 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -107,10 +107,17 @@ export interface ConnectionManagerInit { maxAddrsToDial?: number /** - * How long a dial attempt is allowed to take + * How long a dial attempt is allowed to take, including DNS resolution + * of the multiaddr, opening a socket and upgrading it to a Connection. */ dialTimeout?: number + /** + * When a new inbound connection is opened, the upgrade process (e.g. protect, + * encrypt, multiplex etc) must complete within this number of ms. + */ + inboundUpgradeTimeout: number + /** * Number of max concurrent dials per peer */ @@ -146,6 +153,7 @@ export class DefaultConnectionManager extends EventEmitter { + async openConnection (peerId: PeerId, options: AbortOptions = {}): Promise { log('dial to %p', peerId) const existingConnections = this.getConnections(peerId) @@ -496,30 +505,43 @@ export class DefaultConnectionManager extends EventEmitter { diff --git a/src/constants.ts b/src/constants.ts index fcb7e943..ac055129 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -4,6 +4,11 @@ */ export const DIAL_TIMEOUT = 30e3 +/** + * How long in ms an inbound connection upgrade is allowed to take + */ +export const INBOUND_UPGRADE_TIMEOUT = 30e3 + /** * Maximum allowed concurrent dials */ diff --git a/src/fetch/index.ts b/src/fetch/index.ts index 04d7efc8..9add8e8f 100644 --- a/src/fetch/index.ts +++ b/src/fetch/index.ts @@ -10,10 +10,10 @@ import type { Stream } from '@libp2p/interface-connection' import type { IncomingStreamData } from '@libp2p/interface-registrar' import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' -import type { Duplex } from 'it-stream-types' import { abortableDuplex } from 'abortable-iterator' import { pipe } from 'it-pipe' import first from 'it-first' +import { TimeoutController } from 'timeout-abort-controller' const log = logger('libp2p:fetch') @@ -21,6 +21,11 @@ export interface FetchServiceInit { protocolPrefix: string maxInboundStreams: number maxOutboundStreams: number + + /** + * How long we should wait for a remote peer to send any data + */ + timeout: number } export interface HandleMessageOptions { @@ -86,14 +91,22 @@ export class FetchService implements Startable { log('dialing %s to %p', this.protocol, peer) const connection = await this.components.getConnectionManager().openConnection(peer, options) - const stream = await connection.newStream([this.protocol], options) - let source: Duplex = stream + let timeoutController + let signal = options.signal - // make stream abortable if AbortSignal passed - if (options.signal != null) { - source = abortableDuplex(stream, options.signal) + // create a timeout if no abort signal passed + if (signal == null) { + timeoutController = new TimeoutController(this.init.timeout) + signal = timeoutController.signal } + const stream = await connection.newStream([this.protocol], { + signal + }) + + // make stream abortable + const source = abortableDuplex(stream, signal) + try { const result = await pipe( [FetchRequest.encode({ identifier: key })], @@ -129,6 +142,10 @@ export class FetchService implements Startable { return result ?? null } finally { + if (timeoutController != null) { + timeoutController.clear() + } + stream.close() } } diff --git a/src/identify/index.ts b/src/identify/index.ts index e921308d..8a871f19 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -30,9 +30,6 @@ import type { Duplex } from 'it-stream-types' const log = logger('libp2p:identify') -// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48 -const IDENTIFY_TIMEOUT = 60000 - // https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L52 const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8 @@ -54,7 +51,7 @@ export interface IdentifyServiceInit { /** * How long we should wait for a remote peer to send their identify response */ - timeout?: number + timeout: number /** * Identify responses larger than this in bytes will be rejected (default: 8192) @@ -167,7 +164,7 @@ export class IdentifyService implements Startable { const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId()) const pushes = connections.map(async connection => { - const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + const timeoutController = new TimeoutController(this.init.timeout) let stream: Stream | undefined try { @@ -229,19 +226,21 @@ export class IdentifyService implements Startable { } async _identify (connection: Connection, options: AbortOptions = {}): Promise { - const stream = await connection.newStream([this.identifyProtocolStr], options) - let source: Duplex = stream let timeoutController let signal = options.signal // create a timeout if no abort signal passed if (signal == null) { - timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + timeoutController = new TimeoutController(this.init.timeout) signal = timeoutController.signal } - // make stream abortable if AbortSignal passed - source = abortableDuplex(stream, signal) + const stream = await connection.newStream([this.identifyProtocolStr], { + signal + }) + + // make stream abortable + const source = abortableDuplex(stream, signal) try { const data = await pipe( @@ -370,7 +369,7 @@ export class IdentifyService implements Startable { */ async _handleIdentify (data: IncomingStreamData) { const { connection, stream } = data - const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + const timeoutController = new TimeoutController(this.init.timeout) try { const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0) @@ -421,7 +420,7 @@ export class IdentifyService implements Startable { */ async _handlePush (data: IncomingStreamData) { const { connection, stream } = data - const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + const timeoutController = new TimeoutController(this.init.timeout) let message: Identify | undefined try { diff --git a/src/libp2p.ts b/src/libp2p.ts index 002075c2..d4ae584b 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -124,7 +124,8 @@ export class Libp2pNode extends EventEmitter implements Libp2p { // Set up the Upgrader this.components.setUpgrader(new DefaultUpgrader(this.components, { connectionEncryption: (init.connectionEncryption ?? []).map(component => this.configureComponent(component)), - muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component)) + muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component)), + inboundUpgradeTimeout: init.connectionManager.inboundUpgradeTimeout })) // Create the Connection Manager diff --git a/src/ping/index.ts b/src/ping/index.ts index f755bd51..1fc1567a 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -11,8 +11,8 @@ import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' -import type { Duplex } from 'it-stream-types' import { abortableDuplex } from 'abortable-iterator' +import { TimeoutController } from 'timeout-abort-controller' const log = logger('libp2p:ping') @@ -20,6 +20,11 @@ export interface PingServiceInit { protocolPrefix: string maxInboundStreams: number maxOutboundStreams: number + + /** + * How long we should wait for a ping response + */ + timeout: number } export class PingService implements Startable { @@ -73,18 +78,25 @@ export class PingService implements Startable { async ping (peer: PeerId, options: AbortOptions = {}): Promise { log('dialing %s to %p', this.protocol, peer) - const connection = await this.components.getConnectionManager().openConnection(peer, options) - const stream = await connection.newStream([this.protocol], options) const start = Date.now() const data = randomBytes(PING_LENGTH) + const connection = await this.components.getConnectionManager().openConnection(peer, options) + let timeoutController + let signal = options.signal - let source: Duplex = stream - - // make stream abortable if AbortSignal passed - if (options.signal != null) { - source = abortableDuplex(stream, options.signal) + // create a timeout if no abort signal passed + if (signal == null) { + timeoutController = new TimeoutController(this.init.timeout) + signal = timeoutController.signal } + const stream = await connection.newStream([this.protocol], { + signal + }) + + // make stream abortable + const source = abortableDuplex(stream, signal) + try { const result = await pipe( [data], @@ -99,6 +111,10 @@ export class PingService implements Startable { return end - start } finally { + if (timeoutController != null) { + timeoutController.clear() + } + stream.close() } } diff --git a/src/upgrader.ts b/src/upgrader.ts index 9cf8fc50..80cb742f 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -18,6 +18,8 @@ import { Components, isInitializable } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' import type { Registrar } from '@libp2p/interface-registrar' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' +import { TimeoutController } from 'timeout-abort-controller' +import { abortableDuplex } from 'abortable-iterator' const log = logger('libp2p:upgrader') @@ -43,6 +45,12 @@ export interface CryptoResult extends SecuredConnection { export interface UpgraderInit { connectionEncryption: ConnectionEncrypter[] muxers: StreamMuxerFactory[] + + /** + * An amount of ms by which an inbound connection upgrade + * must complete + */ + inboundUpgradeTimeout: number } function findIncomingStreamLimit (protocol: string, registrar: Registrar) { @@ -89,6 +97,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg private readonly components: Components private readonly connectionEncryption: Map private readonly muxers: Map + private readonly inboundUpgradeTimeout: number constructor (components: Components, init: UpgraderInit) { super() @@ -105,6 +114,8 @@ export class DefaultUpgrader extends EventEmitter implements Upg init.muxers.forEach(muxer => { this.muxers.set(muxer.protocol, muxer) }) + + this.inboundUpgradeTimeout = init.inboundUpgradeTimeout } /** @@ -120,82 +131,92 @@ export class DefaultUpgrader extends EventEmitter implements Upg let proxyPeer const metrics = this.components.getMetrics() - if (await this.components.getConnectionGater().denyInboundConnection(maConn)) { - throw errCode(new Error('The multiaddr connection is blocked by gater.acceptConnection'), codes.ERR_CONNECTION_INTERCEPTED) - } - - if (metrics != null) { - ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) - const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` - setPeer({ toString: () => idString }) - maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) - } - - log('starting the inbound connection upgrade') - - // Protect - let protectedConn = maConn - const protector = this.components.getConnectionProtector() - - if (protector != null) { - log('protecting the inbound connection') - protectedConn = await protector.protect(maConn) - } + const timeoutController = new TimeoutController(this.inboundUpgradeTimeout) try { - // Encrypt the connection - ({ - conn: encryptedConn, - remotePeer, - protocol: cryptoProtocol - } = await this._encryptInbound(protectedConn)) + const abortableStream = abortableDuplex(maConn, timeoutController.signal) + maConn.source = abortableStream.source + maConn.sink = abortableStream.sink - if (await this.components.getConnectionGater().denyInboundEncryptedConnection(remotePeer, { + if (await this.components.getConnectionGater().denyInboundConnection(maConn)) { + throw errCode(new Error('The multiaddr connection is blocked by gater.acceptConnection'), codes.ERR_CONNECTION_INTERCEPTED) + } + + if (metrics != null) { + ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) + const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` + setPeer({ toString: () => idString }) + maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) + } + + log('starting the inbound connection upgrade') + + // Protect + let protectedConn = maConn + const protector = this.components.getConnectionProtector() + + if (protector != null) { + log('protecting the inbound connection') + protectedConn = await protector.protect(maConn) + } + + try { + // Encrypt the connection + ({ + conn: encryptedConn, + remotePeer, + protocol: cryptoProtocol + } = await this._encryptInbound(protectedConn)) + + if (await this.components.getConnectionGater().denyInboundEncryptedConnection(remotePeer, { + ...protectedConn, + ...encryptedConn + })) { + throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) + } + + // Multiplex the connection + if (this.muxers.size > 0) { + const multiplexed = await this._multiplexInbound({ + ...protectedConn, + ...encryptedConn + }, this.muxers) + muxerFactory = multiplexed.muxerFactory + upgradedConn = multiplexed.stream + } else { + upgradedConn = encryptedConn + } + } catch (err: any) { + log.error('Failed to upgrade inbound connection', err) + await maConn.close(err) + throw err + } + + if (await this.components.getConnectionGater().denyInboundUpgradedConnection(remotePeer, { ...protectedConn, ...encryptedConn })) { throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) } - // Multiplex the connection - if (this.muxers.size > 0) { - const multiplexed = await this._multiplexInbound({ - ...protectedConn, - ...encryptedConn - }, this.muxers) - muxerFactory = multiplexed.muxerFactory - upgradedConn = multiplexed.stream - } else { - upgradedConn = encryptedConn + if (metrics != null) { + metrics.updatePlaceholder(proxyPeer, remotePeer) + setPeer(remotePeer) } - } catch (err: any) { - log.error('Failed to upgrade inbound connection', err) - await maConn.close(err) - throw err + + log('Successfully upgraded inbound connection') + + return this._createConnection({ + cryptoProtocol, + direction: 'inbound', + maConn, + upgradedConn, + muxerFactory, + remotePeer + }) + } finally { + timeoutController.clear() } - - if (await this.components.getConnectionGater().denyInboundUpgradedConnection(remotePeer, { - ...protectedConn, - ...encryptedConn - })) { - throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) - } - - if (metrics != null) { - metrics.updatePlaceholder(proxyPeer, remotePeer) - setPeer(remotePeer) - } - - log('Successfully upgraded inbound connection') - - return this._createConnection({ - cryptoProtocol, - direction: 'inbound', - maConn, - upgradedConn, - muxerFactory, - remotePeer - }) } /** @@ -378,8 +399,16 @@ export class DefaultUpgrader extends EventEmitter implements Upg const muxedStream = muxer.newStream() const mss = new Dialer(muxedStream) const metrics = this.components.getMetrics() + let controller: TimeoutController | undefined try { + if (options.signal == null) { + log('No abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols) + + controller = new TimeoutController(30000) + options.signal = controller.signal + } + let { stream, protocol } = await mss.select(protocols, options) if (metrics != null) { @@ -415,6 +444,10 @@ export class DefaultUpgrader extends EventEmitter implements Upg } throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) + } finally { + if (controller != null) { + controller.clear() + } } } diff --git a/test/connection-manager/index.node.ts b/test/connection-manager/index.node.ts index 6e5d693c..69fc2b23 100644 --- a/test/connection-manager/index.node.ts +++ b/test/connection-manager/index.node.ts @@ -54,7 +54,8 @@ describe('Connection Manager', () => { const connectionManager = new DefaultConnectionManager({ maxConnections: 1000, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) connectionManager.init(new Components({ upgrader, peerStore })) @@ -89,7 +90,8 @@ describe('Connection Manager', () => { const connectionManager = new DefaultConnectionManager({ maxConnections: 1000, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) connectionManager.init(new Components({ upgrader, peerStore })) diff --git a/test/connection-manager/index.spec.ts b/test/connection-manager/index.spec.ts index c4d9cc0a..e1ad4436 100644 --- a/test/connection-manager/index.spec.ts +++ b/test/connection-manager/index.spec.ts @@ -10,6 +10,7 @@ import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/int import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { CustomEvent } from '@libp2p/interfaces/events' import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags' +import pWaitFor from 'p-wait-for' describe('Connection Manager', () => { let libp2p: Libp2pNode @@ -163,6 +164,10 @@ describe('Connection Manager', () => { await libp2p.stop() await libp2p.start() + await pWaitFor(() => connectionManagerOpenConnectionSpy.called, { + interval: 100 + }) + expect(connectionManagerOpenConnectionSpy.called).to.be.true('Did not attempt to connect to important peer') expect(connectionManagerOpenConnectionSpy.getCall(0).args[0].toString()).to.equal(peerId.toString(), 'Attempted to connect to the wrong peer') }) diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index 428aba06..19091bf3 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -74,7 +74,8 @@ describe('Dialing (direct, TCP)', () => { localComponents.setConnectionManager(new DefaultConnectionManager({ maxConnections: 100, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 })) localTM = new DefaultTransportManager(localComponents) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index 1af0e7c6..846800a9 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -52,7 +52,8 @@ describe('Dialing (direct, WebSockets)', () => { localComponents.setConnectionManager(new DefaultConnectionManager({ maxConnections: 100, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 })) localTM = new DefaultTransportManager(localComponents) diff --git a/test/fetch/index.spec.ts b/test/fetch/index.spec.ts index 555782c2..8ef13dd3 100644 --- a/test/fetch/index.spec.ts +++ b/test/fetch/index.spec.ts @@ -19,7 +19,8 @@ import { MemoryDatastore } from 'datastore-core' const defaultInit: FetchServiceInit = { protocolPrefix: 'ipfs', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 1000 } async function createComponents (index: number) { @@ -34,7 +35,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) diff --git a/test/identify/index.spec.ts b/test/identify/index.spec.ts index 3d6623a0..b9aefdff 100644 --- a/test/identify/index.spec.ts +++ b/test/identify/index.spec.ts @@ -40,7 +40,8 @@ const defaultInit: IdentifyServiceInit = { maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, - maxPushOutgoingStreams: 1 + maxPushOutgoingStreams: 1, + timeout: 1000 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -58,7 +59,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) components.setAddressManager(new DefaultAddressManager(components, { diff --git a/test/identify/push.spec.ts b/test/identify/push.spec.ts index 9dbf95c8..312a07d0 100644 --- a/test/identify/push.spec.ts +++ b/test/identify/push.spec.ts @@ -35,7 +35,8 @@ const defaultInit: IdentifyServiceInit = { maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, - maxPushOutgoingStreams: 1 + maxPushOutgoingStreams: 1, + timeout: 1000 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -53,7 +54,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) components.setAddressManager(new DefaultAddressManager(components, { diff --git a/test/ping/index.spec.ts b/test/ping/index.spec.ts index fe885df4..9e17d0b6 100644 --- a/test/ping/index.spec.ts +++ b/test/ping/index.spec.ts @@ -19,7 +19,8 @@ import { MemoryDatastore } from 'datastore-core' const defaultInit: PingServiceInit = { protocolPrefix: 'ipfs', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 1000 } async function createComponents (index: number) { @@ -34,7 +35,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) diff --git a/test/registrar/registrar.spec.ts b/test/registrar/registrar.spec.ts index 4848c64a..080ecef3 100644 --- a/test/registrar/registrar.spec.ts +++ b/test/registrar/registrar.spec.ts @@ -43,7 +43,8 @@ describe('registrar', () => { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) registrar = new DefaultRegistrar(components) diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 6951d39b..e21ca282 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -66,7 +66,8 @@ describe('Upgrader', () => { ], muxers: [ localMuxerFactory - ] + ], + inboundUpgradeTimeout: 1000 }) remoteComponents = new Components({ @@ -80,7 +81,8 @@ describe('Upgrader', () => { ], muxers: [ new Mplex() - ] + ], + inboundUpgradeTimeout: 1000 }) await localComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => { @@ -137,13 +139,15 @@ describe('Upgrader', () => { connectionEncryption: [ new Plaintext() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) remoteUpgrader = new DefaultUpgrader(remoteComponents, { connectionEncryption: [ new Plaintext() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) const connections = await Promise.all([ @@ -214,13 +218,15 @@ describe('Upgrader', () => { connectionEncryption: [ new BoomCrypto() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) remoteUpgrader = new DefaultUpgrader(remoteComponents, { connectionEncryption: [ new BoomCrypto() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) // Wait for the results of each side of the connection @@ -266,7 +272,8 @@ describe('Upgrader', () => { ], muxers: [ new OtherMuxerFactory() - ] + ], + inboundUpgradeTimeout: 1000 }) remoteUpgrader = new DefaultUpgrader(remoteComponents, { connectionEncryption: [ @@ -274,7 +281,8 @@ describe('Upgrader', () => { ], muxers: [ new Mplex() - ] + ], + inboundUpgradeTimeout: 1000 }) // Wait for the results of each side of the connection