js-libp2p/src/pubsub.js

106 lines
2.7 KiB
JavaScript
Raw Normal View History

2018-02-14 11:30:36 +01:00
'use strict'
const { Buffer } = require('buffer')
const errCode = require('err-code')
const { messages, codes } = require('./errors')
module.exports = (node, Pubsub, config) => {
const pubsub = new Pubsub(node.peerInfo, node.registrar, config)
2018-02-14 11:30:36 +01:00
return {
/**
* Subscribe the given handler to a pubsub topic
* @param {string} topic
* @param {function} handler The handler to subscribe
* @returns {void}
*/
subscribe: (topic, handler) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
2018-02-14 11:30:36 +01:00
}
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}
pubsub.on(topic, handler)
},
/**
* Unsubscribes from a pubsub topic
* @param {string} topic
* @param {function} [handler] The handler to unsubscribe from
*/
unsubscribe: (topic, handler) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
2018-02-15 19:39:06 +01:00
}
if (!handler) {
pubsub.removeAllListeners(topic)
} else {
pubsub.removeListener(topic, handler)
}
2018-02-14 11:30:36 +01:00
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
2018-02-14 11:30:36 +01:00
}
},
/**
* Publish messages to the given topics.
* @param {Array<string>|string} topic
* @param {Buffer} data
* @returns {Promise<void>}
*/
publish: (topic, data) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
2018-02-14 11:30:36 +01:00
}
try {
data = Buffer.from(data)
} catch (err) {
throw errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')
2018-02-14 11:30:36 +01:00
}
return pubsub.publish(topic, data)
},
2018-02-14 11:30:36 +01:00
/**
* Get a list of topics the node is subscribed to.
* @returns {Array<String>} topics
*/
getTopics: () => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
2018-02-14 11:30:36 +01:00
}
return pubsub.getTopics()
},
2018-02-14 11:30:36 +01:00
/**
* Get a list of the peer-ids that are subscribed to one topic.
* @param {string} topic
* @returns {Array<string>}
*/
getSubscribers: (topic) => {
if (!node.isStarted() && !pubsub.started) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
2018-02-14 11:30:36 +01:00
}
return pubsub.getSubscribers(topic)
},
2018-02-14 11:30:36 +01:00
setMaxListeners (n) {
return pubsub.setMaxListeners(n)
},
_pubsub: pubsub,
start: () => pubsub.start(),
stop: () => pubsub.stop()
2018-02-14 11:30:36 +01:00
}
}