js-libp2p/src/pubsub.js

166 lines
4.8 KiB
JavaScript
Raw Normal View History

2018-02-14 11:30:36 +01:00
'use strict'
const nextTick = require('async/nextTick')
const { messages, codes } = require('./errors')
const promisify = require('promisify-es6')
2018-02-14 11:30:36 +01:00
const errCode = require('err-code')
module.exports = (node, Pubsub, config) => {
const pubsub = new Pubsub(node, config)
2018-02-14 11:30:36 +01:00
return {
/**
* Subscribe the given handler to a pubsub topic
*
* @param {string} topic
* @param {function} handler The handler to subscribe
* @param {object|null} [options]
* @param {function} [callback] An optional callback
*
* @returns {Promise|void} A promise is returned if no callback is provided
*
* @example <caption>Subscribe a handler to a topic</caption>
*
* // `null` must be passed for options until subscribe is no longer using promisify
* const handler = (message) => { }
* await libp2p.subscribe(topic, handler, null)
*
* @example <caption>Use a callback instead of the Promise api</caption>
*
* // `options` may be passed or omitted when supplying a callback
* const handler = (message) => { }
* libp2p.subscribe(topic, handler, callback)
*/
subscribe: (topic, handler, options, callback) => {
// can't use promisify because it thinks the handler is a callback
2018-02-14 11:30:36 +01:00
if (typeof options === 'function') {
callback = options
2018-02-14 11:30:36 +01:00
options = {}
}
if (!node.isStarted() && !pubsub.started) {
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
2018-02-15 20:14:54 +01:00
if (callback) {
return nextTick(() => callback(err))
2018-02-14 11:30:36 +01:00
}
return Promise.reject(err)
2018-02-14 11:30:36 +01:00
}
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}
pubsub.on(topic, handler)
if (callback) {
return nextTick(() => callback())
}
return Promise.resolve()
},
/**
* Unsubscribes from a pubsub topic
*
* @param {string} topic
* @param {function|null} handler The handler to unsubscribe from
* @param {function} [callback] An optional callback
*
* @returns {Promise|void} A promise is returned if no callback is provided
*
* @example <caption>Unsubscribe a topic for all handlers</caption>
*
* // `null` must be passed until unsubscribe is no longer using promisify
* await libp2p.unsubscribe(topic, null)
*
* @example <caption>Unsubscribe a topic for 1 handler</caption>
*
* await libp2p.unsubscribe(topic, handler)
*
* @example <caption>Use a callback instead of the Promise api</caption>
*
* libp2p.unsubscribe(topic, handler, callback)
*/
unsubscribe: (topic, handler, callback) => {
// can't use promisify because it thinks the handler is a callback
if (!node.isStarted() && !pubsub.started) {
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
if (callback) {
return nextTick(() => callback(err))
}
return Promise.reject(err)
2018-02-15 19:39:06 +01:00
}
if (!handler) {
pubsub.removeAllListeners(topic)
} else {
pubsub.removeListener(topic, handler)
}
2018-02-14 11:30:36 +01:00
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
2018-02-14 11:30:36 +01:00
}
if (callback) {
return nextTick(() => callback())
}
2018-02-14 11:30:36 +01:00
return Promise.resolve()
},
publish: promisify((topic, data, callback) => {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
2018-02-14 11:30:36 +01:00
}
try {
data = Buffer.from(data)
} catch (err) {
return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID'))
2018-02-14 11:30:36 +01:00
}
pubsub.publish(topic, data, callback)
}),
2018-02-14 11:30:36 +01:00
ls: promisify((callback) => {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
2018-02-14 11:30:36 +01:00
}
const subscriptions = Array.from(pubsub.subscriptions)
2018-02-14 11:30:36 +01:00
nextTick(() => callback(null, subscriptions))
}),
2018-02-14 11:30:36 +01:00
peers: promisify((topic, callback) => {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
2018-02-14 11:30:36 +01:00
}
if (typeof topic === 'function') {
callback = topic
topic = null
}
const peers = Array.from(pubsub.peers.values())
2018-02-14 11:30:36 +01:00
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())
nextTick(() => callback(null, peers))
}),
2018-02-14 11:30:36 +01:00
setMaxListeners (n) {
return pubsub.setMaxListeners(n)
},
start: promisify((cb) => pubsub.start(cb)),
/**
 * Stop the underlying pubsub instance by delegating to `pubsub.stop`.
 * Wrapped with promisify.
 *
 * @param {function} [cb] Passed to `pubsub.stop`
 */
stop: promisify((done) => {
  return pubsub.stop(done)
})
2018-02-14 11:30:36 +01:00
}
}