js-libp2p/src/pubsub.js

94 lines
2.3 KiB
JavaScript
Raw Normal View History

2018-02-14 11:30:36 +01:00
'use strict'
const setImmediate = require('async/setImmediate')
const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET
const FloodSub = require('libp2p-floodsub')
module.exports = (node) => {
const floodSub = new FloodSub(node)
node._floodSub = floodSub
return {
subscribe: (topic, options, handler, callback) => {
2018-02-15 19:39:06 +01:00
if (!node.isStarted() && !floodSub.started) {
2018-02-14 11:30:36 +01:00
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}
if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}
function subscribe (cb) {
if (floodSub.listenerCount(topic) === 0) {
floodSub.subscribe(topic)
}
2018-02-15 19:39:35 +01:00
floodSub.on(topic, handler)
2018-02-14 11:30:36 +01:00
setImmediate(cb)
}
subscribe(callback)
},
unsubscribe: (topic, handler) => {
2018-02-15 19:39:06 +01:00
if (!node.isStarted() && !floodSub.started) {
throw new Error(NOT_STARTED_YET)
}
2018-02-14 11:30:36 +01:00
floodSub.removeListener(topic, handler)
if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
}
},
publish: (topic, data, callback) => {
2018-02-15 19:39:06 +01:00
if (!node.isStarted() && !floodSub.started) {
2018-02-14 11:30:36 +01:00
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}
if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
}
floodSub.publish(topic, data)
setImmediate(() => callback())
},
ls: (callback) => {
2018-02-15 19:39:06 +01:00
if (!node.isStarted() && !floodSub.started) {
2018-02-14 11:30:36 +01:00
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}
const subscriptions = Array.from(floodSub.subscriptions)
setImmediate(() => callback(null, subscriptions))
},
peers: (topic, callback) => {
2018-02-15 19:39:06 +01:00
if (!node.isStarted() && !floodSub.started) {
2018-02-14 11:30:36 +01:00
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}
if (typeof topic === 'function') {
callback = topic
topic = null
}
const peers = Array.from(floodSub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())
setImmediate(() => callback(null, peers))
},
setMaxListeners (n) {
return floodSub.setMaxListeners(n)
}
}
}