mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
PUBSUB implemented
This commit is contained in:
parent
cac154c580
commit
befec3cd91
@ -159,6 +159,9 @@ static struct redisCommand cmdTable[] = {
|
|||||||
{"hgetall",2,REDIS_CMD_INLINE},
|
{"hgetall",2,REDIS_CMD_INLINE},
|
||||||
{"hexists",3,REDIS_CMD_BULK},
|
{"hexists",3,REDIS_CMD_BULK},
|
||||||
{"config",-2,REDIS_CMD_BULK},
|
{"config",-2,REDIS_CMD_BULK},
|
||||||
|
{"subscribe",-2,REDIS_CMD_INLINE},
|
||||||
|
{"unsubscribe",-1,REDIS_CMD_INLINE},
|
||||||
|
{"publish",3,REDIS_CMD_BULK},
|
||||||
{NULL,0,0}
|
{NULL,0,0}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
156
redis.c
156
redis.c
@ -327,6 +327,7 @@ typedef struct redisClient {
|
|||||||
* is >= blockingto then the operation timed out. */
|
* is >= blockingto then the operation timed out. */
|
||||||
list *io_keys; /* Keys this client is waiting to be loaded from the
|
list *io_keys; /* Keys this client is waiting to be loaded from the
|
||||||
* swap file in order to continue. */
|
* swap file in order to continue. */
|
||||||
|
dict *pubsub_classes; /* Classes a client is interested in (SUBSCRIBE) */
|
||||||
} redisClient;
|
} redisClient;
|
||||||
|
|
||||||
struct saveparam {
|
struct saveparam {
|
||||||
@ -435,6 +436,9 @@ struct redisServer {
|
|||||||
unsigned long long vm_stats_swapped_objects;
|
unsigned long long vm_stats_swapped_objects;
|
||||||
unsigned long long vm_stats_swapouts;
|
unsigned long long vm_stats_swapouts;
|
||||||
unsigned long long vm_stats_swapins;
|
unsigned long long vm_stats_swapins;
|
||||||
|
/* Pubsub */
|
||||||
|
dict *pubsub_classes; /* Associate classes to list of subscribed clients */
|
||||||
|
/* Misc */
|
||||||
FILE *devnull;
|
FILE *devnull;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -501,7 +505,8 @@ struct sharedObjectsStruct {
|
|||||||
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
||||||
*outofrangeerr, *plus,
|
*outofrangeerr, *plus,
|
||||||
*select0, *select1, *select2, *select3, *select4,
|
*select0, *select1, *select2, *select3, *select4,
|
||||||
*select5, *select6, *select7, *select8, *select9;
|
*select5, *select6, *select7, *select8, *select9,
|
||||||
|
*messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3;
|
||||||
} shared;
|
} shared;
|
||||||
|
|
||||||
/* Global vars that are actally used as constants. The following double
|
/* Global vars that are actally used as constants. The following double
|
||||||
@ -601,6 +606,8 @@ static struct redisCommand *lookupCommand(char *name);
|
|||||||
static void call(redisClient *c, struct redisCommand *cmd);
|
static void call(redisClient *c, struct redisCommand *cmd);
|
||||||
static void resetClient(redisClient *c);
|
static void resetClient(redisClient *c);
|
||||||
static void convertToRealHash(robj *o);
|
static void convertToRealHash(robj *o);
|
||||||
|
static int pubsubUnsubscribeAll(redisClient *c, int notify);
|
||||||
|
static void usage();
|
||||||
|
|
||||||
static void authCommand(redisClient *c);
|
static void authCommand(redisClient *c);
|
||||||
static void pingCommand(redisClient *c);
|
static void pingCommand(redisClient *c);
|
||||||
@ -698,6 +705,9 @@ static void hgetallCommand(redisClient *c);
|
|||||||
static void hexistsCommand(redisClient *c);
|
static void hexistsCommand(redisClient *c);
|
||||||
static void configCommand(redisClient *c);
|
static void configCommand(redisClient *c);
|
||||||
static void hincrbyCommand(redisClient *c);
|
static void hincrbyCommand(redisClient *c);
|
||||||
|
static void subscribeCommand(redisClient *c);
|
||||||
|
static void unsubscribeCommand(redisClient *c);
|
||||||
|
static void publishCommand(redisClient *c);
|
||||||
|
|
||||||
/*================================= Globals ================================= */
|
/*================================= Globals ================================= */
|
||||||
|
|
||||||
@ -801,11 +811,12 @@ static struct redisCommand cmdTable[] = {
|
|||||||
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
|
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
|
||||||
{"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
|
{"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
|
||||||
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
|
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
|
||||||
|
{"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
|
||||||
|
{"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
|
||||||
|
{"publish",publishCommand,3,REDIS_CMD_BULK,NULL,0,0,0},
|
||||||
{NULL,NULL,0,0,NULL,0,0,0}
|
{NULL,NULL,0,0,NULL,0,0,0}
|
||||||
};
|
};
|
||||||
|
|
||||||
static void usage();
|
|
||||||
|
|
||||||
/*============================ Utility functions ============================ */
|
/*============================ Utility functions ============================ */
|
||||||
|
|
||||||
/* Glob-style pattern matching. */
|
/* Glob-style pattern matching. */
|
||||||
@ -1473,6 +1484,10 @@ static void createSharedObjects(void) {
|
|||||||
shared.select7 = createStringObject("select 7\r\n",10);
|
shared.select7 = createStringObject("select 7\r\n",10);
|
||||||
shared.select8 = createStringObject("select 8\r\n",10);
|
shared.select8 = createStringObject("select 8\r\n",10);
|
||||||
shared.select9 = createStringObject("select 9\r\n",10);
|
shared.select9 = createStringObject("select 9\r\n",10);
|
||||||
|
shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
|
||||||
|
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
|
||||||
|
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",17);
|
||||||
|
shared.mbulk3 = createStringObject("*3\r\n",4);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void appendServerSaveParams(time_t seconds, int changes) {
|
static void appendServerSaveParams(time_t seconds, int changes) {
|
||||||
@ -1576,6 +1591,7 @@ static void initServer() {
|
|||||||
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
|
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
|
||||||
server.db[j].id = j;
|
server.db[j].id = j;
|
||||||
}
|
}
|
||||||
|
server.pubsub_classes = dictCreate(&keylistDictType,NULL);
|
||||||
server.cronloops = 0;
|
server.cronloops = 0;
|
||||||
server.bgsavechildpid = -1;
|
server.bgsavechildpid = -1;
|
||||||
server.bgrewritechildpid = -1;
|
server.bgrewritechildpid = -1;
|
||||||
@ -1839,6 +1855,10 @@ static void freeClient(redisClient *c) {
|
|||||||
if (c->flags & REDIS_BLOCKED)
|
if (c->flags & REDIS_BLOCKED)
|
||||||
unblockClientWaitingData(c);
|
unblockClientWaitingData(c);
|
||||||
|
|
||||||
|
/* Unsubscribe from all the pubsub classes */
|
||||||
|
pubsubUnsubscribeAll(c,0);
|
||||||
|
dictRelease(c->pubsub_classes);
|
||||||
|
/* Obvious cleanup */
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
||||||
listRelease(c->reply);
|
listRelease(c->reply);
|
||||||
@ -1861,7 +1881,7 @@ static void freeClient(redisClient *c) {
|
|||||||
dontWaitForSwappedKey(c,ln->value);
|
dontWaitForSwappedKey(c,ln->value);
|
||||||
}
|
}
|
||||||
listRelease(c->io_keys);
|
listRelease(c->io_keys);
|
||||||
/* Other cleanup */
|
/* Master/slave cleanup */
|
||||||
if (c->flags & REDIS_SLAVE) {
|
if (c->flags & REDIS_SLAVE) {
|
||||||
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
|
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
|
||||||
close(c->repldbfd);
|
close(c->repldbfd);
|
||||||
@ -1874,6 +1894,7 @@ static void freeClient(redisClient *c) {
|
|||||||
server.master = NULL;
|
server.master = NULL;
|
||||||
server.replstate = REDIS_REPL_CONNECT;
|
server.replstate = REDIS_REPL_CONNECT;
|
||||||
}
|
}
|
||||||
|
/* Release memory */
|
||||||
zfree(c->argv);
|
zfree(c->argv);
|
||||||
zfree(c->mbargv);
|
zfree(c->mbargv);
|
||||||
freeClientMultiState(c);
|
freeClientMultiState(c);
|
||||||
@ -2480,6 +2501,7 @@ static redisClient *createClient(int fd) {
|
|||||||
c->blockingkeys = NULL;
|
c->blockingkeys = NULL;
|
||||||
c->blockingkeysnum = 0;
|
c->blockingkeysnum = 0;
|
||||||
c->io_keys = listCreate();
|
c->io_keys = listCreate();
|
||||||
|
c->pubsub_classes = dictCreate(&setDictType,NULL);
|
||||||
listSetFreeMethod(c->io_keys,decrRefCount);
|
listSetFreeMethod(c->io_keys,decrRefCount);
|
||||||
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
|
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
|
||||||
readQueryFromClient, c) == AE_ERR) {
|
readQueryFromClient, c) == AE_ERR) {
|
||||||
@ -9236,6 +9258,132 @@ badarity:
|
|||||||
(char*) c->argv[1]->ptr));
|
(char*) c->argv[1]->ptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* =========================== Pubsub implementation ======================== */
|
||||||
|
|
||||||
|
/* Subscribe a client to a class. Returns 1 if the operation succeeded, or
|
||||||
|
* 0 if the client was already subscribed to that class. */
|
||||||
|
static int pubsubSubscribe(redisClient *c, robj *class) {
|
||||||
|
struct dictEntry *de;
|
||||||
|
list *clients = NULL;
|
||||||
|
int retval = 0;
|
||||||
|
|
||||||
|
/* Add the class to the client -> classes hash table */
|
||||||
|
if (dictAdd(c->pubsub_classes,class,NULL) == DICT_OK) {
|
||||||
|
retval = 1;
|
||||||
|
incrRefCount(class);
|
||||||
|
/* Add the client to the class -> list of clients hash table */
|
||||||
|
de = dictFind(server.pubsub_classes,class);
|
||||||
|
if (de == NULL) {
|
||||||
|
clients = listCreate();
|
||||||
|
dictAdd(server.pubsub_classes,class,clients);
|
||||||
|
incrRefCount(class);
|
||||||
|
} else {
|
||||||
|
clients = dictGetEntryVal(de);
|
||||||
|
}
|
||||||
|
listAddNodeTail(clients,c);
|
||||||
|
}
|
||||||
|
/* Notify the client */
|
||||||
|
addReply(c,shared.mbulk3);
|
||||||
|
addReply(c,shared.subscribebulk);
|
||||||
|
addReplyBulk(c,class);
|
||||||
|
addReplyLong(c,dictSize(c->pubsub_classes));
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unsubscribe a client from a class. Returns 1 if the operation succeeded, or
|
||||||
|
* 0 if the client was not subscribed to the specified class. */
|
||||||
|
static int pubsubUnsubscribe(redisClient *c, robj *class, int notify) {
|
||||||
|
struct dictEntry *de;
|
||||||
|
list *clients;
|
||||||
|
listNode *ln;
|
||||||
|
int retval = 0;
|
||||||
|
|
||||||
|
/* Remove the class from the client -> classes hash table */
|
||||||
|
if (dictDelete(c->pubsub_classes,class) == DICT_OK) {
|
||||||
|
retval = 1;
|
||||||
|
/* Remove the client from the class -> clients list hash table */
|
||||||
|
de = dictFind(server.pubsub_classes,class);
|
||||||
|
assert(de != NULL);
|
||||||
|
clients = dictGetEntryVal(de);
|
||||||
|
ln = listSearchKey(clients,c);
|
||||||
|
assert(ln != NULL);
|
||||||
|
listDelNode(clients,ln);
|
||||||
|
}
|
||||||
|
/* Notify the client */
|
||||||
|
if (notify) {
|
||||||
|
addReply(c,shared.mbulk3);
|
||||||
|
addReply(c,shared.unsubscribebulk);
|
||||||
|
addReplyBulk(c,class);
|
||||||
|
addReplyLong(c,dictSize(c->pubsub_classes));
|
||||||
|
}
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unsubscribe from all the classes. Return the number of classes the
|
||||||
|
* client was subscribed to. */
|
||||||
|
static int pubsubUnsubscribeAll(redisClient *c, int notify) {
|
||||||
|
dictIterator *di = dictGetIterator(c->pubsub_classes);
|
||||||
|
dictEntry *de;
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
while((de = dictNext(di)) != NULL) {
|
||||||
|
robj *class = dictGetEntryKey(de);
|
||||||
|
|
||||||
|
count += pubsubUnsubscribe(c,class,notify);
|
||||||
|
}
|
||||||
|
dictReleaseIterator(di);
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Publish a message */
|
||||||
|
static int pubsubPublishMessage(robj *class, robj *message) {
|
||||||
|
int receivers = 0;
|
||||||
|
struct dictEntry *de;
|
||||||
|
|
||||||
|
de = dictFind(server.pubsub_classes,class);
|
||||||
|
if (de) {
|
||||||
|
list *list = dictGetEntryVal(de);
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
|
||||||
|
listRewind(list,&li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
redisClient *c = ln->value;
|
||||||
|
|
||||||
|
addReply(c,shared.mbulk3);
|
||||||
|
addReply(c,shared.messagebulk);
|
||||||
|
addReplyBulk(c,class);
|
||||||
|
addReplyBulk(c,message);
|
||||||
|
receivers++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return receivers;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void subscribeCommand(redisClient *c) {
|
||||||
|
int j;
|
||||||
|
|
||||||
|
for (j = 1; j < c->argc; j++)
|
||||||
|
pubsubSubscribe(c,c->argv[j]);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void unsubscribeCommand(redisClient *c) {
|
||||||
|
if (c->argc == 1) {
|
||||||
|
pubsubUnsubscribeAll(c,1);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
int j;
|
||||||
|
|
||||||
|
for (j = 1; j < c->argc; j++)
|
||||||
|
pubsubUnsubscribe(c,c->argv[j],1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void publishCommand(redisClient *c) {
|
||||||
|
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
|
||||||
|
addReplyLong(c,receivers);
|
||||||
|
}
|
||||||
|
|
||||||
/* ================================= Debugging ============================== */
|
/* ================================= Debugging ============================== */
|
||||||
|
|
||||||
static void debugCommand(redisClient *c) {
|
static void debugCommand(redisClient *c) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user