fix: add callback to pubsub.unsubscribe and test (#300)

This commit is contained in:
ebinks 2018-12-20 11:48:07 -05:00 committed by Jacob Heun
parent 8047fb76fa
commit 679d446daa
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
2 changed files with 40 additions and 22 deletions

View File

@ -33,7 +33,7 @@ module.exports = (node) => {
subscribe(callback) subscribe(callback)
}, },
unsubscribe: (topic, handler) => { unsubscribe: (topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) { if (!node.isStarted() && !floodSub.started) {
throw new Error(NOT_STARTED_YET) throw new Error(NOT_STARTED_YET)
} }
@ -43,6 +43,10 @@ module.exports = (node) => {
if (floodSub.listenerCount(topic) === 0) { if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic) floodSub.unsubscribe(topic)
} }
if (typeof callback === 'function') {
setImmediate(() => callback())
}
}, },
publish: (topic, data, callback) => { publish: (topic, data, callback) => {

View File

@ -5,9 +5,10 @@
const chai = require('chai') const chai = require('chai')
chai.use(require('dirty-chai')) chai.use(require('dirty-chai'))
chai.use(require('chai-checkmark'))
const expect = chai.expect const expect = chai.expect
const parallel = require('async/parallel') const parallel = require('async/parallel')
const waterfall = require('async/waterfall') const series = require('async/series')
const _times = require('lodash.times') const _times = require('lodash.times')
const createNode = require('./utils/create-node') const createNode = require('./utils/create-node')
@ -52,26 +53,39 @@ function stopTwo (nodes, callback) {
// TODO: consider if all or some of those should come here // TODO: consider if all or some of those should come here
describe('.pubsub', () => { describe('.pubsub', () => {
describe('.pubsub on (default)', (done) => { describe('.pubsub on (default)', (done) => {
it('start two nodes and send one message', (done) => { it('start two nodes and send one message, then unsubscribe', (done) => {
waterfall([ // Check the final series error, and the publish handler
(cb) => startTwo(cb), expect(2).checks(done)
(nodes, cb) => {
const data = Buffer.from('test') let nodes
nodes[0].pubsub.subscribe('pubsub', const data = Buffer.from('test')
(msg) => { const handler = (msg) => {
expect(msg.data).to.eql(data) // verify the data is correct and mark the expect
cb(null, nodes) expect(msg.data).to.eql(data).mark()
}, }
(err) => {
expect(err).to.not.exist() series([
setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => { // Start the nodes
expect(err).to.not.exist() (cb) => startTwo((err, _nodes) => {
}), 500) nodes = _nodes
} cb(err)
) }),
}, // subscribe on the first
(nodes, cb) => stopTwo(nodes, cb) (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb),
], done) // Wait a moment before publishing
(cb) => setTimeout(cb, 500),
// publish on the second
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe on the first
(cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb),
// Stop both nodes
(cb) => stopTwo(nodes, cb)
], (err) => {
// Verify there was no error, and mark the expect
expect(err).to.not.exist().mark()
})
}) })
}) })