mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-01 15:21:04 +00:00
* feat: promisify all api methods that accept callbacks This is a stop-gap until the full async/await migration can be completed. It means we can refactor tests of other modules that depend on this module without having to mix async flow control strategies. N.b. some methods that were previously callable without callbacks (e.g. `node.start()`, `node.stop()`, etc) now require callbacks otherwise a promise is returned which, if rejected, can cause `unhandledPromiseRejection` events and lead to memory leaks. * docs: add a global note to the api about promisify * fix: update the logic for unsubscribe * test(fix): correct pubsub unsubscribe usage for api change * test(fix): update content routing tests for latest delegate version
255 lines
6.8 KiB
JavaScript
255 lines
6.8 KiB
JavaScript
/* eslint-env mocha */
|
|
/* eslint max-nested-callbacks: ["error", 8] */
|
|
|
|
'use strict'
|
|
|
|
const chai = require('chai')
|
|
chai.use(require('dirty-chai'))
|
|
chai.use(require('chai-checkmark'))
|
|
const expect = chai.expect
|
|
const parallel = require('async/parallel')
|
|
const series = require('async/series')
|
|
const _times = require('lodash.times')
|
|
|
|
const { codes } = require('../src/errors')
|
|
const createNode = require('./utils/create-node')
|
|
|
|
function startTwo (callback) {
|
|
const tasks = _times(2, () => (cb) => {
|
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
|
config: {
|
|
peerDiscovery: {
|
|
mdns: {
|
|
enabled: false
|
|
}
|
|
},
|
|
EXPERIMENTAL: {
|
|
pubsub: true
|
|
}
|
|
}
|
|
}, (err, node) => {
|
|
expect(err).to.not.exist()
|
|
node.start((err) => cb(err, node))
|
|
})
|
|
})
|
|
|
|
parallel(tasks, (err, nodes) => {
|
|
expect(err).to.not.exist()
|
|
|
|
nodes[0].dial(nodes[1].peerInfo, (err) => callback(err, nodes))
|
|
})
|
|
}
|
|
|
|
function stopTwo (nodes, callback) {
|
|
parallel([
|
|
(cb) => nodes[0].stop(cb),
|
|
(cb) => nodes[1].stop(cb)
|
|
], callback)
|
|
}
|
|
|
|
// There is a vast test suite on PubSub through js-ipfs
|
|
// https://github.com/ipfs/interface-ipfs-core/blob/master/js/src/pubsub.js
|
|
// and libp2p-floodsub itself
|
|
// https://github.com/libp2p/js-libp2p-floodsub/tree/master/test
|
|
// TODO: consider if all or some of those should come here
|
|
describe('.pubsub', () => {
|
|
describe('.pubsub on (default)', (done) => {
|
|
it('start two nodes and send one message, then unsubscribe', (done) => {
|
|
// Check the final series error, and the publish handler
|
|
expect(2).checks(done)
|
|
|
|
let nodes
|
|
const data = Buffer.from('test')
|
|
const handler = (msg) => {
|
|
// verify the data is correct and mark the expect
|
|
expect(msg.data).to.eql(data).mark()
|
|
}
|
|
|
|
series([
|
|
// Start the nodes
|
|
(cb) => startTwo((err, _nodes) => {
|
|
nodes = _nodes
|
|
cb(err)
|
|
}),
|
|
// subscribe on the first
|
|
(cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb),
|
|
// Wait a moment before publishing
|
|
(cb) => setTimeout(cb, 500),
|
|
// publish on the second
|
|
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
|
|
// ls subscripts
|
|
(cb) => nodes[1].pubsub.ls(cb),
|
|
// get subscribed peers
|
|
(cb) => nodes[1].pubsub.peers('pubsub', cb),
|
|
// Wait a moment before unsubscribing
|
|
(cb) => setTimeout(cb, 500),
|
|
// unsubscribe on the first
|
|
(cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb),
|
|
// Stop both nodes
|
|
(cb) => stopTwo(nodes, cb)
|
|
], (err) => {
|
|
// Verify there was no error, and mark the expect
|
|
expect(err).to.not.exist().mark()
|
|
})
|
|
})
|
|
it('start two nodes and send one message, then unsubscribe without handler', (done) => {
|
|
// Check the final series error, and the publish handler
|
|
expect(3).checks(done)
|
|
|
|
let nodes
|
|
const data = Buffer.from('test')
|
|
const handler = (msg) => {
|
|
// verify the data is correct and mark the expect
|
|
expect(msg.data).to.eql(data).mark()
|
|
}
|
|
|
|
series([
|
|
// Start the nodes
|
|
(cb) => startTwo((err, _nodes) => {
|
|
nodes = _nodes
|
|
cb(err)
|
|
}),
|
|
// subscribe on the first
|
|
(cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb),
|
|
// Wait a moment before publishing
|
|
(cb) => setTimeout(cb, 500),
|
|
// publish on the second
|
|
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
|
|
// Wait a moment before unsubscribing
|
|
(cb) => setTimeout(cb, 500),
|
|
// unsubscribe from all
|
|
(cb) => nodes[0].pubsub.unsubscribe('pubsub', null, cb),
|
|
// Verify unsubscribed
|
|
(cb) => {
|
|
nodes[0].pubsub.ls((err, topics) => {
|
|
expect(topics.length).to.eql(0).mark()
|
|
cb(err)
|
|
})
|
|
},
|
|
// Stop both nodes
|
|
(cb) => stopTwo(nodes, cb)
|
|
], (err) => {
|
|
// Verify there was no error, and mark the expect
|
|
expect(err).to.not.exist().mark()
|
|
})
|
|
})
|
|
it('publish should fail if data is not a buffer', (done) => {
|
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
|
config: {
|
|
peerDiscovery: {
|
|
mdns: {
|
|
enabled: false
|
|
}
|
|
},
|
|
EXPERIMENTAL: {
|
|
pubsub: true
|
|
}
|
|
}
|
|
}, (err, node) => {
|
|
expect(err).to.not.exist()
|
|
|
|
node.start((err) => {
|
|
expect(err).to.not.exist()
|
|
|
|
node.pubsub.publish('pubsub', 'datastr', (err) => {
|
|
expect(err).to.exist()
|
|
expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER')
|
|
|
|
done()
|
|
})
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
describe('.pubsub off', () => {
|
|
it('fail to use pubsub if disabled', (done) => {
|
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
|
config: {
|
|
peerDiscovery: {
|
|
mdns: {
|
|
enabled: false
|
|
}
|
|
},
|
|
EXPERIMENTAL: {
|
|
pubsub: false
|
|
}
|
|
}
|
|
}, (err, node) => {
|
|
expect(err).to.not.exist()
|
|
expect(node.pubsub).to.not.exist()
|
|
done()
|
|
})
|
|
})
|
|
})
|
|
|
|
describe('.pubsub on and node not started', () => {
|
|
let libp2pNode
|
|
|
|
before(function (done) {
|
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
|
config: {
|
|
peerDiscovery: {
|
|
mdns: {
|
|
enabled: false
|
|
}
|
|
},
|
|
EXPERIMENTAL: {
|
|
pubsub: true
|
|
}
|
|
}
|
|
}, (err, node) => {
|
|
expect(err).to.not.exist()
|
|
|
|
libp2pNode = node
|
|
done()
|
|
})
|
|
})
|
|
|
|
it('fail to subscribe if node not started yet', (done) => {
|
|
libp2pNode.pubsub.subscribe('pubsub', () => { }, (err) => {
|
|
expect(err).to.exist()
|
|
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)
|
|
|
|
done()
|
|
})
|
|
})
|
|
|
|
it('fail to unsubscribe if node not started yet', (done) => {
|
|
libp2pNode.pubsub.unsubscribe('pubsub', () => { }, (err) => {
|
|
expect(err).to.exist()
|
|
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)
|
|
|
|
done()
|
|
})
|
|
})
|
|
|
|
it('fail to publish if node not started yet', (done) => {
|
|
libp2pNode.pubsub.publish('pubsub', Buffer.from('data'), (err) => {
|
|
expect(err).to.exist()
|
|
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)
|
|
|
|
done()
|
|
})
|
|
})
|
|
|
|
it('fail to ls if node not started yet', (done) => {
|
|
libp2pNode.pubsub.ls((err) => {
|
|
expect(err).to.exist()
|
|
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)
|
|
|
|
done()
|
|
})
|
|
})
|
|
|
|
it('fail to get subscribed peers to a topic if node not started yet', (done) => {
|
|
libp2pNode.pubsub.peers('pubsub', (err) => {
|
|
expect(err).to.exist()
|
|
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)
|
|
|
|
done()
|
|
})
|
|
})
|
|
})
|
|
})
|