mirror of
https://github.com/fluencelabs/redis
synced 2025-04-01 15:21:03 +00:00
RESP3: pubsub messages API completely refactored.
This commit is contained in:
parent
798a329192
commit
bc75a94e2d
132
src/pubsub.c
132
src/pubsub.c
@ -29,6 +29,75 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
|
int clientSubscriptionsCount(client *c);
|
||||||
|
|
||||||
|
/*-----------------------------------------------------------------------------
|
||||||
|
* Pubsub client replies API
|
||||||
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
/* Send a pubsub message of type "message" to the client. */
|
||||||
|
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
||||||
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
|
addReply(c,shared.messagebulk);
|
||||||
|
addReplyBulk(c,channel);
|
||||||
|
addReplyBulk(c,msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Send a pubsub message of type "pmessage" to the client. The difference
|
||||||
|
* with the "message" type delivered by addReplyPubsubMessage() is that
|
||||||
|
* this message format also includes the pattern that matched the message. */
|
||||||
|
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
|
||||||
|
addReply(c,shared.mbulkhdr[4]);
|
||||||
|
addReply(c,shared.pmessagebulk);
|
||||||
|
addReplyBulk(c,pat);
|
||||||
|
addReplyBulk(c,channel);
|
||||||
|
addReplyBulk(c,msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Send the pubsub subscription notification to the client. */
|
||||||
|
void addReplyPubsubSubscribed(client *c, robj *channel) {
|
||||||
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
|
addReply(c,shared.subscribebulk);
|
||||||
|
addReplyBulk(c,channel);
|
||||||
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Send the pubsub unsubscription notification to the client.
|
||||||
|
* Channel can be NULL: this is useful when the client sends a mass
|
||||||
|
* unsubscribe command but there are no channels to unsubscribe from: we
|
||||||
|
* still send a notification. */
|
||||||
|
void addReplyPubsubUnsubscribed(client *c, robj *channel) {
|
||||||
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
|
addReply(c,shared.unsubscribebulk);
|
||||||
|
if (channel)
|
||||||
|
addReplyBulk(c,channel);
|
||||||
|
else
|
||||||
|
addReplyNull(c);
|
||||||
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Send the pubsub pattern subscription notification to the client. */
|
||||||
|
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
|
||||||
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
|
addReply(c,shared.psubscribebulk);
|
||||||
|
addReplyBulk(c,pattern);
|
||||||
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Send the pubsub pattern unsubscription notification to the client.
|
||||||
|
* Pattern can be NULL: this is useful when the client sends a mass
|
||||||
|
* punsubscribe command but there are no pattern to unsubscribe from: we
|
||||||
|
* still send a notification. */
|
||||||
|
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
|
||||||
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
|
addReply(c,shared.punsubscribebulk);
|
||||||
|
if (pattern)
|
||||||
|
addReplyBulk(c,pattern);
|
||||||
|
else
|
||||||
|
addReplyNull(c);
|
||||||
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
||||||
|
}
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* Pubsub low level API
|
* Pubsub low level API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
@ -76,10 +145,7 @@ int pubsubSubscribeChannel(client *c, robj *channel) {
|
|||||||
listAddNodeTail(clients,c);
|
listAddNodeTail(clients,c);
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
addReplyPubsubSubscribed(c,channel);
|
||||||
addReply(c,shared.subscribebulk);
|
|
||||||
addReplyBulk(c,channel);
|
|
||||||
addReplyLongLong(c,clientSubscriptionsCount(c));
|
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,14 +177,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
if (notify) {
|
if (notify) addReplyPubsubUnsubscribed(c,channel);
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
|
||||||
addReply(c,shared.unsubscribebulk);
|
|
||||||
addReplyBulk(c,channel);
|
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
|
||||||
listLength(c->pubsub_patterns));
|
|
||||||
|
|
||||||
}
|
|
||||||
decrRefCount(channel); /* it is finally safe to release it */
|
decrRefCount(channel); /* it is finally safe to release it */
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
@ -138,10 +197,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
|
|||||||
listAddNodeTail(server.pubsub_patterns,pat);
|
listAddNodeTail(server.pubsub_patterns,pat);
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
addReplyPubsubPatSubscribed(c,pattern);
|
||||||
addReply(c,shared.psubscribebulk);
|
|
||||||
addReplyBulk(c,pattern);
|
|
||||||
addReplyLongLong(c,clientSubscriptionsCount(c));
|
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,13 +218,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
|||||||
listDelNode(server.pubsub_patterns,ln);
|
listDelNode(server.pubsub_patterns,ln);
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
if (notify) {
|
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
|
||||||
addReply(c,shared.punsubscribebulk);
|
|
||||||
addReplyBulk(c,pattern);
|
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
|
||||||
listLength(c->pubsub_patterns));
|
|
||||||
}
|
|
||||||
decrRefCount(pattern);
|
decrRefCount(pattern);
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
@ -186,13 +236,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) {
|
|||||||
count += pubsubUnsubscribeChannel(c,channel,notify);
|
count += pubsubUnsubscribeChannel(c,channel,notify);
|
||||||
}
|
}
|
||||||
/* We were subscribed to nothing? Still reply to the client. */
|
/* We were subscribed to nothing? Still reply to the client. */
|
||||||
if (notify && count == 0) {
|
if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
|
||||||
addReply(c,shared.unsubscribebulk);
|
|
||||||
addReplyNull(c);
|
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
|
||||||
listLength(c->pubsub_patterns));
|
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
@ -210,36 +254,10 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
|
|||||||
|
|
||||||
count += pubsubUnsubscribePattern(c,pattern,notify);
|
count += pubsubUnsubscribePattern(c,pattern,notify);
|
||||||
}
|
}
|
||||||
if (notify && count == 0) {
|
if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
|
||||||
/* We were subscribed to nothing? Still reply to the client. */
|
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
|
||||||
addReply(c,shared.punsubscribebulk);
|
|
||||||
addReplyNull(c);
|
|
||||||
addReplyLongLong(c,dictSize(c->pubsub_channels)+
|
|
||||||
listLength(c->pubsub_patterns));
|
|
||||||
}
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a pubsub message of type "message" to the client. */
|
|
||||||
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
|
||||||
addReply(c,shared.messagebulk);
|
|
||||||
addReplyBulk(c,channel);
|
|
||||||
addReplyBulk(c,msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Send a pubsub message of type "pmessage" to the client. The difference
|
|
||||||
* with the "message" type delivered by addReplyPubsubMessage() is that
|
|
||||||
* this message format also includes the pattern that matched the message. */
|
|
||||||
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
|
|
||||||
addReply(c,shared.mbulkhdr[4]);
|
|
||||||
addReply(c,shared.pmessagebulk);
|
|
||||||
addReplyBulk(c,pat);
|
|
||||||
addReplyBulk(c,channel);
|
|
||||||
addReplyBulk(c,msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Publish a message */
|
/* Publish a message */
|
||||||
int pubsubPublishMessage(robj *channel, robj *message) {
|
int pubsubPublishMessage(robj *channel, robj *message) {
|
||||||
int receivers = 0;
|
int receivers = 0;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user