From 9b7f8b1c9b379ab842d40df4636dfbbeb6376fcb Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Mar 2015 11:07:10 +0100 Subject: [PATCH] Cluster: redirection refactoring + handling of blocked clients. There was a bug in Redis Cluster caused by clients blocked in a blocking list pop operation, for keys no longer handled by the instance, or in a condition where the cluster became down after the client blocked. A typical situation is: 1) BLPOP 0 2) hash slot is resharded to another master. The client will block forever int this case. A symmentrical non-cluster-specific bug happens when an instance is turned from master to slave. In that case it is more serious since this will desynchronize data between slaves and masters. This other bug was discovered as a side effect of thinking about the bug explained and fixed in this commit, but will be fixed in a separated commit. --- src/blocked.c | 2 ++ src/cluster.c | 90 ++++++++++++++++++++++++++++++++++++++++++++++++--- src/cluster.h | 9 ++++-- src/redis.c | 30 ++++++----------- 4 files changed, 102 insertions(+), 29 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 3509dd13..ae2500aa 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -59,6 +59,8 @@ * When implementing a new type of blocking opeation, the implementation * should modify unblockClient() and replyToBlockedClientTimedOut() in order * to handle the btype-specific behavior of this two functions. + * If the blocking operation waits for certain keys to change state, the + * clusterRedirectBlockedClientIfNeeded() function should also be updated. */ #include "redis.h" diff --git a/src/cluster.c b/src/cluster.c index e1ae92f0..916a4be6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4768,10 +4768,10 @@ void readwriteCommand(redisClient *c) { * belonging to the same slot, but the slot is not stable (in migration or * importing state, likely because a resharding is in progress). * - * REDIS_CLUSTER_REDIR_DOWN if the request addresses a slot which is not - * bound to any node. In this case the cluster global state should be already - * "down" but it is fragile to rely on the update of the global state, so - * we also handle it here. */ + * REDIS_CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is + * not bound to any node. In this case the cluster global state should be + * already "down" but it is fragile to rely on the update of the global state, + * so we also handle it here. */ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { clusterNode *n = NULL; robj *firstkey = NULL; @@ -4833,7 +4833,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg if (n == NULL) { getKeysFreeResult(keyindex); if (error_code) - *error_code = REDIS_CLUSTER_REDIR_DOWN; + *error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND; return NULL; } @@ -4925,3 +4925,83 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED; return n; } + +/* Send the client the right redirection code, according to error_code + * that should be set to one of REDIS_CLUSTER_REDIR_* macros. + * + * If REDIS_CLUSTER_REDIR_ASK or REDIS_CLUSTER_REDIR_MOVED error codes + * are used, then the node 'n' should not be NULL, but should be the + * node we want to mention in the redirection. Moreover hashslot should + * be set to the hash slot that caused the redirection. */ +void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code) { + if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { + addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); + } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { + /* The request spawns mutliple keys in the same slot, + * but the slot is not "stable" currently as there is + * a migration or import in progress. */ + addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); + } else if (error_code == REDIS_CLUSTER_REDIR_DOWN_STATE) { + addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n")); + } else if (error_code == REDIS_CLUSTER_REDIR_DOWN_UNBOUND) { + addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n")); + } else if (error_code == REDIS_CLUSTER_REDIR_MOVED || + error_code == REDIS_CLUSTER_REDIR_ASK) + { + addReplySds(c,sdscatprintf(sdsempty(), + "-%s %d %s:%d\r\n", + (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", + hashslot,n->ip,n->port)); + } else { + redisPanic("getNodeByQuery() unknown error."); + } +} + +/* This function is called by the function processing clients incrementally + * to detect timeouts, in order to handle the following case: + * + * 1) A client blocks with BLPOP or similar blocking operation. + * 2) The master migrates the hash slot elsewhere or turns into a slave. + * 3) The client may remain blocked forever (or up to the max timeout time) + * waiting for a key change that will never happen. + * + * If the client is found to be blocked into an hash slot this node no + * longer handles, the client is sent a redirection error, and the function + * returns 1. Otherwise 0 is returned and no operation is performed. */ +int clusterRedirectBlockedClientIfNeeded(redisClient *c) { + if (c->flags & REDIS_BLOCKED && c->btype == REDIS_BLOCKED_LIST) { + dictEntry *de; + dictIterator *di; + + /* If the cluster is down, unblock the client with the right error. */ + if (server.cluster->state == REDIS_CLUSTER_FAIL) { + clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE); + return 1; + } + + di = dictGetIterator(c->bpop.keys); + while((de = dictNext(di)) != NULL) { + robj *key = dictGetKey(de); + int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); + clusterNode *node = server.cluster->slots[slot]; + + /* We send an error and unblock the client if: + * 1) The slot is unassigned, emitting a cluster down error. + * 2) The slot is not handled by this node, nor being imported. */ + if (node != myself && + server.cluster->importing_slots_from[slot] == NULL) + { + if (node == NULL) { + clusterRedirectClient(c,NULL,0, + REDIS_CLUSTER_REDIR_DOWN_UNBOUND); + } else { + clusterRedirectClient(c,node,slot, + REDIS_CLUSTER_REDIR_MOVED); + } + return 1; + } + } + dictReleaseIterator(di); + } + return 0; +} diff --git a/src/cluster.h b/src/cluster.h index 8eaa0ab9..bf442a22 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -26,11 +26,12 @@ /* Redirection errors returned by getNodeByQuery(). */ #define REDIS_CLUSTER_REDIR_NONE 0 /* Node can serve the request. */ -#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* Keys in different slots. */ -#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* Keys in slot resharding. */ +#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* -CROSSSLOT request. */ +#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* -TRYAGAIN redirection required */ #define REDIS_CLUSTER_REDIR_ASK 3 /* -ASK redirection required. */ #define REDIS_CLUSTER_REDIR_MOVED 4 /* -MOVED redirection required. */ -#define REDIS_CLUSTER_REDIR_DOWN 5 /* -CLUSTERDOWN error. */ +#define REDIS_CLUSTER_REDIR_DOWN_STATE 5 /* -CLUSTERDOWN, global state. */ +#define REDIS_CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */ struct clusterNode; @@ -249,5 +250,7 @@ typedef struct { /* ---------------------- API exported outside cluster.c -------------------- */ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); +int clusterRedirectBlockedClientIfNeeded(redisClient *c); +void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code); #endif /* __REDIS_CLUSTER_H */ diff --git a/src/redis.c b/src/redis.c index 81fa7be2..787663e4 100644 --- a/src/redis.c +++ b/src/redis.c @@ -926,8 +926,14 @@ int clientsCronHandleTimeout(redisClient *c) { mstime_t now_ms = mstime(); if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { + /* Handle blocking operation specific timeout. */ replyToBlockedClientTimedOut(c); unblockClient(c); + } else if (server.cluster_enabled) { + /* Cluster: handle unblock & redirect of clients blocked + * into keys no longer served by this server. */ + if (clusterRedirectBlockedClientIfNeeded(c)) + unblockClient(c); } } return 0; @@ -2207,32 +2213,14 @@ int processCommand(redisClient *c) { if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); - addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); + clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE); return REDIS_OK; } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); - if (n == NULL) { + if (n == NULL || n != server.cluster->myself) { flagTransaction(c); - if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { - addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); - } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { - /* The request spawns mutliple keys in the same slot, - * but the slot is not "stable" currently as there is - * a migration or import in progress. */ - addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); - } else if (error_code == REDIS_CLUSTER_REDIR_DOWN) { - addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Hash slot is unbound\r\n")); - } else { - redisPanic("getNodeByQuery() unknown error."); - } - return REDIS_OK; - } else if (n != server.cluster->myself) { - flagTransaction(c); - addReplySds(c,sdscatprintf(sdsempty(), - "-%s %d %s:%d\r\n", - (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", - hashslot,n->ip,n->port)); + clusterRedirectClient(c,n,hashslot,error_code); return REDIS_OK; } }