From 4a377cecd82e21307a887bb5f9fba55d79044bb8 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 6 Sep 2017 15:43:28 +0200 Subject: [PATCH] Streams: initial work to use blocking lists logic for streams XREAD. --- src/blocked.c | 208 +++++++++++++++++++++++++++++++++++++++++++++++ src/db.c | 9 +- src/networking.c | 2 +- src/server.c | 15 +++- src/server.h | 11 ++- src/t_list.c | 204 ---------------------------------------------- 6 files changed, 234 insertions(+), 215 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 54b26b71..acd3b948 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -65,6 +65,8 @@ #include "server.h" +int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); + /* Get a timeout value from an object and store it into 'timeout'. * The final timeout is always stored as milliseconds as a time where the * timeout will expire, however the parsing is performed according to @@ -193,3 +195,209 @@ void disconnectAllBlockedClients(void) { } } } + +/* This function should be called by Redis every time a single command, + * a MULTI/EXEC block, or a Lua script, terminated its execution after + * being called by a client. + * + * All the keys with at least one client blocked that received at least + * one new element via some PUSH/XADD operation are accumulated into + * the server.ready_keys list. This function will run the list and will + * serve clients accordingly. Note that the function will iterate again and + * again as a result of serving BRPOPLPUSH we can have new blocking clients + * to serve because of the PUSH side of BRPOPLPUSH. */ +void handleClientsBlockedOnKeys(void) { + while(listLength(server.ready_keys) != 0) { + list *l; + + /* Point server.ready_keys to a fresh list and save the current one + * locally. This way as we run the old list we are free to call + * signalKeyAsReady() that may push new elements in server.ready_keys + * when handling clients blocked into BRPOPLPUSH. */ + l = server.ready_keys; + server.ready_keys = listCreate(); + + while(listLength(l) != 0) { + listNode *ln = listFirst(l); + readyList *rl = ln->value; + + /* First of all remove this key from db->ready_keys so that + * we can safely call signalKeyAsReady() against this key. */ + dictDelete(rl->db->ready_keys,rl->key); + + /* If the key exists and it's a list, serve blocked clients + * with data. */ + robj *o = lookupKeyWrite(rl->db,rl->key); + if (o != NULL && o->type == OBJ_LIST) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + robj *dstkey = receiver->bpop.target; + int where = (receiver->lastcmd && + receiver->lastcmd->proc == blpopCommand) ? + LIST_HEAD : LIST_TAIL; + robj *value = listTypePop(o,where); + + if (value) { + /* Protect receiver->bpop.target, that will be + * freed by the next unblockClient() + * call. */ + if (dstkey) incrRefCount(dstkey); + unblockClient(receiver); + + if (serveClientBlockedOnList(receiver, + rl->key,dstkey,rl->db,value, + where) == C_ERR) + { + /* If we failed serving the client we need + * to also undo the POP operation. */ + listTypePush(o,value,where); + } + + if (dstkey) decrRefCount(dstkey); + decrRefCount(value); + } else { + break; + } + } + } + + if (listTypeLength(o) == 0) { + dbDelete(rl->db,rl->key); + } + /* We don't call signalModifiedKey() as it was already called + * when an element was pushed on the list. */ + } + + /* Free this item. */ + decrRefCount(rl->key); + zfree(rl); + listDelNode(l,ln); + } + listRelease(l); /* We have the new list on place at this point. */ + } +} + +/* This is how the current blocking POP works, we use BLPOP as example: + * - If the user calls BLPOP and the key exists and contains a non empty list + * then LPOP is called instead. So BLPOP is semantically the same as LPOP + * if blocking is not required. + * - If instead BLPOP is called and the key does not exists or the list is + * empty we need to block. In order to do so we remove the notification for + * new data to read in the client socket (so that we'll not serve new + * requests if the blocking request is not served). Also we put the client + * in a dictionary (db->blocking_keys) mapping keys to a list of clients + * blocking for this keys. + * - If a PUSH operation against a key with blocked clients waiting is + * performed, we mark this key as "ready", and after the current command, + * MULTI/EXEC block, or script, is executed, we serve all the clients waiting + * for this list, from the one that blocked first, to the last, accordingly + * to the number of elements we have in the ready list. + */ + +/* Set a client in blocking mode for the specified key, with the specified + * timeout */ +void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { + dictEntry *de; + list *l; + int j; + + c->bpop.timeout = timeout; + c->bpop.target = target; + + if (target != NULL) incrRefCount(target); + + for (j = 0; j < numkeys; j++) { + /* If the key already exists in the dict ignore it. */ + if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; + incrRefCount(keys[j]); + + /* And in the other "side", to map keys -> clients */ + de = dictFind(c->db->blocking_keys,keys[j]); + if (de == NULL) { + int retval; + + /* For every key we take a list of clients blocked for it */ + l = listCreate(); + retval = dictAdd(c->db->blocking_keys,keys[j],l); + incrRefCount(keys[j]); + serverAssertWithInfo(c,keys[j],retval == DICT_OK); + } else { + l = dictGetVal(de); + } + listAddNodeTail(l,c); + } + blockClient(c,BLOCKED_LIST); +} + +/* Unblock a client that's waiting in a blocking operation such as BLPOP. + * You should never call this function directly, but unblockClient() instead. */ +void unblockClientWaitingData(client *c) { + dictEntry *de; + dictIterator *di; + list *l; + + serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0); + di = dictGetIterator(c->bpop.keys); + /* The client may wait for multiple keys, so unblock it for every key. */ + while((de = dictNext(di)) != NULL) { + robj *key = dictGetKey(de); + + /* Remove this client from the list of clients waiting for this key. */ + l = dictFetchValue(c->db->blocking_keys,key); + serverAssertWithInfo(c,key,l != NULL); + listDelNode(l,listSearchKey(l,c)); + /* If the list is empty we need to remove it to avoid wasting memory */ + if (listLength(l) == 0) + dictDelete(c->db->blocking_keys,key); + } + dictReleaseIterator(di); + + /* Cleanup the client structure */ + dictEmpty(c->bpop.keys,NULL); + if (c->bpop.target) { + decrRefCount(c->bpop.target); + c->bpop.target = NULL; + } +} + +/* If the specified key has clients blocked waiting for list pushes, this + * function will put the key reference into the server.ready_keys list. + * Note that db->ready_keys is a hash table that allows us to avoid putting + * the same key again and again in the list in case of multiple pushes + * made by a script or in the context of MULTI/EXEC. + * + * The list will be finally processed by handleClientsBlockedOnLists() */ +void signalKeyAsReady(redisDb *db, robj *key) { + readyList *rl; + + /* No clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys,key) == NULL) return; + + /* Key was already signaled? No need to queue it again. */ + if (dictFind(db->ready_keys,key) != NULL) return; + + /* Ok, we need to queue this key into server.ready_keys. */ + rl = zmalloc(sizeof(*rl)); + rl->key = key; + rl->db = db; + incrRefCount(key); + listAddNodeTail(server.ready_keys,rl); + + /* We also add the key in the db->ready_keys dictionary in order + * to avoid adding it multiple times into a list with a simple O(1) + * check. */ + incrRefCount(key); + serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); +} + + diff --git a/src/db.c b/src/db.c index 4d6999be..6682e573 100644 --- a/src/db.c +++ b/src/db.c @@ -169,9 +169,10 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST) signalListAsReady(db, key); + if (val->type == OBJ_LIST || val->type == OBJ_STREAM) + signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); - } +} /* Overwrite an existing key with a new value. Incrementing the reference * count of the new value is up to the caller. @@ -951,8 +952,8 @@ void scanDatabaseForReadyLists(redisDb *db) { while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); - if (value && value->type == OBJ_LIST) - signalListAsReady(db, key); + if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM)) + signalKeyAsReady(db, key); } dictReleaseIterator(di); } diff --git a/src/networking.c b/src/networking.c index aeaeca96..d672ec32 100644 --- a/src/networking.c +++ b/src/networking.c @@ -124,7 +124,7 @@ client *createClient(int fd) { listSetDupMethod(c->reply,dupClientReplyValue); c->btype = BLOCKED_NONE; c->bpop.timeout = 0; - c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL); + c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); c->bpop.target = NULL; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; diff --git a/src/server.c b/src/server.c index f3338f56..56b2188e 100644 --- a/src/server.c +++ b/src/server.c @@ -550,10 +550,21 @@ dictType objectKeyPointerValueDictType = { NULL, /* key dup */ NULL, /* val dup */ dictEncObjKeyCompare, /* key compare */ - dictObjectDestructor, /* key destructor */ + dictObjectDestructor, /* key destructor */ NULL /* val destructor */ }; +/* Like objectKeyPointerValueDictType(), but values can be destroyed, if + * not NULL, calling zfree(). */ +dictType objectKeyHeapPointerValueDictType = { + dictEncObjHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictEncObjKeyCompare, /* key compare */ + dictObjectDestructor, /* key destructor */ + dictVanillaFree /* val destructor */ +}; + /* Set dictionary type. Keys are SDS strings, values are ot used. */ dictType setDictType = { dictSdsHash, /* hash function */ @@ -2508,7 +2519,7 @@ int processCommand(client *c) { call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) - handleClientsBlockedOnLists(); + handleClientsBlockedOnKeys(); } return C_OK; } diff --git a/src/server.h b/src/server.h index 8ea18341..8e50d030 100644 --- a/src/server.h +++ b/src/server.h @@ -256,6 +256,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_LIST 1 /* BLPOP & co. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ +#define BLOCKED_STREAM 4 /* XREAD. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -641,9 +642,9 @@ typedef struct blockingState { mstime_t timeout; /* Blocking operation timeout. If UNIX current time * is > timeout then the operation timed out. */ - /* BLOCKED_LIST */ + /* BLOCKED_LIST and BLOCKED_STREAM */ dict *keys; /* The keys we are waiting to terminate a blocking - * operation such as BLPOP. Otherwise NULL. */ + * operation such as BLPOP or XREAD. Or NULL. */ robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ @@ -1291,6 +1292,7 @@ typedef struct { extern struct redisServer server; extern struct sharedObjectsStruct shared; extern dictType objectKeyPointerValueDictType; +extern dictType objectKeyHeapPointerValueDictType; extern dictType setDictType; extern dictType zsetDictType; extern dictType clusterNodesDictType; @@ -1413,9 +1415,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); void listTypeConvert(robj *subject, int enc); void unblockClientWaitingData(client *c); -void handleClientsBlockedOnLists(void); void popGenericCommand(client *c, int where); -void signalListAsReady(redisDb *db, robj *key); /* Stream data type. */ stream *streamNew(void); @@ -1798,6 +1798,9 @@ void unblockClient(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); +void handleClientsBlockedOnKeys(void); +void signalKeyAsReady(redisDb *db, robj *key); +void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); diff --git a/src/t_list.c b/src/t_list.c index a0a30998..c7eacb0e 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -603,119 +603,6 @@ void rpoplpushCommand(client *c) { * Blocking POP operations *----------------------------------------------------------------------------*/ -/* This is how the current blocking POP works, we use BLPOP as example: - * - If the user calls BLPOP and the key exists and contains a non empty list - * then LPOP is called instead. So BLPOP is semantically the same as LPOP - * if blocking is not required. - * - If instead BLPOP is called and the key does not exists or the list is - * empty we need to block. In order to do so we remove the notification for - * new data to read in the client socket (so that we'll not serve new - * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blocking_keys) mapping keys to a list of clients - * blocking for this keys. - * - If a PUSH operation against a key with blocked clients waiting is - * performed, we mark this key as "ready", and after the current command, - * MULTI/EXEC block, or script, is executed, we serve all the clients waiting - * for this list, from the one that blocked first, to the last, accordingly - * to the number of elements we have in the ready list. - */ - -/* Set a client in blocking mode for the specified key, with the specified - * timeout */ -void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { - dictEntry *de; - list *l; - int j; - - c->bpop.timeout = timeout; - c->bpop.target = target; - - if (target != NULL) incrRefCount(target); - - for (j = 0; j < numkeys; j++) { - /* If the key already exists in the dict ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; - incrRefCount(keys[j]); - - /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blocking_keys,keys[j]); - if (de == NULL) { - int retval; - - /* For every key we take a list of clients blocked for it */ - l = listCreate(); - retval = dictAdd(c->db->blocking_keys,keys[j],l); - incrRefCount(keys[j]); - serverAssertWithInfo(c,keys[j],retval == DICT_OK); - } else { - l = dictGetVal(de); - } - listAddNodeTail(l,c); - } - blockClient(c,BLOCKED_LIST); -} - -/* Unblock a client that's waiting in a blocking operation such as BLPOP. - * You should never call this function directly, but unblockClient() instead. */ -void unblockClientWaitingData(client *c) { - dictEntry *de; - dictIterator *di; - list *l; - - serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0); - di = dictGetIterator(c->bpop.keys); - /* The client may wait for multiple keys, so unblock it for every key. */ - while((de = dictNext(di)) != NULL) { - robj *key = dictGetKey(de); - - /* Remove this client from the list of clients waiting for this key. */ - l = dictFetchValue(c->db->blocking_keys,key); - serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,listSearchKey(l,c)); - /* If the list is empty we need to remove it to avoid wasting memory */ - if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,key); - } - dictReleaseIterator(di); - - /* Cleanup the client structure */ - dictEmpty(c->bpop.keys,NULL); - if (c->bpop.target) { - decrRefCount(c->bpop.target); - c->bpop.target = NULL; - } -} - -/* If the specified key has clients blocked waiting for list pushes, this - * function will put the key reference into the server.ready_keys list. - * Note that db->ready_keys is a hash table that allows us to avoid putting - * the same key again and again in the list in case of multiple pushes - * made by a script or in the context of MULTI/EXEC. - * - * The list will be finally processed by handleClientsBlockedOnLists() */ -void signalListAsReady(redisDb *db, robj *key) { - readyList *rl; - - /* No clients blocking for this key? No need to queue it. */ - if (dictFind(db->blocking_keys,key) == NULL) return; - - /* Key was already signaled? No need to queue it again. */ - if (dictFind(db->ready_keys,key) != NULL) return; - - /* Ok, we need to queue this key into server.ready_keys. */ - rl = zmalloc(sizeof(*rl)); - rl->key = key; - rl->db = db; - incrRefCount(key); - listAddNodeTail(server.ready_keys,rl); - - /* We also add the key in the db->ready_keys dictionary in order - * to avoid adding it multiple times into a list with a simple O(1) - * check. */ - incrRefCount(key); - serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); -} - /* This is a helper function for handleClientsBlockedOnLists(). It's work * is to serve a specific client (receiver) that is blocked on 'key' * in the context of the specified 'db', doing the following: @@ -785,97 +672,6 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb return C_OK; } -/* This function should be called by Redis every time a single command, - * a MULTI/EXEC block, or a Lua script, terminated its execution after - * being called by a client. - * - * All the keys with at least one client blocked that received at least - * one new element via some PUSH operation are accumulated into - * the server.ready_keys list. This function will run the list and will - * serve clients accordingly. Note that the function will iterate again and - * again as a result of serving BRPOPLPUSH we can have new blocking clients - * to serve because of the PUSH side of BRPOPLPUSH. */ -void handleClientsBlockedOnLists(void) { - while(listLength(server.ready_keys) != 0) { - list *l; - - /* Point server.ready_keys to a fresh list and save the current one - * locally. This way as we run the old list we are free to call - * signalListAsReady() that may push new elements in server.ready_keys - * when handling clients blocked into BRPOPLPUSH. */ - l = server.ready_keys; - server.ready_keys = listCreate(); - - while(listLength(l) != 0) { - listNode *ln = listFirst(l); - readyList *rl = ln->value; - - /* First of all remove this key from db->ready_keys so that - * we can safely call signalListAsReady() against this key. */ - dictDelete(rl->db->ready_keys,rl->key); - - /* If the key exists and it's a list, serve blocked clients - * with data. */ - robj *o = lookupKeyWrite(rl->db,rl->key); - if (o != NULL && o->type == OBJ_LIST) { - dictEntry *de; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - int numclients = listLength(clients); - - while(numclients--) { - listNode *clientnode = listFirst(clients); - client *receiver = clientnode->value; - robj *dstkey = receiver->bpop.target; - int where = (receiver->lastcmd && - receiver->lastcmd->proc == blpopCommand) ? - LIST_HEAD : LIST_TAIL; - robj *value = listTypePop(o,where); - - if (value) { - /* Protect receiver->bpop.target, that will be - * freed by the next unblockClient() - * call. */ - if (dstkey) incrRefCount(dstkey); - unblockClient(receiver); - - if (serveClientBlockedOnList(receiver, - rl->key,dstkey,rl->db,value, - where) == C_ERR) - { - /* If we failed serving the client we need - * to also undo the POP operation. */ - listTypePush(o,value,where); - } - - if (dstkey) decrRefCount(dstkey); - decrRefCount(value); - } else { - break; - } - } - } - - if (listTypeLength(o) == 0) { - dbDelete(rl->db,rl->key); - } - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ - } - - /* Free this item. */ - decrRefCount(rl->key); - zfree(rl); - listDelNode(l,ln); - } - listRelease(l); /* We have the new list on place at this point. */ - } -} - /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(client *c, int where) { robj *o;