diff --git a/examples/webrtc-direct/test.js b/examples/webrtc-direct/test.js index a658e841..513a21e9 100644 --- a/examples/webrtc-direct/test.js +++ b/examples/webrtc-direct/test.js @@ -60,8 +60,8 @@ export async function test () { selector => { const text = document.querySelector(selector).innerText return text.includes('libp2p id is') && - text.includes('Found peer') && - text.includes('Connected to') + text.includes('Found peer 12D3KooWCuo3MdXfMgaqpLC5Houi1TRoFqgK9aoxok4NK5udMu8m') && + text.includes('Connected to 12D3KooWCuo3MdXfMgaqpLC5Houi1TRoFqgK9aoxok4NK5udMu8m') }, '#output', { timeout: 10000 } diff --git a/src/config.ts b/src/config.ts index f2619142..c82b9d1d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -26,6 +26,7 @@ const DefaultConfig: Partial = { maxParallelDials: Constants.MAX_PARALLEL_DIALS, maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS, dialTimeout: Constants.DIAL_TIMEOUT, + inboundUpgradeTimeout: Constants.INBOUND_UPGRADE_TIMEOUT, resolvers: { dnsaddr: dnsaddrResolver }, @@ -79,7 +80,8 @@ const DefaultConfig: Partial = { host: { agentVersion: AGENT_VERSION }, - timeout: 30000, + // https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48 + timeout: 60000, maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, @@ -88,12 +90,14 @@ const DefaultConfig: Partial = { ping: { protocolPrefix: 'ipfs', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 10000 }, fetch: { protocolPrefix: 'libp2p', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 10000 } } diff --git a/src/connection-manager/dialer/index.ts b/src/connection-manager/dialer/index.ts index 30cfff51..bcdc15a4 100644 --- a/src/connection-manager/dialer/index.ts +++ b/src/connection-manager/dialer/index.ts @@ -177,7 +177,7 @@ export class Dialer implements Startable, Initializable { log('creating dial target for %p', id) - const dialTarget = await this._createCancellableDialTarget(id) + const dialTarget = await this._createCancellableDialTarget(id, options) if (dialTarget.addrs.length === 0) { throw errCode(new Error('The dial request has no valid addresses'), codes.ERR_NO_VALID_ADDRESSES) @@ -207,7 +207,7 @@ export class Dialer implements Startable, Initializable { * The dial to the first address that is successfully able to upgrade a connection * will be used. */ - async _createCancellableDialTarget (peer: PeerId): Promise { + async _createCancellableDialTarget (peer: PeerId, options: AbortOptions): Promise { // Make dial target promise cancellable const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString()}${Date.now()}` const cancellablePromise = new Promise((resolve, reject) => { @@ -216,7 +216,7 @@ export class Dialer implements Startable, Initializable { try { const dialTarget = await Promise.race([ - this._createDialTarget(peer), + this._createDialTarget(peer, options), cancellablePromise ]) @@ -232,7 +232,7 @@ export class Dialer implements Startable, Initializable { * If a multiaddr is received it should be the first address attempted. * Multiaddrs not supported by the available transports will be filtered out. */ - async _createDialTarget (peer: PeerId): Promise { + async _createDialTarget (peer: PeerId, options: AbortOptions): Promise { const knownAddrs = await pipe( await this.components.getPeerStore().addressBook.get(peer), (source) => filter(source, async (address) => { @@ -253,7 +253,7 @@ export class Dialer implements Startable, Initializable { const addrs: Multiaddr[] = [] for (const a of knownAddrs) { - const resolvedAddrs = await this._resolve(a) + const resolvedAddrs = await this._resolve(a, options) resolvedAddrs.forEach(ra => addrs.push(ra)) } @@ -341,7 +341,7 @@ export class Dialer implements Startable, Initializable { /** * Resolve multiaddr recursively */ - async _resolve (ma: Multiaddr): Promise { + async _resolve (ma: Multiaddr, options: AbortOptions): Promise { // TODO: recursive logic should live in multiaddr once dns4/dns6 support is in place // Now only supporting resolve for dnsaddr const resolvableProto = ma.protoNames().includes('dnsaddr') @@ -351,9 +351,9 @@ export class Dialer implements Startable, Initializable { return [ma] } - const resolvedMultiaddrs = await this._resolveRecord(ma) + const resolvedMultiaddrs = await this._resolveRecord(ma, options) const recursiveMultiaddrs = await Promise.all(resolvedMultiaddrs.map(async (nm) => { - return await this._resolve(nm) + return await this._resolve(nm, options) })) const addrs = recursiveMultiaddrs.flat() @@ -368,10 +368,10 @@ export class Dialer implements Startable, Initializable { /** * Resolve a given multiaddr. If this fails, an empty array will be returned */ - async _resolveRecord (ma: Multiaddr): Promise { + async _resolveRecord (ma: Multiaddr, options: AbortOptions): Promise { try { ma = new Multiaddr(ma.toString()) // Use current multiaddr module - const multiaddrs = await ma.resolve() + const multiaddrs = await ma.resolve(options) return multiaddrs } catch (err) { log.error(`multiaddr ${ma.toString()} could not be resolved`, err) diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 0b32c02e..ff635839 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -107,10 +107,17 @@ export interface ConnectionManagerInit { maxAddrsToDial?: number /** - * How long a dial attempt is allowed to take + * How long a dial attempt is allowed to take, including DNS resolution + * of the multiaddr, opening a socket and upgrading it to a Connection. */ dialTimeout?: number + /** + * When a new inbound connection is opened, the upgrade process (e.g. protect, + * encrypt, multiplex etc) must complete within this number of ms. + */ + inboundUpgradeTimeout: number + /** * Number of max concurrent dials per peer */ @@ -146,6 +153,7 @@ export class DefaultConnectionManager extends EventEmitter { + async openConnection (peerId: PeerId, options: AbortOptions = {}): Promise { log('dial to %p', peerId) const existingConnections = this.getConnections(peerId) @@ -496,30 +505,43 @@ export class DefaultConnectionManager extends EventEmitter { diff --git a/src/constants.ts b/src/constants.ts index fcb7e943..ac055129 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -4,6 +4,11 @@ */ export const DIAL_TIMEOUT = 30e3 +/** + * How long in ms an inbound connection upgrade is allowed to take + */ +export const INBOUND_UPGRADE_TIMEOUT = 30e3 + /** * Maximum allowed concurrent dials */ diff --git a/src/fetch/index.ts b/src/fetch/index.ts index 04d7efc8..9add8e8f 100644 --- a/src/fetch/index.ts +++ b/src/fetch/index.ts @@ -10,10 +10,10 @@ import type { Stream } from '@libp2p/interface-connection' import type { IncomingStreamData } from '@libp2p/interface-registrar' import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' -import type { Duplex } from 'it-stream-types' import { abortableDuplex } from 'abortable-iterator' import { pipe } from 'it-pipe' import first from 'it-first' +import { TimeoutController } from 'timeout-abort-controller' const log = logger('libp2p:fetch') @@ -21,6 +21,11 @@ export interface FetchServiceInit { protocolPrefix: string maxInboundStreams: number maxOutboundStreams: number + + /** + * How long we should wait for a remote peer to send any data + */ + timeout: number } export interface HandleMessageOptions { @@ -86,14 +91,22 @@ export class FetchService implements Startable { log('dialing %s to %p', this.protocol, peer) const connection = await this.components.getConnectionManager().openConnection(peer, options) - const stream = await connection.newStream([this.protocol], options) - let source: Duplex = stream + let timeoutController + let signal = options.signal - // make stream abortable if AbortSignal passed - if (options.signal != null) { - source = abortableDuplex(stream, options.signal) + // create a timeout if no abort signal passed + if (signal == null) { + timeoutController = new TimeoutController(this.init.timeout) + signal = timeoutController.signal } + const stream = await connection.newStream([this.protocol], { + signal + }) + + // make stream abortable + const source = abortableDuplex(stream, signal) + try { const result = await pipe( [FetchRequest.encode({ identifier: key })], @@ -129,6 +142,10 @@ export class FetchService implements Startable { return result ?? null } finally { + if (timeoutController != null) { + timeoutController.clear() + } + stream.close() } } diff --git a/src/identify/index.ts b/src/identify/index.ts index e921308d..8a871f19 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -30,9 +30,6 @@ import type { Duplex } from 'it-stream-types' const log = logger('libp2p:identify') -// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48 -const IDENTIFY_TIMEOUT = 60000 - // https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L52 const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8 @@ -54,7 +51,7 @@ export interface IdentifyServiceInit { /** * How long we should wait for a remote peer to send their identify response */ - timeout?: number + timeout: number /** * Identify responses larger than this in bytes will be rejected (default: 8192) @@ -167,7 +164,7 @@ export class IdentifyService implements Startable { const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId()) const pushes = connections.map(async connection => { - const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + const timeoutController = new TimeoutController(this.init.timeout) let stream: Stream | undefined try { @@ -229,19 +226,21 @@ export class IdentifyService implements Startable { } async _identify (connection: Connection, options: AbortOptions = {}): Promise { - const stream = await connection.newStream([this.identifyProtocolStr], options) - let source: Duplex = stream let timeoutController let signal = options.signal // create a timeout if no abort signal passed if (signal == null) { - timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + timeoutController = new TimeoutController(this.init.timeout) signal = timeoutController.signal } - // make stream abortable if AbortSignal passed - source = abortableDuplex(stream, signal) + const stream = await connection.newStream([this.identifyProtocolStr], { + signal + }) + + // make stream abortable + const source = abortableDuplex(stream, signal) try { const data = await pipe( @@ -370,7 +369,7 @@ export class IdentifyService implements Startable { */ async _handleIdentify (data: IncomingStreamData) { const { connection, stream } = data - const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + const timeoutController = new TimeoutController(this.init.timeout) try { const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0) @@ -421,7 +420,7 @@ export class IdentifyService implements Startable { */ async _handlePush (data: IncomingStreamData) { const { connection, stream } = data - const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + const timeoutController = new TimeoutController(this.init.timeout) let message: Identify | undefined try { diff --git a/src/libp2p.ts b/src/libp2p.ts index 002075c2..d4ae584b 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -124,7 +124,8 @@ export class Libp2pNode extends EventEmitter implements Libp2p { // Set up the Upgrader this.components.setUpgrader(new DefaultUpgrader(this.components, { connectionEncryption: (init.connectionEncryption ?? []).map(component => this.configureComponent(component)), - muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component)) + muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component)), + inboundUpgradeTimeout: init.connectionManager.inboundUpgradeTimeout })) // Create the Connection Manager diff --git a/src/ping/index.ts b/src/ping/index.ts index f755bd51..1fc1567a 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -11,8 +11,8 @@ import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' -import type { Duplex } from 'it-stream-types' import { abortableDuplex } from 'abortable-iterator' +import { TimeoutController } from 'timeout-abort-controller' const log = logger('libp2p:ping') @@ -20,6 +20,11 @@ export interface PingServiceInit { protocolPrefix: string maxInboundStreams: number maxOutboundStreams: number + + /** + * How long we should wait for a ping response + */ + timeout: number } export class PingService implements Startable { @@ -73,18 +78,25 @@ export class PingService implements Startable { async ping (peer: PeerId, options: AbortOptions = {}): Promise { log('dialing %s to %p', this.protocol, peer) - const connection = await this.components.getConnectionManager().openConnection(peer, options) - const stream = await connection.newStream([this.protocol], options) const start = Date.now() const data = randomBytes(PING_LENGTH) + const connection = await this.components.getConnectionManager().openConnection(peer, options) + let timeoutController + let signal = options.signal - let source: Duplex = stream - - // make stream abortable if AbortSignal passed - if (options.signal != null) { - source = abortableDuplex(stream, options.signal) + // create a timeout if no abort signal passed + if (signal == null) { + timeoutController = new TimeoutController(this.init.timeout) + signal = timeoutController.signal } + const stream = await connection.newStream([this.protocol], { + signal + }) + + // make stream abortable + const source = abortableDuplex(stream, signal) + try { const result = await pipe( [data], @@ -99,6 +111,10 @@ export class PingService implements Startable { return end - start } finally { + if (timeoutController != null) { + timeoutController.clear() + } + stream.close() } } diff --git a/src/upgrader.ts b/src/upgrader.ts index 9cf8fc50..80cb742f 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -18,6 +18,8 @@ import { Components, isInitializable } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' import type { Registrar } from '@libp2p/interface-registrar' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' +import { TimeoutController } from 'timeout-abort-controller' +import { abortableDuplex } from 'abortable-iterator' const log = logger('libp2p:upgrader') @@ -43,6 +45,12 @@ export interface CryptoResult extends SecuredConnection { export interface UpgraderInit { connectionEncryption: ConnectionEncrypter[] muxers: StreamMuxerFactory[] + + /** + * An amount of ms by which an inbound connection upgrade + * must complete + */ + inboundUpgradeTimeout: number } function findIncomingStreamLimit (protocol: string, registrar: Registrar) { @@ -89,6 +97,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg private readonly components: Components private readonly connectionEncryption: Map private readonly muxers: Map + private readonly inboundUpgradeTimeout: number constructor (components: Components, init: UpgraderInit) { super() @@ -105,6 +114,8 @@ export class DefaultUpgrader extends EventEmitter implements Upg init.muxers.forEach(muxer => { this.muxers.set(muxer.protocol, muxer) }) + + this.inboundUpgradeTimeout = init.inboundUpgradeTimeout } /** @@ -120,82 +131,92 @@ export class DefaultUpgrader extends EventEmitter implements Upg let proxyPeer const metrics = this.components.getMetrics() - if (await this.components.getConnectionGater().denyInboundConnection(maConn)) { - throw errCode(new Error('The multiaddr connection is blocked by gater.acceptConnection'), codes.ERR_CONNECTION_INTERCEPTED) - } - - if (metrics != null) { - ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) - const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` - setPeer({ toString: () => idString }) - maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) - } - - log('starting the inbound connection upgrade') - - // Protect - let protectedConn = maConn - const protector = this.components.getConnectionProtector() - - if (protector != null) { - log('protecting the inbound connection') - protectedConn = await protector.protect(maConn) - } + const timeoutController = new TimeoutController(this.inboundUpgradeTimeout) try { - // Encrypt the connection - ({ - conn: encryptedConn, - remotePeer, - protocol: cryptoProtocol - } = await this._encryptInbound(protectedConn)) + const abortableStream = abortableDuplex(maConn, timeoutController.signal) + maConn.source = abortableStream.source + maConn.sink = abortableStream.sink - if (await this.components.getConnectionGater().denyInboundEncryptedConnection(remotePeer, { + if (await this.components.getConnectionGater().denyInboundConnection(maConn)) { + throw errCode(new Error('The multiaddr connection is blocked by gater.acceptConnection'), codes.ERR_CONNECTION_INTERCEPTED) + } + + if (metrics != null) { + ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) + const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` + setPeer({ toString: () => idString }) + maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) + } + + log('starting the inbound connection upgrade') + + // Protect + let protectedConn = maConn + const protector = this.components.getConnectionProtector() + + if (protector != null) { + log('protecting the inbound connection') + protectedConn = await protector.protect(maConn) + } + + try { + // Encrypt the connection + ({ + conn: encryptedConn, + remotePeer, + protocol: cryptoProtocol + } = await this._encryptInbound(protectedConn)) + + if (await this.components.getConnectionGater().denyInboundEncryptedConnection(remotePeer, { + ...protectedConn, + ...encryptedConn + })) { + throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) + } + + // Multiplex the connection + if (this.muxers.size > 0) { + const multiplexed = await this._multiplexInbound({ + ...protectedConn, + ...encryptedConn + }, this.muxers) + muxerFactory = multiplexed.muxerFactory + upgradedConn = multiplexed.stream + } else { + upgradedConn = encryptedConn + } + } catch (err: any) { + log.error('Failed to upgrade inbound connection', err) + await maConn.close(err) + throw err + } + + if (await this.components.getConnectionGater().denyInboundUpgradedConnection(remotePeer, { ...protectedConn, ...encryptedConn })) { throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) } - // Multiplex the connection - if (this.muxers.size > 0) { - const multiplexed = await this._multiplexInbound({ - ...protectedConn, - ...encryptedConn - }, this.muxers) - muxerFactory = multiplexed.muxerFactory - upgradedConn = multiplexed.stream - } else { - upgradedConn = encryptedConn + if (metrics != null) { + metrics.updatePlaceholder(proxyPeer, remotePeer) + setPeer(remotePeer) } - } catch (err: any) { - log.error('Failed to upgrade inbound connection', err) - await maConn.close(err) - throw err + + log('Successfully upgraded inbound connection') + + return this._createConnection({ + cryptoProtocol, + direction: 'inbound', + maConn, + upgradedConn, + muxerFactory, + remotePeer + }) + } finally { + timeoutController.clear() } - - if (await this.components.getConnectionGater().denyInboundUpgradedConnection(remotePeer, { - ...protectedConn, - ...encryptedConn - })) { - throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED) - } - - if (metrics != null) { - metrics.updatePlaceholder(proxyPeer, remotePeer) - setPeer(remotePeer) - } - - log('Successfully upgraded inbound connection') - - return this._createConnection({ - cryptoProtocol, - direction: 'inbound', - maConn, - upgradedConn, - muxerFactory, - remotePeer - }) } /** @@ -378,8 +399,16 @@ export class DefaultUpgrader extends EventEmitter implements Upg const muxedStream = muxer.newStream() const mss = new Dialer(muxedStream) const metrics = this.components.getMetrics() + let controller: TimeoutController | undefined try { + if (options.signal == null) { + log('No abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols) + + controller = new TimeoutController(30000) + options.signal = controller.signal + } + let { stream, protocol } = await mss.select(protocols, options) if (metrics != null) { @@ -415,6 +444,10 @@ export class DefaultUpgrader extends EventEmitter implements Upg } throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) + } finally { + if (controller != null) { + controller.clear() + } } } diff --git a/test/connection-manager/index.node.ts b/test/connection-manager/index.node.ts index 6e5d693c..69fc2b23 100644 --- a/test/connection-manager/index.node.ts +++ b/test/connection-manager/index.node.ts @@ -54,7 +54,8 @@ describe('Connection Manager', () => { const connectionManager = new DefaultConnectionManager({ maxConnections: 1000, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) connectionManager.init(new Components({ upgrader, peerStore })) @@ -89,7 +90,8 @@ describe('Connection Manager', () => { const connectionManager = new DefaultConnectionManager({ maxConnections: 1000, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) connectionManager.init(new Components({ upgrader, peerStore })) diff --git a/test/connection-manager/index.spec.ts b/test/connection-manager/index.spec.ts index c4d9cc0a..e1ad4436 100644 --- a/test/connection-manager/index.spec.ts +++ b/test/connection-manager/index.spec.ts @@ -10,6 +10,7 @@ import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/int import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { CustomEvent } from '@libp2p/interfaces/events' import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags' +import pWaitFor from 'p-wait-for' describe('Connection Manager', () => { let libp2p: Libp2pNode @@ -163,6 +164,10 @@ describe('Connection Manager', () => { await libp2p.stop() await libp2p.start() + await pWaitFor(() => connectionManagerOpenConnectionSpy.called, { + interval: 100 + }) + expect(connectionManagerOpenConnectionSpy.called).to.be.true('Did not attempt to connect to important peer') expect(connectionManagerOpenConnectionSpy.getCall(0).args[0].toString()).to.equal(peerId.toString(), 'Attempted to connect to the wrong peer') }) diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index 428aba06..19091bf3 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -74,7 +74,8 @@ describe('Dialing (direct, TCP)', () => { localComponents.setConnectionManager(new DefaultConnectionManager({ maxConnections: 100, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 })) localTM = new DefaultTransportManager(localComponents) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index 1af0e7c6..846800a9 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -52,7 +52,8 @@ describe('Dialing (direct, WebSockets)', () => { localComponents.setConnectionManager(new DefaultConnectionManager({ maxConnections: 100, minConnections: 50, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 })) localTM = new DefaultTransportManager(localComponents) diff --git a/test/fetch/index.spec.ts b/test/fetch/index.spec.ts index 555782c2..8ef13dd3 100644 --- a/test/fetch/index.spec.ts +++ b/test/fetch/index.spec.ts @@ -19,7 +19,8 @@ import { MemoryDatastore } from 'datastore-core' const defaultInit: FetchServiceInit = { protocolPrefix: 'ipfs', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 1000 } async function createComponents (index: number) { @@ -34,7 +35,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) diff --git a/test/identify/index.spec.ts b/test/identify/index.spec.ts index 3d6623a0..b9aefdff 100644 --- a/test/identify/index.spec.ts +++ b/test/identify/index.spec.ts @@ -40,7 +40,8 @@ const defaultInit: IdentifyServiceInit = { maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, - maxPushOutgoingStreams: 1 + maxPushOutgoingStreams: 1, + timeout: 1000 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -58,7 +59,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) components.setAddressManager(new DefaultAddressManager(components, { diff --git a/test/identify/push.spec.ts b/test/identify/push.spec.ts index 9dbf95c8..312a07d0 100644 --- a/test/identify/push.spec.ts +++ b/test/identify/push.spec.ts @@ -35,7 +35,8 @@ const defaultInit: IdentifyServiceInit = { maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, - maxPushOutgoingStreams: 1 + maxPushOutgoingStreams: 1, + timeout: 1000 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -53,7 +54,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) components.setAddressManager(new DefaultAddressManager(components, { diff --git a/test/ping/index.spec.ts b/test/ping/index.spec.ts index fe885df4..9e17d0b6 100644 --- a/test/ping/index.spec.ts +++ b/test/ping/index.spec.ts @@ -19,7 +19,8 @@ import { MemoryDatastore } from 'datastore-core' const defaultInit: PingServiceInit = { protocolPrefix: 'ipfs', maxInboundStreams: 1, - maxOutboundStreams: 1 + maxOutboundStreams: 1, + timeout: 1000 } async function createComponents (index: number) { @@ -34,7 +35,8 @@ async function createComponents (index: number) { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) diff --git a/test/registrar/registrar.spec.ts b/test/registrar/registrar.spec.ts index 4848c64a..080ecef3 100644 --- a/test/registrar/registrar.spec.ts +++ b/test/registrar/registrar.spec.ts @@ -43,7 +43,8 @@ describe('registrar', () => { connectionManager: new DefaultConnectionManager({ minConnections: 50, maxConnections: 1000, - autoDialInterval: 1000 + autoDialInterval: 1000, + inboundUpgradeTimeout: 1000 }) }) registrar = new DefaultRegistrar(components) diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 6951d39b..e21ca282 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -66,7 +66,8 @@ describe('Upgrader', () => { ], muxers: [ localMuxerFactory - ] + ], + inboundUpgradeTimeout: 1000 }) remoteComponents = new Components({ @@ -80,7 +81,8 @@ describe('Upgrader', () => { ], muxers: [ new Mplex() - ] + ], + inboundUpgradeTimeout: 1000 }) await localComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => { @@ -137,13 +139,15 @@ describe('Upgrader', () => { connectionEncryption: [ new Plaintext() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) remoteUpgrader = new DefaultUpgrader(remoteComponents, { connectionEncryption: [ new Plaintext() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) const connections = await Promise.all([ @@ -214,13 +218,15 @@ describe('Upgrader', () => { connectionEncryption: [ new BoomCrypto() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) remoteUpgrader = new DefaultUpgrader(remoteComponents, { connectionEncryption: [ new BoomCrypto() ], - muxers: [] + muxers: [], + inboundUpgradeTimeout: 1000 }) // Wait for the results of each side of the connection @@ -266,7 +272,8 @@ describe('Upgrader', () => { ], muxers: [ new OtherMuxerFactory() - ] + ], + inboundUpgradeTimeout: 1000 }) remoteUpgrader = new DefaultUpgrader(remoteComponents, { connectionEncryption: [ @@ -274,7 +281,8 @@ describe('Upgrader', () => { ], muxers: [ new Mplex() - ] + ], + inboundUpgradeTimeout: 1000 }) // Wait for the results of each side of the connection