mirror of
https://github.com/fluencelabs/redis
synced 2025-04-03 00:01:04 +00:00
Streams: initial work to use blocking lists logic for streams XREAD.
This commit is contained in:
parent
439120c620
commit
4a377cecd8
208
src/blocked.c
208
src/blocked.c
@ -65,6 +65,8 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
|
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
|
||||||
|
|
||||||
/* Get a timeout value from an object and store it into 'timeout'.
|
/* Get a timeout value from an object and store it into 'timeout'.
|
||||||
* The final timeout is always stored as milliseconds as a time where the
|
* The final timeout is always stored as milliseconds as a time where the
|
||||||
* timeout will expire, however the parsing is performed according to
|
* timeout will expire, however the parsing is performed according to
|
||||||
@ -193,3 +195,209 @@ void disconnectAllBlockedClients(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function should be called by Redis every time a single command,
|
||||||
|
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
||||||
|
* being called by a client.
|
||||||
|
*
|
||||||
|
* All the keys with at least one client blocked that received at least
|
||||||
|
* one new element via some PUSH/XADD operation are accumulated into
|
||||||
|
* the server.ready_keys list. This function will run the list and will
|
||||||
|
* serve clients accordingly. Note that the function will iterate again and
|
||||||
|
* again as a result of serving BRPOPLPUSH we can have new blocking clients
|
||||||
|
* to serve because of the PUSH side of BRPOPLPUSH. */
|
||||||
|
void handleClientsBlockedOnKeys(void) {
|
||||||
|
while(listLength(server.ready_keys) != 0) {
|
||||||
|
list *l;
|
||||||
|
|
||||||
|
/* Point server.ready_keys to a fresh list and save the current one
|
||||||
|
* locally. This way as we run the old list we are free to call
|
||||||
|
* signalKeyAsReady() that may push new elements in server.ready_keys
|
||||||
|
* when handling clients blocked into BRPOPLPUSH. */
|
||||||
|
l = server.ready_keys;
|
||||||
|
server.ready_keys = listCreate();
|
||||||
|
|
||||||
|
while(listLength(l) != 0) {
|
||||||
|
listNode *ln = listFirst(l);
|
||||||
|
readyList *rl = ln->value;
|
||||||
|
|
||||||
|
/* First of all remove this key from db->ready_keys so that
|
||||||
|
* we can safely call signalKeyAsReady() against this key. */
|
||||||
|
dictDelete(rl->db->ready_keys,rl->key);
|
||||||
|
|
||||||
|
/* If the key exists and it's a list, serve blocked clients
|
||||||
|
* with data. */
|
||||||
|
robj *o = lookupKeyWrite(rl->db,rl->key);
|
||||||
|
if (o != NULL && o->type == OBJ_LIST) {
|
||||||
|
dictEntry *de;
|
||||||
|
|
||||||
|
/* We serve clients in the same order they blocked for
|
||||||
|
* this key, from the first blocked to the last. */
|
||||||
|
de = dictFind(rl->db->blocking_keys,rl->key);
|
||||||
|
if (de) {
|
||||||
|
list *clients = dictGetVal(de);
|
||||||
|
int numclients = listLength(clients);
|
||||||
|
|
||||||
|
while(numclients--) {
|
||||||
|
listNode *clientnode = listFirst(clients);
|
||||||
|
client *receiver = clientnode->value;
|
||||||
|
robj *dstkey = receiver->bpop.target;
|
||||||
|
int where = (receiver->lastcmd &&
|
||||||
|
receiver->lastcmd->proc == blpopCommand) ?
|
||||||
|
LIST_HEAD : LIST_TAIL;
|
||||||
|
robj *value = listTypePop(o,where);
|
||||||
|
|
||||||
|
if (value) {
|
||||||
|
/* Protect receiver->bpop.target, that will be
|
||||||
|
* freed by the next unblockClient()
|
||||||
|
* call. */
|
||||||
|
if (dstkey) incrRefCount(dstkey);
|
||||||
|
unblockClient(receiver);
|
||||||
|
|
||||||
|
if (serveClientBlockedOnList(receiver,
|
||||||
|
rl->key,dstkey,rl->db,value,
|
||||||
|
where) == C_ERR)
|
||||||
|
{
|
||||||
|
/* If we failed serving the client we need
|
||||||
|
* to also undo the POP operation. */
|
||||||
|
listTypePush(o,value,where);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dstkey) decrRefCount(dstkey);
|
||||||
|
decrRefCount(value);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listTypeLength(o) == 0) {
|
||||||
|
dbDelete(rl->db,rl->key);
|
||||||
|
}
|
||||||
|
/* We don't call signalModifiedKey() as it was already called
|
||||||
|
* when an element was pushed on the list. */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Free this item. */
|
||||||
|
decrRefCount(rl->key);
|
||||||
|
zfree(rl);
|
||||||
|
listDelNode(l,ln);
|
||||||
|
}
|
||||||
|
listRelease(l); /* We have the new list on place at this point. */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This is how the current blocking POP works, we use BLPOP as example:
|
||||||
|
* - If the user calls BLPOP and the key exists and contains a non empty list
|
||||||
|
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
|
||||||
|
* if blocking is not required.
|
||||||
|
* - If instead BLPOP is called and the key does not exists or the list is
|
||||||
|
* empty we need to block. In order to do so we remove the notification for
|
||||||
|
* new data to read in the client socket (so that we'll not serve new
|
||||||
|
* requests if the blocking request is not served). Also we put the client
|
||||||
|
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
|
||||||
|
* blocking for this keys.
|
||||||
|
* - If a PUSH operation against a key with blocked clients waiting is
|
||||||
|
* performed, we mark this key as "ready", and after the current command,
|
||||||
|
* MULTI/EXEC block, or script, is executed, we serve all the clients waiting
|
||||||
|
* for this list, from the one that blocked first, to the last, accordingly
|
||||||
|
* to the number of elements we have in the ready list.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* Set a client in blocking mode for the specified key, with the specified
|
||||||
|
* timeout */
|
||||||
|
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
|
||||||
|
dictEntry *de;
|
||||||
|
list *l;
|
||||||
|
int j;
|
||||||
|
|
||||||
|
c->bpop.timeout = timeout;
|
||||||
|
c->bpop.target = target;
|
||||||
|
|
||||||
|
if (target != NULL) incrRefCount(target);
|
||||||
|
|
||||||
|
for (j = 0; j < numkeys; j++) {
|
||||||
|
/* If the key already exists in the dict ignore it. */
|
||||||
|
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
|
||||||
|
incrRefCount(keys[j]);
|
||||||
|
|
||||||
|
/* And in the other "side", to map keys -> clients */
|
||||||
|
de = dictFind(c->db->blocking_keys,keys[j]);
|
||||||
|
if (de == NULL) {
|
||||||
|
int retval;
|
||||||
|
|
||||||
|
/* For every key we take a list of clients blocked for it */
|
||||||
|
l = listCreate();
|
||||||
|
retval = dictAdd(c->db->blocking_keys,keys[j],l);
|
||||||
|
incrRefCount(keys[j]);
|
||||||
|
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
|
||||||
|
} else {
|
||||||
|
l = dictGetVal(de);
|
||||||
|
}
|
||||||
|
listAddNodeTail(l,c);
|
||||||
|
}
|
||||||
|
blockClient(c,BLOCKED_LIST);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
||||||
|
* You should never call this function directly, but unblockClient() instead. */
|
||||||
|
void unblockClientWaitingData(client *c) {
|
||||||
|
dictEntry *de;
|
||||||
|
dictIterator *di;
|
||||||
|
list *l;
|
||||||
|
|
||||||
|
serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
|
||||||
|
di = dictGetIterator(c->bpop.keys);
|
||||||
|
/* The client may wait for multiple keys, so unblock it for every key. */
|
||||||
|
while((de = dictNext(di)) != NULL) {
|
||||||
|
robj *key = dictGetKey(de);
|
||||||
|
|
||||||
|
/* Remove this client from the list of clients waiting for this key. */
|
||||||
|
l = dictFetchValue(c->db->blocking_keys,key);
|
||||||
|
serverAssertWithInfo(c,key,l != NULL);
|
||||||
|
listDelNode(l,listSearchKey(l,c));
|
||||||
|
/* If the list is empty we need to remove it to avoid wasting memory */
|
||||||
|
if (listLength(l) == 0)
|
||||||
|
dictDelete(c->db->blocking_keys,key);
|
||||||
|
}
|
||||||
|
dictReleaseIterator(di);
|
||||||
|
|
||||||
|
/* Cleanup the client structure */
|
||||||
|
dictEmpty(c->bpop.keys,NULL);
|
||||||
|
if (c->bpop.target) {
|
||||||
|
decrRefCount(c->bpop.target);
|
||||||
|
c->bpop.target = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If the specified key has clients blocked waiting for list pushes, this
|
||||||
|
* function will put the key reference into the server.ready_keys list.
|
||||||
|
* Note that db->ready_keys is a hash table that allows us to avoid putting
|
||||||
|
* the same key again and again in the list in case of multiple pushes
|
||||||
|
* made by a script or in the context of MULTI/EXEC.
|
||||||
|
*
|
||||||
|
* The list will be finally processed by handleClientsBlockedOnLists() */
|
||||||
|
void signalKeyAsReady(redisDb *db, robj *key) {
|
||||||
|
readyList *rl;
|
||||||
|
|
||||||
|
/* No clients blocking for this key? No need to queue it. */
|
||||||
|
if (dictFind(db->blocking_keys,key) == NULL) return;
|
||||||
|
|
||||||
|
/* Key was already signaled? No need to queue it again. */
|
||||||
|
if (dictFind(db->ready_keys,key) != NULL) return;
|
||||||
|
|
||||||
|
/* Ok, we need to queue this key into server.ready_keys. */
|
||||||
|
rl = zmalloc(sizeof(*rl));
|
||||||
|
rl->key = key;
|
||||||
|
rl->db = db;
|
||||||
|
incrRefCount(key);
|
||||||
|
listAddNodeTail(server.ready_keys,rl);
|
||||||
|
|
||||||
|
/* We also add the key in the db->ready_keys dictionary in order
|
||||||
|
* to avoid adding it multiple times into a list with a simple O(1)
|
||||||
|
* check. */
|
||||||
|
incrRefCount(key);
|
||||||
|
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
9
src/db.c
9
src/db.c
@ -169,9 +169,10 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
|
|||||||
int retval = dictAdd(db->dict, copy, val);
|
int retval = dictAdd(db->dict, copy, val);
|
||||||
|
|
||||||
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
||||||
if (val->type == OBJ_LIST) signalListAsReady(db, key);
|
if (val->type == OBJ_LIST || val->type == OBJ_STREAM)
|
||||||
|
signalKeyAsReady(db, key);
|
||||||
if (server.cluster_enabled) slotToKeyAdd(key);
|
if (server.cluster_enabled) slotToKeyAdd(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Overwrite an existing key with a new value. Incrementing the reference
|
/* Overwrite an existing key with a new value. Incrementing the reference
|
||||||
* count of the new value is up to the caller.
|
* count of the new value is up to the caller.
|
||||||
@ -951,8 +952,8 @@ void scanDatabaseForReadyLists(redisDb *db) {
|
|||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
robj *key = dictGetKey(de);
|
robj *key = dictGetKey(de);
|
||||||
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
|
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
|
||||||
if (value && value->type == OBJ_LIST)
|
if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM))
|
||||||
signalListAsReady(db, key);
|
signalKeyAsReady(db, key);
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,7 @@ client *createClient(int fd) {
|
|||||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||||
c->btype = BLOCKED_NONE;
|
c->btype = BLOCKED_NONE;
|
||||||
c->bpop.timeout = 0;
|
c->bpop.timeout = 0;
|
||||||
c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL);
|
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
|
||||||
c->bpop.target = NULL;
|
c->bpop.target = NULL;
|
||||||
c->bpop.numreplicas = 0;
|
c->bpop.numreplicas = 0;
|
||||||
c->bpop.reploffset = 0;
|
c->bpop.reploffset = 0;
|
||||||
|
15
src/server.c
15
src/server.c
@ -550,10 +550,21 @@ dictType objectKeyPointerValueDictType = {
|
|||||||
NULL, /* key dup */
|
NULL, /* key dup */
|
||||||
NULL, /* val dup */
|
NULL, /* val dup */
|
||||||
dictEncObjKeyCompare, /* key compare */
|
dictEncObjKeyCompare, /* key compare */
|
||||||
dictObjectDestructor, /* key destructor */
|
dictObjectDestructor, /* key destructor */
|
||||||
NULL /* val destructor */
|
NULL /* val destructor */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* Like objectKeyPointerValueDictType(), but values can be destroyed, if
|
||||||
|
* not NULL, calling zfree(). */
|
||||||
|
dictType objectKeyHeapPointerValueDictType = {
|
||||||
|
dictEncObjHash, /* hash function */
|
||||||
|
NULL, /* key dup */
|
||||||
|
NULL, /* val dup */
|
||||||
|
dictEncObjKeyCompare, /* key compare */
|
||||||
|
dictObjectDestructor, /* key destructor */
|
||||||
|
dictVanillaFree /* val destructor */
|
||||||
|
};
|
||||||
|
|
||||||
/* Set dictionary type. Keys are SDS strings, values are ot used. */
|
/* Set dictionary type. Keys are SDS strings, values are ot used. */
|
||||||
dictType setDictType = {
|
dictType setDictType = {
|
||||||
dictSdsHash, /* hash function */
|
dictSdsHash, /* hash function */
|
||||||
@ -2508,7 +2519,7 @@ int processCommand(client *c) {
|
|||||||
call(c,CMD_CALL_FULL);
|
call(c,CMD_CALL_FULL);
|
||||||
c->woff = server.master_repl_offset;
|
c->woff = server.master_repl_offset;
|
||||||
if (listLength(server.ready_keys))
|
if (listLength(server.ready_keys))
|
||||||
handleClientsBlockedOnLists();
|
handleClientsBlockedOnKeys();
|
||||||
}
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
11
src/server.h
11
src/server.h
@ -256,6 +256,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define BLOCKED_LIST 1 /* BLPOP & co. */
|
#define BLOCKED_LIST 1 /* BLPOP & co. */
|
||||||
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
|
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
|
||||||
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
|
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
|
||||||
|
#define BLOCKED_STREAM 4 /* XREAD. */
|
||||||
|
|
||||||
/* Client request types */
|
/* Client request types */
|
||||||
#define PROTO_REQ_INLINE 1
|
#define PROTO_REQ_INLINE 1
|
||||||
@ -641,9 +642,9 @@ typedef struct blockingState {
|
|||||||
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
|
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
|
||||||
* is > timeout then the operation timed out. */
|
* is > timeout then the operation timed out. */
|
||||||
|
|
||||||
/* BLOCKED_LIST */
|
/* BLOCKED_LIST and BLOCKED_STREAM */
|
||||||
dict *keys; /* The keys we are waiting to terminate a blocking
|
dict *keys; /* The keys we are waiting to terminate a blocking
|
||||||
* operation such as BLPOP. Otherwise NULL. */
|
* operation such as BLPOP or XREAD. Or NULL. */
|
||||||
robj *target; /* The key that should receive the element,
|
robj *target; /* The key that should receive the element,
|
||||||
* for BRPOPLPUSH. */
|
* for BRPOPLPUSH. */
|
||||||
|
|
||||||
@ -1291,6 +1292,7 @@ typedef struct {
|
|||||||
extern struct redisServer server;
|
extern struct redisServer server;
|
||||||
extern struct sharedObjectsStruct shared;
|
extern struct sharedObjectsStruct shared;
|
||||||
extern dictType objectKeyPointerValueDictType;
|
extern dictType objectKeyPointerValueDictType;
|
||||||
|
extern dictType objectKeyHeapPointerValueDictType;
|
||||||
extern dictType setDictType;
|
extern dictType setDictType;
|
||||||
extern dictType zsetDictType;
|
extern dictType zsetDictType;
|
||||||
extern dictType clusterNodesDictType;
|
extern dictType clusterNodesDictType;
|
||||||
@ -1413,9 +1415,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
|
|||||||
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
|
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
|
||||||
void listTypeConvert(robj *subject, int enc);
|
void listTypeConvert(robj *subject, int enc);
|
||||||
void unblockClientWaitingData(client *c);
|
void unblockClientWaitingData(client *c);
|
||||||
void handleClientsBlockedOnLists(void);
|
|
||||||
void popGenericCommand(client *c, int where);
|
void popGenericCommand(client *c, int where);
|
||||||
void signalListAsReady(redisDb *db, robj *key);
|
|
||||||
|
|
||||||
/* Stream data type. */
|
/* Stream data type. */
|
||||||
stream *streamNew(void);
|
stream *streamNew(void);
|
||||||
@ -1798,6 +1798,9 @@ void unblockClient(client *c);
|
|||||||
void replyToBlockedClientTimedOut(client *c);
|
void replyToBlockedClientTimedOut(client *c);
|
||||||
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
|
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
|
||||||
void disconnectAllBlockedClients(void);
|
void disconnectAllBlockedClients(void);
|
||||||
|
void handleClientsBlockedOnKeys(void);
|
||||||
|
void signalKeyAsReady(redisDb *db, robj *key);
|
||||||
|
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target);
|
||||||
|
|
||||||
/* expire.c -- Handling of expired keys */
|
/* expire.c -- Handling of expired keys */
|
||||||
void activeExpireCycle(int type);
|
void activeExpireCycle(int type);
|
||||||
|
204
src/t_list.c
204
src/t_list.c
@ -603,119 +603,6 @@ void rpoplpushCommand(client *c) {
|
|||||||
* Blocking POP operations
|
* Blocking POP operations
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
/* This is how the current blocking POP works, we use BLPOP as example:
|
|
||||||
* - If the user calls BLPOP and the key exists and contains a non empty list
|
|
||||||
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
|
|
||||||
* if blocking is not required.
|
|
||||||
* - If instead BLPOP is called and the key does not exists or the list is
|
|
||||||
* empty we need to block. In order to do so we remove the notification for
|
|
||||||
* new data to read in the client socket (so that we'll not serve new
|
|
||||||
* requests if the blocking request is not served). Also we put the client
|
|
||||||
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
|
|
||||||
* blocking for this keys.
|
|
||||||
* - If a PUSH operation against a key with blocked clients waiting is
|
|
||||||
* performed, we mark this key as "ready", and after the current command,
|
|
||||||
* MULTI/EXEC block, or script, is executed, we serve all the clients waiting
|
|
||||||
* for this list, from the one that blocked first, to the last, accordingly
|
|
||||||
* to the number of elements we have in the ready list.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* Set a client in blocking mode for the specified key, with the specified
|
|
||||||
* timeout */
|
|
||||||
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
|
|
||||||
dictEntry *de;
|
|
||||||
list *l;
|
|
||||||
int j;
|
|
||||||
|
|
||||||
c->bpop.timeout = timeout;
|
|
||||||
c->bpop.target = target;
|
|
||||||
|
|
||||||
if (target != NULL) incrRefCount(target);
|
|
||||||
|
|
||||||
for (j = 0; j < numkeys; j++) {
|
|
||||||
/* If the key already exists in the dict ignore it. */
|
|
||||||
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
|
|
||||||
incrRefCount(keys[j]);
|
|
||||||
|
|
||||||
/* And in the other "side", to map keys -> clients */
|
|
||||||
de = dictFind(c->db->blocking_keys,keys[j]);
|
|
||||||
if (de == NULL) {
|
|
||||||
int retval;
|
|
||||||
|
|
||||||
/* For every key we take a list of clients blocked for it */
|
|
||||||
l = listCreate();
|
|
||||||
retval = dictAdd(c->db->blocking_keys,keys[j],l);
|
|
||||||
incrRefCount(keys[j]);
|
|
||||||
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
|
|
||||||
} else {
|
|
||||||
l = dictGetVal(de);
|
|
||||||
}
|
|
||||||
listAddNodeTail(l,c);
|
|
||||||
}
|
|
||||||
blockClient(c,BLOCKED_LIST);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
|
||||||
* You should never call this function directly, but unblockClient() instead. */
|
|
||||||
void unblockClientWaitingData(client *c) {
|
|
||||||
dictEntry *de;
|
|
||||||
dictIterator *di;
|
|
||||||
list *l;
|
|
||||||
|
|
||||||
serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
|
|
||||||
di = dictGetIterator(c->bpop.keys);
|
|
||||||
/* The client may wait for multiple keys, so unblock it for every key. */
|
|
||||||
while((de = dictNext(di)) != NULL) {
|
|
||||||
robj *key = dictGetKey(de);
|
|
||||||
|
|
||||||
/* Remove this client from the list of clients waiting for this key. */
|
|
||||||
l = dictFetchValue(c->db->blocking_keys,key);
|
|
||||||
serverAssertWithInfo(c,key,l != NULL);
|
|
||||||
listDelNode(l,listSearchKey(l,c));
|
|
||||||
/* If the list is empty we need to remove it to avoid wasting memory */
|
|
||||||
if (listLength(l) == 0)
|
|
||||||
dictDelete(c->db->blocking_keys,key);
|
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
|
||||||
|
|
||||||
/* Cleanup the client structure */
|
|
||||||
dictEmpty(c->bpop.keys,NULL);
|
|
||||||
if (c->bpop.target) {
|
|
||||||
decrRefCount(c->bpop.target);
|
|
||||||
c->bpop.target = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* If the specified key has clients blocked waiting for list pushes, this
|
|
||||||
* function will put the key reference into the server.ready_keys list.
|
|
||||||
* Note that db->ready_keys is a hash table that allows us to avoid putting
|
|
||||||
* the same key again and again in the list in case of multiple pushes
|
|
||||||
* made by a script or in the context of MULTI/EXEC.
|
|
||||||
*
|
|
||||||
* The list will be finally processed by handleClientsBlockedOnLists() */
|
|
||||||
void signalListAsReady(redisDb *db, robj *key) {
|
|
||||||
readyList *rl;
|
|
||||||
|
|
||||||
/* No clients blocking for this key? No need to queue it. */
|
|
||||||
if (dictFind(db->blocking_keys,key) == NULL) return;
|
|
||||||
|
|
||||||
/* Key was already signaled? No need to queue it again. */
|
|
||||||
if (dictFind(db->ready_keys,key) != NULL) return;
|
|
||||||
|
|
||||||
/* Ok, we need to queue this key into server.ready_keys. */
|
|
||||||
rl = zmalloc(sizeof(*rl));
|
|
||||||
rl->key = key;
|
|
||||||
rl->db = db;
|
|
||||||
incrRefCount(key);
|
|
||||||
listAddNodeTail(server.ready_keys,rl);
|
|
||||||
|
|
||||||
/* We also add the key in the db->ready_keys dictionary in order
|
|
||||||
* to avoid adding it multiple times into a list with a simple O(1)
|
|
||||||
* check. */
|
|
||||||
incrRefCount(key);
|
|
||||||
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This is a helper function for handleClientsBlockedOnLists(). It's work
|
/* This is a helper function for handleClientsBlockedOnLists(). It's work
|
||||||
* is to serve a specific client (receiver) that is blocked on 'key'
|
* is to serve a specific client (receiver) that is blocked on 'key'
|
||||||
* in the context of the specified 'db', doing the following:
|
* in the context of the specified 'db', doing the following:
|
||||||
@ -785,97 +672,6 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function should be called by Redis every time a single command,
|
|
||||||
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
|
||||||
* being called by a client.
|
|
||||||
*
|
|
||||||
* All the keys with at least one client blocked that received at least
|
|
||||||
* one new element via some PUSH operation are accumulated into
|
|
||||||
* the server.ready_keys list. This function will run the list and will
|
|
||||||
* serve clients accordingly. Note that the function will iterate again and
|
|
||||||
* again as a result of serving BRPOPLPUSH we can have new blocking clients
|
|
||||||
* to serve because of the PUSH side of BRPOPLPUSH. */
|
|
||||||
void handleClientsBlockedOnLists(void) {
|
|
||||||
while(listLength(server.ready_keys) != 0) {
|
|
||||||
list *l;
|
|
||||||
|
|
||||||
/* Point server.ready_keys to a fresh list and save the current one
|
|
||||||
* locally. This way as we run the old list we are free to call
|
|
||||||
* signalListAsReady() that may push new elements in server.ready_keys
|
|
||||||
* when handling clients blocked into BRPOPLPUSH. */
|
|
||||||
l = server.ready_keys;
|
|
||||||
server.ready_keys = listCreate();
|
|
||||||
|
|
||||||
while(listLength(l) != 0) {
|
|
||||||
listNode *ln = listFirst(l);
|
|
||||||
readyList *rl = ln->value;
|
|
||||||
|
|
||||||
/* First of all remove this key from db->ready_keys so that
|
|
||||||
* we can safely call signalListAsReady() against this key. */
|
|
||||||
dictDelete(rl->db->ready_keys,rl->key);
|
|
||||||
|
|
||||||
/* If the key exists and it's a list, serve blocked clients
|
|
||||||
* with data. */
|
|
||||||
robj *o = lookupKeyWrite(rl->db,rl->key);
|
|
||||||
if (o != NULL && o->type == OBJ_LIST) {
|
|
||||||
dictEntry *de;
|
|
||||||
|
|
||||||
/* We serve clients in the same order they blocked for
|
|
||||||
* this key, from the first blocked to the last. */
|
|
||||||
de = dictFind(rl->db->blocking_keys,rl->key);
|
|
||||||
if (de) {
|
|
||||||
list *clients = dictGetVal(de);
|
|
||||||
int numclients = listLength(clients);
|
|
||||||
|
|
||||||
while(numclients--) {
|
|
||||||
listNode *clientnode = listFirst(clients);
|
|
||||||
client *receiver = clientnode->value;
|
|
||||||
robj *dstkey = receiver->bpop.target;
|
|
||||||
int where = (receiver->lastcmd &&
|
|
||||||
receiver->lastcmd->proc == blpopCommand) ?
|
|
||||||
LIST_HEAD : LIST_TAIL;
|
|
||||||
robj *value = listTypePop(o,where);
|
|
||||||
|
|
||||||
if (value) {
|
|
||||||
/* Protect receiver->bpop.target, that will be
|
|
||||||
* freed by the next unblockClient()
|
|
||||||
* call. */
|
|
||||||
if (dstkey) incrRefCount(dstkey);
|
|
||||||
unblockClient(receiver);
|
|
||||||
|
|
||||||
if (serveClientBlockedOnList(receiver,
|
|
||||||
rl->key,dstkey,rl->db,value,
|
|
||||||
where) == C_ERR)
|
|
||||||
{
|
|
||||||
/* If we failed serving the client we need
|
|
||||||
* to also undo the POP operation. */
|
|
||||||
listTypePush(o,value,where);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dstkey) decrRefCount(dstkey);
|
|
||||||
decrRefCount(value);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (listTypeLength(o) == 0) {
|
|
||||||
dbDelete(rl->db,rl->key);
|
|
||||||
}
|
|
||||||
/* We don't call signalModifiedKey() as it was already called
|
|
||||||
* when an element was pushed on the list. */
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Free this item. */
|
|
||||||
decrRefCount(rl->key);
|
|
||||||
zfree(rl);
|
|
||||||
listDelNode(l,ln);
|
|
||||||
}
|
|
||||||
listRelease(l); /* We have the new list on place at this point. */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Blocking RPOP/LPOP */
|
/* Blocking RPOP/LPOP */
|
||||||
void blockingPopGenericCommand(client *c, int where) {
|
void blockingPopGenericCommand(client *c, int where) {
|
||||||
robj *o;
|
robj *o;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user