1
0
mirror of https://github.com/fluencelabs/js-libp2p synced 2025-03-31 14:51:05 +00:00
Alex Potsides de30c2cec7
feat!: limit protocol streams per-connection
* feat: limit protocol streams per-connection

Uses the `maxInboundStreams` and `maxOutboundStreams` of the `registrar.handle`
opts to limit the number of concurrent streams open on each connection
on a per-protocol basis.

Both values default to 1 so some tuning will be necessary to set
appropriate values for some protocols.

* chore: make error codes consistent

* chore: fix up examples
2022-06-17 15:46:31 +02:00

136 lines
4.3 KiB
TypeScript

/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { FetchService, FetchServiceInit } from '../../src/fetch/index.js'
import Peers from '../fixtures/peers.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/components'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { start, stop } from '@libp2p/interfaces/startable'
import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
/**
 * Init options shared by every FetchService created in these tests: the
 * `ipfs` protocol prefix and a limit of one inbound and one outbound stream.
 */
const defaultInit: FetchServiceInit = {
  protocolPrefix: 'ipfs',
  maxInboundStreams: 1,
  maxOutboundStreams: 1
}
/**
 * Builds the `Components` container for a single test node.
 *
 * The peer id is loaded from the `Peers` fixture entry at `index`; the
 * registrar and upgrader are mocks, and the connection manager is a
 * `DefaultConnectionManager` configured with fixed limits.
 */
async function createComponents (index: number) {
  const peerId = await createFromJSON(Peers[index])

  // constructed in the same order the values are wired into Components
  const registrar = mockRegistrar()
  const upgrader = mockUpgrader()
  const connectionManager = new DefaultConnectionManager({
    minConnections: 50,
    maxConnections: 1000,
    autoDialInterval: 1000
  })

  return new Components({
    peerId,
    registrar,
    upgrader,
    connectionManager
  })
}
describe('fetch', () => {
  let localComponents: Components
  let remoteComponents: Components

  /**
   * Simulates a connection between the two nodes: creates a connection pair
   * from both sets of components and dispatches a `connection` event on each
   * node's upgrader. Returns the first connection of the pair
   * (`localToRemote`).
   */
  function connectNodes () {
    const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)

    localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
    remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))

    return localToRemote
  }

  beforeEach(async () => {
    // fixture peers 0 and 1 act as the local and remote node respectively
    localComponents = await createComponents(0)
    remoteComponents = await createComponents(1)

    await Promise.all([
      start(localComponents),
      start(remoteComponents)
    ])
  })

  afterEach(async () => {
    // undo any sinon spies installed by a test before stopping the components
    sinon.restore()

    await Promise.all([
      stop(localComponents),
      stop(remoteComponents)
    ])
  })

  it('should be able to fetch from another peer', async () => {
    const lookupKey = 'key'
    const expectedValue = Uint8Array.from([0, 1, 2, 3, 4])

    const localFetch = new FetchService(localComponents, defaultInit)
    const remoteFetch = new FetchService(remoteComponents, defaultInit)

    // the remote node answers lookups for the key with the expected bytes
    remoteFetch.registerLookupFunction(lookupKey, async (identifier) => {
      expect(identifier).to.equal(lookupKey)

      return expectedValue
    })

    await start(localFetch)
    await start(remoteFetch)

    connectNodes()

    // ask the remote peer for the key and check the bytes that come back
    const fetched = await localFetch.fetch(remoteComponents.getPeerId(), lookupKey)

    expect(fetched).to.equalBytes(expectedValue)
  })

  it('should time out fetching from another peer when waiting for the record', async () => {
    const lookupKey = 'key'

    const localFetch = new FetchService(localComponents, defaultInit)
    const remoteFetch = new FetchService(remoteComponents, defaultInit)

    await start(localFetch)
    await start(remoteFetch)

    const localToRemote = connectNodes()

    // swap the remote protocol handler for one that echoes every chunk back
    // only after a delay much longer than the timeout used below
    const remoteRegistrar = remoteComponents.getRegistrar()
    await remoteRegistrar.unhandle(remoteFetch.protocol)
    await remoteRegistrar.handle(remoteFetch.protocol, ({ stream }) => {
      void pipe(
        stream,
        async function * (incoming) {
          for await (const buf of incoming) {
            // 1000 ms, versus the 10 ms abort timeout
            await delay(1000)
            yield buf
          }
        },
        stream
      )
    })

    // watch stream creation on the local side of the connection
    const newStreamSpy = sinon.spy(localToRemote, 'newStream')

    // abort signal that fires after 10 ms
    const timeoutController = new TimeoutController(10)

    // the fetch should be rejected with an abort error
    const pendingFetch = localFetch.fetch(remoteComponents.getPeerId(), lookupKey, {
      signal: timeoutController.signal
    })

    await expect(pendingFetch)
      .to.eventually.be.rejected.with.property('code', 'ABORT_ERR')

    // exactly one stream was opened and it has a close time recorded
    expect(newStreamSpy).to.have.property('callCount', 1)

    const openedStream = await newStreamSpy.getCall(0).returnValue

    expect(openedStream).to.have.nested.property('stat.timeline.close')
  })
})