mirror of
https://github.com/fluencelabs/redis
synced 2025-03-31 14:51:04 +00:00
Merge branch 'master' of github.com:antirez/redis
This commit is contained in:
commit
603e616bf4
@ -41,8 +41,10 @@ redisClient *createClient(int fd) {
|
|||||||
c->reply = listCreate();
|
c->reply = listCreate();
|
||||||
listSetFreeMethod(c->reply,decrRefCount);
|
listSetFreeMethod(c->reply,decrRefCount);
|
||||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||||
c->blocking_keys = NULL;
|
c->bpop.keys = NULL;
|
||||||
c->blocking_keys_num = 0;
|
c->bpop.count = 0;
|
||||||
|
c->bpop.timeout = 0;
|
||||||
|
c->bpop.target = NULL;
|
||||||
c->io_keys = listCreate();
|
c->io_keys = listCreate();
|
||||||
c->watched_keys = listCreate();
|
c->watched_keys = listCreate();
|
||||||
listSetFreeMethod(c->io_keys,decrRefCount);
|
listSetFreeMethod(c->io_keys,decrRefCount);
|
||||||
@ -699,7 +701,7 @@ void closeTimedoutClients(void) {
|
|||||||
redisLog(REDIS_VERBOSE,"Closing idle client");
|
redisLog(REDIS_VERBOSE,"Closing idle client");
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
} else if (c->flags & REDIS_BLOCKED) {
|
} else if (c->flags & REDIS_BLOCKED) {
|
||||||
if (c->blockingto != 0 && c->blockingto < now) {
|
if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
|
||||||
addReply(c,shared.nullmultibulk);
|
addReply(c,shared.nullmultibulk);
|
||||||
unblockClientWaitingData(c);
|
unblockClientWaitingData(c);
|
||||||
}
|
}
|
||||||
|
28
src/redis.c
28
src/redis.c
@ -93,6 +93,7 @@ struct redisCommand readonlyCommandTable[] = {
|
|||||||
{"rpop",rpopCommand,2,0,NULL,1,1,1},
|
{"rpop",rpopCommand,2,0,NULL,1,1,1},
|
||||||
{"lpop",lpopCommand,2,0,NULL,1,1,1},
|
{"lpop",lpopCommand,2,0,NULL,1,1,1},
|
||||||
{"brpop",brpopCommand,-3,0,NULL,1,1,1},
|
{"brpop",brpopCommand,-3,0,NULL,1,1,1},
|
||||||
|
{"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1},
|
||||||
{"blpop",blpopCommand,-3,0,NULL,1,1,1},
|
{"blpop",blpopCommand,-3,0,NULL,1,1,1},
|
||||||
{"llen",llenCommand,2,0,NULL,1,1,1},
|
{"llen",llenCommand,2,0,NULL,1,1,1},
|
||||||
{"lindex",lindexCommand,3,0,NULL,1,1,1},
|
{"lindex",lindexCommand,3,0,NULL,1,1,1},
|
||||||
@ -100,7 +101,7 @@ struct redisCommand readonlyCommandTable[] = {
|
|||||||
{"lrange",lrangeCommand,4,0,NULL,1,1,1},
|
{"lrange",lrangeCommand,4,0,NULL,1,1,1},
|
||||||
{"ltrim",ltrimCommand,4,0,NULL,1,1,1},
|
{"ltrim",ltrimCommand,4,0,NULL,1,1,1},
|
||||||
{"lrem",lremCommand,4,0,NULL,1,1,1},
|
{"lrem",lremCommand,4,0,NULL,1,1,1},
|
||||||
{"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
|
{"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
|
||||||
{"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
|
{"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
|
||||||
{"srem",sremCommand,3,0,NULL,1,1,1},
|
{"srem",sremCommand,3,0,NULL,1,1,1},
|
||||||
{"smove",smoveCommand,4,0,NULL,1,2,1},
|
{"smove",smoveCommand,4,0,NULL,1,2,1},
|
||||||
@ -576,7 +577,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Close connections of timedout clients */
|
/* Close connections of timedout clients */
|
||||||
if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
|
if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
|
||||||
closeTimedoutClients();
|
closeTimedoutClients();
|
||||||
|
|
||||||
/* Check if a background saving or AOF rewrite in progress terminated */
|
/* Check if a background saving or AOF rewrite in progress terminated */
|
||||||
@ -649,15 +650,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
* for ready file descriptors. */
|
* for ready file descriptors. */
|
||||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||||
REDIS_NOTUSED(eventLoop);
|
REDIS_NOTUSED(eventLoop);
|
||||||
|
listNode *ln;
|
||||||
|
redisClient *c;
|
||||||
|
|
||||||
/* Awake clients that got all the swapped keys they requested */
|
/* Awake clients that got all the swapped keys they requested */
|
||||||
if (server.vm_enabled && listLength(server.io_ready_clients)) {
|
if (server.vm_enabled && listLength(server.io_ready_clients)) {
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
|
||||||
|
|
||||||
listRewind(server.io_ready_clients,&li);
|
listRewind(server.io_ready_clients,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
redisClient *c = ln->value;
|
c = ln->value;
|
||||||
struct redisCommand *cmd;
|
struct redisCommand *cmd;
|
||||||
|
|
||||||
/* Resume the client. */
|
/* Resume the client. */
|
||||||
@ -675,6 +677,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
processInputBuffer(c);
|
processInputBuffer(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
|
while (listLength(server.unblocked_clients)) {
|
||||||
|
ln = listFirst(server.unblocked_clients);
|
||||||
|
redisAssert(ln != NULL);
|
||||||
|
c = ln->value;
|
||||||
|
listDelNode(server.unblocked_clients,ln);
|
||||||
|
|
||||||
|
/* Process remaining data in the input buffer. */
|
||||||
|
if (c->querybuf && sdslen(c->querybuf) > 0)
|
||||||
|
processInputBuffer(c);
|
||||||
|
}
|
||||||
|
|
||||||
/* Write the AOF buffer on disk */
|
/* Write the AOF buffer on disk */
|
||||||
flushAppendOnlyFile();
|
flushAppendOnlyFile();
|
||||||
}
|
}
|
||||||
@ -762,7 +777,7 @@ void initServerConfig() {
|
|||||||
server.rdbcompression = 1;
|
server.rdbcompression = 1;
|
||||||
server.activerehashing = 1;
|
server.activerehashing = 1;
|
||||||
server.maxclients = 0;
|
server.maxclients = 0;
|
||||||
server.blpop_blocked_clients = 0;
|
server.bpop_blocked_clients = 0;
|
||||||
server.maxmemory = 0;
|
server.maxmemory = 0;
|
||||||
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
|
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
|
||||||
server.maxmemory_samples = 3;
|
server.maxmemory_samples = 3;
|
||||||
@ -821,6 +836,7 @@ void initServer() {
|
|||||||
server.clients = listCreate();
|
server.clients = listCreate();
|
||||||
server.slaves = listCreate();
|
server.slaves = listCreate();
|
||||||
server.monitors = listCreate();
|
server.monitors = listCreate();
|
||||||
|
server.unblocked_clients = listCreate();
|
||||||
createSharedObjects();
|
createSharedObjects();
|
||||||
server.el = aeCreateEventLoop();
|
server.el = aeCreateEventLoop();
|
||||||
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
|
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
|
||||||
@ -1173,7 +1189,7 @@ sds genRedisInfoString(void) {
|
|||||||
(float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
|
(float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
|
||||||
listLength(server.clients)-listLength(server.slaves),
|
listLength(server.clients)-listLength(server.slaves),
|
||||||
listLength(server.slaves),
|
listLength(server.slaves),
|
||||||
server.blpop_blocked_clients,
|
server.bpop_blocked_clients,
|
||||||
zmalloc_used_memory(),
|
zmalloc_used_memory(),
|
||||||
hmem,
|
hmem,
|
||||||
zmalloc_get_rss(),
|
zmalloc_get_rss(),
|
||||||
|
22
src/redis.h
22
src/redis.h
@ -293,6 +293,16 @@ typedef struct multiState {
|
|||||||
int count; /* Total number of MULTI commands */
|
int count; /* Total number of MULTI commands */
|
||||||
} multiState;
|
} multiState;
|
||||||
|
|
||||||
|
typedef struct blockingState {
|
||||||
|
robj **keys; /* The key we are waiting to terminate a blocking
|
||||||
|
* operation such as BLPOP. Otherwise NULL. */
|
||||||
|
int count; /* Number of blocking keys */
|
||||||
|
time_t timeout; /* Blocking operation timeout. If UNIX current time
|
||||||
|
* is >= timeout then the operation timed out. */
|
||||||
|
robj *target; /* The key that should receive the element,
|
||||||
|
* for BRPOPLPUSH. */
|
||||||
|
} blockingState;
|
||||||
|
|
||||||
/* With multiplexing we need to take per-clinet state.
|
/* With multiplexing we need to take per-clinet state.
|
||||||
* Clients are taken in a liked list. */
|
* Clients are taken in a liked list. */
|
||||||
typedef struct redisClient {
|
typedef struct redisClient {
|
||||||
@ -316,11 +326,7 @@ typedef struct redisClient {
|
|||||||
long repldboff; /* replication DB file offset */
|
long repldboff; /* replication DB file offset */
|
||||||
off_t repldbsize; /* replication DB file size */
|
off_t repldbsize; /* replication DB file size */
|
||||||
multiState mstate; /* MULTI/EXEC state */
|
multiState mstate; /* MULTI/EXEC state */
|
||||||
robj **blocking_keys; /* The key we are waiting to terminate a blocking
|
blockingState bpop; /* blocking state */
|
||||||
* operation such as BLPOP. Otherwise NULL. */
|
|
||||||
int blocking_keys_num; /* Number of blocking keys */
|
|
||||||
time_t blockingto; /* Blocking operation timeout. If UNIX current time
|
|
||||||
* is >= blockingto then the operation timed out. */
|
|
||||||
list *io_keys; /* Keys this client is waiting to be loaded from the
|
list *io_keys; /* Keys this client is waiting to be loaded from the
|
||||||
* swap file in order to continue. */
|
* swap file in order to continue. */
|
||||||
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
||||||
@ -427,8 +433,9 @@ struct redisServer {
|
|||||||
int maxmemory_policy;
|
int maxmemory_policy;
|
||||||
int maxmemory_samples;
|
int maxmemory_samples;
|
||||||
/* Blocked clients */
|
/* Blocked clients */
|
||||||
unsigned int blpop_blocked_clients;
|
unsigned int bpop_blocked_clients;
|
||||||
unsigned int vm_blocked_clients;
|
unsigned int vm_blocked_clients;
|
||||||
|
list *unblocked_clients;
|
||||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||||
* have to take this state global, in order to pass it to sortCompare() */
|
* have to take this state global, in order to pass it to sortCompare() */
|
||||||
int sort_desc;
|
int sort_desc;
|
||||||
@ -941,7 +948,7 @@ void flushdbCommand(redisClient *c);
|
|||||||
void flushallCommand(redisClient *c);
|
void flushallCommand(redisClient *c);
|
||||||
void sortCommand(redisClient *c);
|
void sortCommand(redisClient *c);
|
||||||
void lremCommand(redisClient *c);
|
void lremCommand(redisClient *c);
|
||||||
void rpoplpushcommand(redisClient *c);
|
void rpoplpushCommand(redisClient *c);
|
||||||
void infoCommand(redisClient *c);
|
void infoCommand(redisClient *c);
|
||||||
void mgetCommand(redisClient *c);
|
void mgetCommand(redisClient *c);
|
||||||
void monitorCommand(redisClient *c);
|
void monitorCommand(redisClient *c);
|
||||||
@ -970,6 +977,7 @@ void execCommand(redisClient *c);
|
|||||||
void discardCommand(redisClient *c);
|
void discardCommand(redisClient *c);
|
||||||
void blpopCommand(redisClient *c);
|
void blpopCommand(redisClient *c);
|
||||||
void brpopCommand(redisClient *c);
|
void brpopCommand(redisClient *c);
|
||||||
|
void brpoplpushCommand(redisClient *c);
|
||||||
void appendCommand(redisClient *c);
|
void appendCommand(redisClient *c);
|
||||||
void strlenCommand(redisClient *c);
|
void strlenCommand(redisClient *c);
|
||||||
void zrankCommand(redisClient *c);
|
void zrankCommand(redisClient *c);
|
||||||
|
@ -219,8 +219,8 @@ int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char
|
|||||||
* reference is retained. */
|
* reference is retained. */
|
||||||
robj *hashTypeCurrentObject(hashTypeIterator *hi, int what) {
|
robj *hashTypeCurrentObject(hashTypeIterator *hi, int what) {
|
||||||
robj *obj;
|
robj *obj;
|
||||||
unsigned char *v;
|
unsigned char *v = NULL;
|
||||||
unsigned int vlen;
|
unsigned int vlen = 0;
|
||||||
int encoding = hashTypeCurrent(hi,what,&obj,&v,&vlen);
|
int encoding = hashTypeCurrent(hi,what,&obj,&v,&vlen);
|
||||||
|
|
||||||
if (encoding == REDIS_ENCODING_HT) {
|
if (encoding == REDIS_ENCODING_HT) {
|
||||||
@ -430,8 +430,8 @@ void genericHgetallCommand(redisClient *c, int flags) {
|
|||||||
hi = hashTypeInitIterator(o);
|
hi = hashTypeInitIterator(o);
|
||||||
while (hashTypeNext(hi) != REDIS_ERR) {
|
while (hashTypeNext(hi) != REDIS_ERR) {
|
||||||
robj *obj;
|
robj *obj;
|
||||||
unsigned char *v;
|
unsigned char *v = NULL;
|
||||||
unsigned int vlen;
|
unsigned int vlen = 0;
|
||||||
int encoding;
|
int encoding;
|
||||||
|
|
||||||
if (flags & REDIS_HASH_KEY) {
|
if (flags & REDIS_HASH_KEY) {
|
||||||
|
183
src/t_list.c
183
src/t_list.c
@ -634,7 +634,25 @@ void lremCommand(redisClient *c) {
|
|||||||
* since the element is not just returned but pushed against another list
|
* since the element is not just returned but pushed against another list
|
||||||
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
||||||
*/
|
*/
|
||||||
void rpoplpushcommand(redisClient *c) {
|
|
||||||
|
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
|
||||||
|
if (!handleClientsWaitingListPush(c,dstkey,value)) {
|
||||||
|
/* Create the list if the key does not exist */
|
||||||
|
if (!dstobj) {
|
||||||
|
dstobj = createZiplistObject();
|
||||||
|
dbAdd(c->db,dstkey,dstobj);
|
||||||
|
} else {
|
||||||
|
touchWatchedKey(c->db,dstkey);
|
||||||
|
server.dirty++;
|
||||||
|
}
|
||||||
|
listTypePush(dstobj,value,REDIS_HEAD);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Always send the pushed value to the client. */
|
||||||
|
addReplyBulk(c,value);
|
||||||
|
}
|
||||||
|
|
||||||
|
void rpoplpushCommand(redisClient *c) {
|
||||||
robj *sobj, *value;
|
robj *sobj, *value;
|
||||||
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
|
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
|
||||||
checkType(c,sobj,REDIS_LIST)) return;
|
checkType(c,sobj,REDIS_LIST)) return;
|
||||||
@ -645,20 +663,7 @@ void rpoplpushcommand(redisClient *c) {
|
|||||||
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
|
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
|
||||||
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
|
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
|
||||||
value = listTypePop(sobj,REDIS_TAIL);
|
value = listTypePop(sobj,REDIS_TAIL);
|
||||||
|
rpoplpushHandlePush(c,c->argv[2],dobj,value);
|
||||||
/* Add the element to the target list (unless it's directly
|
|
||||||
* passed to some BLPOP-ing client */
|
|
||||||
if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
|
|
||||||
/* Create the list if the key does not exist */
|
|
||||||
if (!dobj) {
|
|
||||||
dobj = createZiplistObject();
|
|
||||||
dbAdd(c->db,c->argv[2],dobj);
|
|
||||||
}
|
|
||||||
listTypePush(dobj,value,REDIS_HEAD);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Send the element to the client as reply as well */
|
|
||||||
addReplyBulk(c,value);
|
|
||||||
|
|
||||||
/* listTypePop returns an object with its refcount incremented */
|
/* listTypePop returns an object with its refcount incremented */
|
||||||
decrRefCount(value);
|
decrRefCount(value);
|
||||||
@ -705,17 +710,23 @@ void rpoplpushcommand(redisClient *c) {
|
|||||||
|
|
||||||
/* Set a client in blocking mode for the specified key, with the specified
|
/* Set a client in blocking mode for the specified key, with the specified
|
||||||
* timeout */
|
* timeout */
|
||||||
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
|
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
list *l;
|
list *l;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
|
c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
|
||||||
c->blocking_keys_num = numkeys;
|
c->bpop.count = numkeys;
|
||||||
c->blockingto = timeout;
|
c->bpop.timeout = timeout;
|
||||||
|
c->bpop.target = target;
|
||||||
|
|
||||||
|
if (target != NULL) {
|
||||||
|
incrRefCount(target);
|
||||||
|
}
|
||||||
|
|
||||||
for (j = 0; j < numkeys; j++) {
|
for (j = 0; j < numkeys; j++) {
|
||||||
/* Add the key in the client structure, to map clients -> keys */
|
/* Add the key in the client structure, to map clients -> keys */
|
||||||
c->blocking_keys[j] = keys[j];
|
c->bpop.keys[j] = keys[j];
|
||||||
incrRefCount(keys[j]);
|
incrRefCount(keys[j]);
|
||||||
|
|
||||||
/* And in the other "side", to map keys -> clients */
|
/* And in the other "side", to map keys -> clients */
|
||||||
@ -735,7 +746,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
|
|||||||
}
|
}
|
||||||
/* Mark the client as a blocked client */
|
/* Mark the client as a blocked client */
|
||||||
c->flags |= REDIS_BLOCKED;
|
c->flags |= REDIS_BLOCKED;
|
||||||
server.blpop_blocked_clients++;
|
server.bpop_blocked_clients++;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
|
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
|
||||||
@ -744,30 +755,27 @@ void unblockClientWaitingData(redisClient *c) {
|
|||||||
list *l;
|
list *l;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
redisAssert(c->blocking_keys != NULL);
|
redisAssert(c->bpop.keys != NULL);
|
||||||
/* The client may wait for multiple keys, so unblock it for every key. */
|
/* The client may wait for multiple keys, so unblock it for every key. */
|
||||||
for (j = 0; j < c->blocking_keys_num; j++) {
|
for (j = 0; j < c->bpop.count; j++) {
|
||||||
/* Remove this client from the list of clients waiting for this key. */
|
/* Remove this client from the list of clients waiting for this key. */
|
||||||
de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
|
de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
|
||||||
redisAssert(de != NULL);
|
redisAssert(de != NULL);
|
||||||
l = dictGetEntryVal(de);
|
l = dictGetEntryVal(de);
|
||||||
listDelNode(l,listSearchKey(l,c));
|
listDelNode(l,listSearchKey(l,c));
|
||||||
/* If the list is empty we need to remove it to avoid wasting memory */
|
/* If the list is empty we need to remove it to avoid wasting memory */
|
||||||
if (listLength(l) == 0)
|
if (listLength(l) == 0)
|
||||||
dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
|
dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
|
||||||
decrRefCount(c->blocking_keys[j]);
|
decrRefCount(c->bpop.keys[j]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Cleanup the client structure */
|
/* Cleanup the client structure */
|
||||||
zfree(c->blocking_keys);
|
zfree(c->bpop.keys);
|
||||||
c->blocking_keys = NULL;
|
c->bpop.keys = NULL;
|
||||||
|
c->bpop.target = NULL;
|
||||||
c->flags &= (~REDIS_BLOCKED);
|
c->flags &= (~REDIS_BLOCKED);
|
||||||
server.blpop_blocked_clients--;
|
server.bpop_blocked_clients--;
|
||||||
/* We want to process data if there is some command waiting
|
listAddNodeTail(server.unblocked_clients,c);
|
||||||
* in the input buffer. Note that this is safe even if
|
|
||||||
* unblockClientWaitingData() gets called from freeClient() because
|
|
||||||
* freeClient() will be smart enough to call this function
|
|
||||||
* *after* c->querybuf was set to NULL. */
|
|
||||||
if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This should be called from any function PUSHing into lists.
|
/* This should be called from any function PUSHing into lists.
|
||||||
@ -783,39 +791,81 @@ void unblockClientWaitingData(redisClient *c) {
|
|||||||
int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
||||||
struct dictEntry *de;
|
struct dictEntry *de;
|
||||||
redisClient *receiver;
|
redisClient *receiver;
|
||||||
list *l;
|
int numclients;
|
||||||
|
list *clients;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
|
robj *dstkey, *dstobj;
|
||||||
|
|
||||||
de = dictFind(c->db->blocking_keys,key);
|
de = dictFind(c->db->blocking_keys,key);
|
||||||
if (de == NULL) return 0;
|
if (de == NULL) return 0;
|
||||||
l = dictGetEntryVal(de);
|
clients = dictGetEntryVal(de);
|
||||||
ln = listFirst(l);
|
numclients = listLength(clients);
|
||||||
|
|
||||||
|
/* Try to handle the push as long as there are clients waiting for a push.
|
||||||
|
* Note that "numclients" is used because the list of clients waiting for a
|
||||||
|
* push on "key" is deleted by unblockClient() when empty.
|
||||||
|
*
|
||||||
|
* This loop will have more than 1 iteration when there is a BRPOPLPUSH
|
||||||
|
* that cannot push the target list because it does not contain a list. If
|
||||||
|
* this happens, it simply tries the next client waiting for a push. */
|
||||||
|
while (numclients--) {
|
||||||
|
ln = listFirst(clients);
|
||||||
redisAssert(ln != NULL);
|
redisAssert(ln != NULL);
|
||||||
receiver = ln->value;
|
receiver = ln->value;
|
||||||
|
dstkey = receiver->bpop.target;
|
||||||
|
|
||||||
|
/* This should remove the first element of the "clients" list. */
|
||||||
|
unblockClientWaitingData(receiver);
|
||||||
|
redisAssert(ln != listFirst(clients));
|
||||||
|
|
||||||
|
if (dstkey == NULL) {
|
||||||
|
/* BRPOP/BLPOP */
|
||||||
addReplyMultiBulkLen(receiver,2);
|
addReplyMultiBulkLen(receiver,2);
|
||||||
addReplyBulk(receiver,key);
|
addReplyBulk(receiver,key);
|
||||||
addReplyBulk(receiver,ele);
|
addReplyBulk(receiver,ele);
|
||||||
unblockClientWaitingData(receiver);
|
|
||||||
return 1;
|
return 1;
|
||||||
|
} else {
|
||||||
|
/* BRPOPLPUSH */
|
||||||
|
dstobj = lookupKeyWrite(receiver->db,dstkey);
|
||||||
|
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
|
||||||
|
decrRefCount(dstkey);
|
||||||
|
} else {
|
||||||
|
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
|
||||||
|
decrRefCount(dstkey);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
|
||||||
|
long tval;
|
||||||
|
|
||||||
|
if (getLongFromObjectOrReply(c,object,&tval,
|
||||||
|
"timeout is not an integer or out of range") != REDIS_OK)
|
||||||
|
return REDIS_ERR;
|
||||||
|
|
||||||
|
if (tval < 0) {
|
||||||
|
addReplyError(c,"timeout is negative");
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tval > 0) tval += time(NULL);
|
||||||
|
*timeout = tval;
|
||||||
|
|
||||||
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Blocking RPOP/LPOP */
|
/* Blocking RPOP/LPOP */
|
||||||
void blockingPopGenericCommand(redisClient *c, int where) {
|
void blockingPopGenericCommand(redisClient *c, int where) {
|
||||||
robj *o;
|
robj *o;
|
||||||
long long lltimeout;
|
|
||||||
time_t timeout;
|
time_t timeout;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
/* Make sure timeout is an integer value */
|
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout,
|
|
||||||
"timeout is not an integer") != REDIS_OK) return;
|
|
||||||
|
|
||||||
/* Make sure the timeout is not negative */
|
|
||||||
if (lltimeout < 0) {
|
|
||||||
addReplyError(c,"timeout is negative");
|
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
for (j = 1; j < c->argc-1; j++) {
|
for (j = 1; j < c->argc-1; j++) {
|
||||||
o = lookupKeyWrite(c->db,c->argv[j]);
|
o = lookupKeyWrite(c->db,c->argv[j]);
|
||||||
@ -845,11 +895,13 @@ void blockingPopGenericCommand(redisClient *c, int where) {
|
|||||||
* because it is... */
|
* because it is... */
|
||||||
addReplyMultiBulkLen(c,2);
|
addReplyMultiBulkLen(c,2);
|
||||||
addReplyBulk(c,argv[1]);
|
addReplyBulk(c,argv[1]);
|
||||||
|
|
||||||
popGenericCommand(c,where);
|
popGenericCommand(c,where);
|
||||||
|
|
||||||
/* Fix the client structure with the original stuff */
|
/* Fix the client structure with the original stuff */
|
||||||
c->argv = orig_argv;
|
c->argv = orig_argv;
|
||||||
c->argc = orig_argc;
|
c->argc = orig_argc;
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -864,9 +916,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* If the list is empty or the key does not exists we must block */
|
/* If the list is empty or the key does not exists we must block */
|
||||||
timeout = lltimeout;
|
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
|
||||||
if (timeout > 0) timeout += time(NULL);
|
|
||||||
blockForKeys(c,c->argv+1,c->argc-2,timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void blpopCommand(redisClient *c) {
|
void blpopCommand(redisClient *c) {
|
||||||
@ -876,3 +926,34 @@ void blpopCommand(redisClient *c) {
|
|||||||
void brpopCommand(redisClient *c) {
|
void brpopCommand(redisClient *c) {
|
||||||
blockingPopGenericCommand(c,REDIS_TAIL);
|
blockingPopGenericCommand(c,REDIS_TAIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void brpoplpushCommand(redisClient *c) {
|
||||||
|
time_t timeout;
|
||||||
|
|
||||||
|
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
|
||||||
|
return;
|
||||||
|
|
||||||
|
robj *key = lookupKeyWrite(c->db, c->argv[1]);
|
||||||
|
|
||||||
|
if (key == NULL) {
|
||||||
|
if (c->flags & REDIS_MULTI) {
|
||||||
|
|
||||||
|
/* Blocking against an empty list in a multi state
|
||||||
|
* returns immediately. */
|
||||||
|
addReply(c, shared.nullmultibulk);
|
||||||
|
} else {
|
||||||
|
/* The list is empty and the client blocks. */
|
||||||
|
blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (key->type != REDIS_LIST) {
|
||||||
|
addReply(c, shared.wrongtypeerr);
|
||||||
|
} else {
|
||||||
|
|
||||||
|
/* The list exists and has elements, so
|
||||||
|
* the regular rpoplpushCommand is executed. */
|
||||||
|
redisAssert(listTypeLength(key) > 0);
|
||||||
|
rpoplpushCommand(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -127,7 +127,140 @@ start_server {
|
|||||||
assert_equal 0 [r llen blist1]
|
assert_equal 0 [r llen blist1]
|
||||||
assert_equal 1 [r llen blist2]
|
assert_equal 1 [r llen blist2]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH - $type" {
|
||||||
|
r del target
|
||||||
|
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
create_$type blist "a b $large c d"
|
||||||
|
|
||||||
|
$rd brpoplpush blist target 1
|
||||||
|
assert_equal d [$rd read]
|
||||||
|
|
||||||
|
assert_equal d [r rpop target]
|
||||||
|
assert_equal "a b $large c" [r lrange blist 0 -1]
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH with zero timeout should block indefinitely" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r del blist target
|
||||||
|
$rd brpoplpush blist target 0
|
||||||
|
after 1000
|
||||||
|
r rpush blist foo
|
||||||
|
assert_equal foo [$rd read]
|
||||||
|
assert_equal {foo} [r lrange target 0 -1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH with a client BLPOPing the target list" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
set rd2 [redis_deferring_client]
|
||||||
|
r del blist target
|
||||||
|
$rd2 blpop target 0
|
||||||
|
$rd brpoplpush blist target 0
|
||||||
|
after 1000
|
||||||
|
r rpush blist foo
|
||||||
|
assert_equal foo [$rd read]
|
||||||
|
assert_equal {target foo} [$rd2 read]
|
||||||
|
assert_equal 0 [r exists target]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH with wrong source type" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r del blist target
|
||||||
|
r set blist nolist
|
||||||
|
$rd brpoplpush blist target 1
|
||||||
|
assert_error "ERR*wrong kind*" {$rd read}
|
||||||
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH with wrong destination type" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r del blist target
|
||||||
|
r set target nolist
|
||||||
|
r lpush blist foo
|
||||||
|
$rd brpoplpush blist target 1
|
||||||
|
assert_error "ERR*wrong kind*" {$rd read}
|
||||||
|
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r del blist target
|
||||||
|
r set target nolist
|
||||||
|
$rd brpoplpush blist target 0
|
||||||
|
after 1000
|
||||||
|
r rpush blist foo
|
||||||
|
assert_error "ERR*wrong kind*" {$rd read}
|
||||||
|
assert_equal {foo} [r lrange blist 0 -1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH with multiple blocked clients" {
|
||||||
|
set rd1 [redis_deferring_client]
|
||||||
|
set rd2 [redis_deferring_client]
|
||||||
|
r del blist target1 target2
|
||||||
|
r set target1 nolist
|
||||||
|
$rd1 brpoplpush blist target1 0
|
||||||
|
$rd2 brpoplpush blist target2 0
|
||||||
|
r lpush blist foo
|
||||||
|
|
||||||
|
assert_error "ERR*wrong kind*" {$rd1 read}
|
||||||
|
assert_equal {foo} [$rd2 read]
|
||||||
|
assert_equal {foo} [r lrange target2 0 -1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Linked BRPOPLPUSH" {
|
||||||
|
set rd1 [redis_deferring_client]
|
||||||
|
set rd2 [redis_deferring_client]
|
||||||
|
|
||||||
|
r del list1 list2 list3
|
||||||
|
|
||||||
|
$rd1 brpoplpush list1 list2 0
|
||||||
|
$rd2 brpoplpush list2 list3 0
|
||||||
|
|
||||||
|
r rpush list1 foo
|
||||||
|
|
||||||
|
assert_equal {} [r lrange list1 0 -1]
|
||||||
|
assert_equal {} [r lrange list2 0 -1]
|
||||||
|
assert_equal {foo} [r lrange list3 0 -1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Circular BRPOPLPUSH" {
|
||||||
|
set rd1 [redis_deferring_client]
|
||||||
|
set rd2 [redis_deferring_client]
|
||||||
|
|
||||||
|
r del list1 list2
|
||||||
|
|
||||||
|
$rd1 brpoplpush list1 list2 0
|
||||||
|
$rd2 brpoplpush list2 list1 0
|
||||||
|
|
||||||
|
r rpush list1 foo
|
||||||
|
|
||||||
|
assert_equal {foo} [r lrange list1 0 -1]
|
||||||
|
assert_equal {} [r lrange list2 0 -1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Self-referential BRPOPLPUSH" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
|
||||||
|
r del blist
|
||||||
|
|
||||||
|
$rd brpoplpush blist blist 0
|
||||||
|
|
||||||
|
r rpush blist foo
|
||||||
|
|
||||||
|
assert_equal {foo} [r lrange blist 0 -1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "BRPOPLPUSH inside a transaction" {
|
||||||
|
r del xlist target
|
||||||
|
r lpush xlist foo
|
||||||
|
r lpush xlist bar
|
||||||
|
|
||||||
|
r multi
|
||||||
|
r brpoplpush xlist target 0
|
||||||
|
r brpoplpush xlist target 0
|
||||||
|
r brpoplpush xlist target 0
|
||||||
|
r lrange xlist 0 -1
|
||||||
|
r lrange target 0 -1
|
||||||
|
r exec
|
||||||
|
} {foo bar {} {} {bar foo}}
|
||||||
|
|
||||||
foreach {pop} {BLPOP BRPOP} {
|
foreach {pop} {BLPOP BRPOP} {
|
||||||
test "$pop: with single empty list argument" {
|
test "$pop: with single empty list argument" {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user