mirror of
https://github.com/fluencelabs/redis
synced 2025-04-01 15:21:03 +00:00
Fix blocking operations from missing new lists
Behrad Zari discovered [1] and Josiah reported [2]: if you block and wait for a list to exist, but the list creates from a non-push command, the blocked client never gets notified. This commit adds notification of blocked clients into the DB layer and away from individual commands. Lists can be created by [LR]PUSH, SORT..STORE, RENAME, MOVE, and RESTORE. Previously, blocked client notifications were only triggered by [LR]PUSH. Your client would never get notified if a list were created by SORT..STORE or RENAME or a RESTORE, etc. Blocked client notification now happens in one unified place: - dbAdd() triggers notification when adding a list to the DB Two new tests are added that fail prior to this commit. All test pass. Fixes #1668 [1]: https://groups.google.com/forum/#!topic/redis-db/k4oWfMkN1NU [2]: #1668
This commit is contained in:
parent
56161ca0a4
commit
33f943b4cd
1
src/db.c
1
src/db.c
@ -95,6 +95,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
|
|||||||
int retval = dictAdd(db->dict, copy, val);
|
int retval = dictAdd(db->dict, copy, val);
|
||||||
|
|
||||||
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
|
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
|
||||||
|
if (val->type == REDIS_LIST) signalListAsReady(db, key);
|
||||||
if (server.cluster_enabled) slotToKeyAdd(key);
|
if (server.cluster_enabled) slotToKeyAdd(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1035,6 +1035,7 @@ void listTypeConvert(robj *subject, int enc);
|
|||||||
void unblockClientWaitingData(redisClient *c);
|
void unblockClientWaitingData(redisClient *c);
|
||||||
void handleClientsBlockedOnLists(void);
|
void handleClientsBlockedOnLists(void);
|
||||||
void popGenericCommand(redisClient *c, int where);
|
void popGenericCommand(redisClient *c, int where);
|
||||||
|
void signalListAsReady(redisDb *db, robj *key);
|
||||||
|
|
||||||
/* MULTI/EXEC/WATCH... */
|
/* MULTI/EXEC/WATCH... */
|
||||||
void unwatchAllKeys(redisClient *c);
|
void unwatchAllKeys(redisClient *c);
|
||||||
|
16
src/t_list.c
16
src/t_list.c
@ -29,8 +29,6 @@
|
|||||||
|
|
||||||
#include "redis.h"
|
#include "redis.h"
|
||||||
|
|
||||||
void signalListAsReady(redisClient *c, robj *key);
|
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* List API
|
* List API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
@ -297,15 +295,12 @@ void listTypeConvert(robj *subject, int enc) {
|
|||||||
void pushGenericCommand(redisClient *c, int where) {
|
void pushGenericCommand(redisClient *c, int where) {
|
||||||
int j, waiting = 0, pushed = 0;
|
int j, waiting = 0, pushed = 0;
|
||||||
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
|
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
|
||||||
int may_have_waiting_clients = (lobj == NULL);
|
|
||||||
|
|
||||||
if (lobj && lobj->type != REDIS_LIST) {
|
if (lobj && lobj->type != REDIS_LIST) {
|
||||||
addReply(c,shared.wrongtypeerr);
|
addReply(c,shared.wrongtypeerr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
|
|
||||||
|
|
||||||
for (j = 2; j < c->argc; j++) {
|
for (j = 2; j < c->argc; j++) {
|
||||||
c->argv[j] = tryObjectEncoding(c->argv[j]);
|
c->argv[j] = tryObjectEncoding(c->argv[j]);
|
||||||
if (!lobj) {
|
if (!lobj) {
|
||||||
@ -709,7 +704,6 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
|
|||||||
if (!dstobj) {
|
if (!dstobj) {
|
||||||
dstobj = createZiplistObject();
|
dstobj = createZiplistObject();
|
||||||
dbAdd(c->db,dstkey,dstobj);
|
dbAdd(c->db,dstkey,dstobj);
|
||||||
signalListAsReady(c,dstkey);
|
|
||||||
}
|
}
|
||||||
signalModifiedKey(c->db,dstkey);
|
signalModifiedKey(c->db,dstkey);
|
||||||
listTypePush(dstobj,value,REDIS_HEAD);
|
listTypePush(dstobj,value,REDIS_HEAD);
|
||||||
@ -849,19 +843,19 @@ void unblockClientWaitingData(redisClient *c) {
|
|||||||
* made by a script or in the context of MULTI/EXEC.
|
* made by a script or in the context of MULTI/EXEC.
|
||||||
*
|
*
|
||||||
* The list will be finally processed by handleClientsBlockedOnLists() */
|
* The list will be finally processed by handleClientsBlockedOnLists() */
|
||||||
void signalListAsReady(redisClient *c, robj *key) {
|
void signalListAsReady(redisDb *db, robj *key) {
|
||||||
readyList *rl;
|
readyList *rl;
|
||||||
|
|
||||||
/* No clients blocking for this key? No need to queue it. */
|
/* No clients blocking for this key? No need to queue it. */
|
||||||
if (dictFind(c->db->blocking_keys,key) == NULL) return;
|
if (dictFind(db->blocking_keys,key) == NULL) return;
|
||||||
|
|
||||||
/* Key was already signaled? No need to queue it again. */
|
/* Key was already signaled? No need to queue it again. */
|
||||||
if (dictFind(c->db->ready_keys,key) != NULL) return;
|
if (dictFind(db->ready_keys,key) != NULL) return;
|
||||||
|
|
||||||
/* Ok, we need to queue this key into server.ready_keys. */
|
/* Ok, we need to queue this key into server.ready_keys. */
|
||||||
rl = zmalloc(sizeof(*rl));
|
rl = zmalloc(sizeof(*rl));
|
||||||
rl->key = key;
|
rl->key = key;
|
||||||
rl->db = c->db;
|
rl->db = db;
|
||||||
incrRefCount(key);
|
incrRefCount(key);
|
||||||
listAddNodeTail(server.ready_keys,rl);
|
listAddNodeTail(server.ready_keys,rl);
|
||||||
|
|
||||||
@ -869,7 +863,7 @@ void signalListAsReady(redisClient *c, robj *key) {
|
|||||||
* to avoid adding it multiple times into a list with a simple O(1)
|
* to avoid adding it multiple times into a list with a simple O(1)
|
||||||
* check. */
|
* check. */
|
||||||
incrRefCount(key);
|
incrRefCount(key);
|
||||||
redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
|
redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is a helper function for handleClientsBlockedOnLists(). It's work
|
/* This is a helper function for handleClientsBlockedOnLists(). It's work
|
||||||
|
@ -408,6 +408,29 @@ start_server {
|
|||||||
$rd read
|
$rd read
|
||||||
} {}
|
} {}
|
||||||
|
|
||||||
|
test "BLPOP when new key is moved into place" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
|
||||||
|
$rd blpop foo 5
|
||||||
|
r lpush bob abc def hij
|
||||||
|
r rename bob foo
|
||||||
|
$rd read
|
||||||
|
} {foo hij}
|
||||||
|
|
||||||
|
test "BLPOP when result key is created by SORT..STORE" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
|
||||||
|
# zero out list from previous test without explicit delete
|
||||||
|
r lpop foo
|
||||||
|
r lpop foo
|
||||||
|
r lpop foo
|
||||||
|
|
||||||
|
$rd blpop foo 5
|
||||||
|
r lpush notfoo hello hola aguacate konichiwa zanzibar
|
||||||
|
r sort notfoo ALPHA store foo
|
||||||
|
$rd read
|
||||||
|
} {foo aguacate}
|
||||||
|
|
||||||
foreach {pop} {BLPOP BRPOP} {
|
foreach {pop} {BLPOP BRPOP} {
|
||||||
test "$pop: with single empty list argument" {
|
test "$pop: with single empty list argument" {
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user