js-libp2p/test/fetch/index.spec.ts
Alex Potsides 2836acc90f
fix: use keep-alive tag to reconnect to peers on startup (#1278)
Instead of trying to connect to every peer in the peer store when
we start a node, only connect to the peers that have been marked
with a `keep-alive` tag.
2022-06-28 08:05:16 +01:00

140 lines
4.4 KiB
TypeScript

/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { FetchService, FetchServiceInit } from '../../src/fetch/index.js'
import Peers from '../fixtures/peers.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/components'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { start, stop } from '@libp2p/interfaces/startable'
import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'
/**
 * Init options handed to every FetchService created in this file:
 * the 'ipfs' protocol prefix and a limit of one inbound and one
 * outbound stream.
 */
const defaultInit: FetchServiceInit = {
  protocolPrefix: 'ipfs',
  maxInboundStreams: 1,
  maxOutboundStreams: 1
}
/**
 * Assembles the set of components used by one node in these tests.
 *
 * The peer id is built from the `Peers` fixture entry at `index`; the
 * registrar and upgrader are mocks, and the peer store, datastore and
 * connection manager are freshly constructed on every call. Nothing is
 * started here - callers are expected to do that themselves.
 *
 * @param index - position of the identity to use in the `Peers` fixture
 * @returns the newly created components
 */
async function createComponents (index: number) {
  // resolve the identity first so nothing else is constructed if it fails
  const identity = await createFromJSON(Peers[index])

  const connectionManagerInit = {
    minConnections: 50,
    maxConnections: 1000,
    autoDialInterval: 1000
  }

  return new Components({
    peerId: identity,
    registrar: mockRegistrar(),
    upgrader: mockUpgrader(),
    peerStore: new PersistentPeerStore(),
    datastore: new MemoryDatastore(),
    connectionManager: new DefaultConnectionManager(connectionManagerInit)
  })
}
describe('fetch', () => {
  let localComponents: Components
  let remoteComponents: Components

  /**
   * Simulates a connection between the two nodes: creates a connection
   * pair and announces each end on the upgrader of the node that owns it.
   *
   * @returns the pair as [local -> remote, remote -> local]
   */
  function simulateConnection (): ReturnType<typeof connectionPair> {
    const pair = connectionPair(localComponents, remoteComponents)
    const [outbound, inbound] = pair

    localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: outbound }))
    remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: inbound }))

    return pair
  }

  // every test gets two freshly created and started sets of components
  beforeEach(async () => {
    localComponents = await createComponents(0)
    remoteComponents = await createComponents(1)

    await Promise.all([
      start(localComponents),
      start(remoteComponents)
    ])
  })

  // undo any sinon spies/stubs, then shut both sets of components down
  afterEach(async () => {
    sinon.restore()

    await Promise.all([
      stop(localComponents),
      stop(remoteComponents)
    ])
  })

  it('should be able to fetch from another peer', async () => {
    const lookupKey = 'key'
    const expected = Uint8Array.of(0, 1, 2, 3, 4)

    const localFetch = new FetchService(localComponents, defaultInit)
    const remoteFetch = new FetchService(remoteComponents, defaultInit)

    // the remote side answers lookups for the key with the expected bytes
    remoteFetch.registerLookupFunction(lookupKey, async (identifier) => {
      expect(identifier).to.equal(lookupKey)

      return expected
    })

    await start(localFetch)
    await start(remoteFetch)

    // the returned connections are not needed by this test
    simulateConnection()

    // ask the remote peer for the value
    const fetched = await localFetch.fetch(remoteComponents.getPeerId(), lookupKey)

    expect(fetched).to.equalBytes(expected)
  })

  it('should time out fetching from another peer when waiting for the record', async () => {
    const lookupKey = 'key'

    const localFetch = new FetchService(localComponents, defaultInit)
    const remoteFetch = new FetchService(remoteComponents, defaultInit)

    await start(localFetch)
    await start(remoteFetch)

    // only the local -> remote end is inspected later on
    const [outbound] = simulateConnection()

    // swap the remote protocol handler for one that echoes very slowly
    const remoteRegistrar = remoteComponents.getRegistrar()
    await remoteRegistrar.unhandle(remoteFetch.protocol)
    await remoteRegistrar.handle(remoteFetch.protocol, ({ stream }) => {
      void pipe(
        stream,
        async function * (source) {
          for await (const chunk of source) {
            // hold each chunk back for longer than the fetch timeout
            await delay(1000)

            yield chunk
          }
        },
        stream
      )
    })

    const newStreamSpy = sinon.spy(outbound, 'newStream')

    // abort signal that fires after 10 ms
    const timeoutController = new TimeoutController(10)

    // the fetch should be aborted before the slow handler replies
    const pendingFetch = localFetch.fetch(remoteComponents.getPeerId(), lookupKey, {
      signal: timeoutController.signal
    })

    await expect(pendingFetch)
      .to.eventually.be.rejected.with.property('code', 'ABORT_ERR')

    // exactly one stream should have been opened, and it should be closed
    expect(newStreamSpy).to.have.property('callCount', 1)

    const openedStream = await newStreamSpy.getCall(0).returnValue

    expect(openedStream).to.have.nested.property('stat.timeline.close')
  })
})