diff --git a/src/blocked.c b/src/blocked.c
index 3509dd13..ae2500aa 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -59,6 +59,8 @@
  * When implementing a new type of blocking opeation, the implementation
  * should modify unblockClient() and replyToBlockedClientTimedOut() in order
  * to handle the btype-specific behavior of this two functions.
+ * If the blocking operation waits for certain keys to change state, the
+ * clusterRedirectBlockedClientIfNeeded() function should also be updated.
  */
 
 #include "redis.h"
diff --git a/src/cluster.c b/src/cluster.c
index e1ae92f0..916a4be6 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4768,10 +4768,10 @@ void readwriteCommand(redisClient *c) {
  * belonging to the same slot, but the slot is not stable (in migration or
  * importing state, likely because a resharding is in progress).
  *
- * REDIS_CLUSTER_REDIR_DOWN if the request addresses a slot which is not
- * bound to any node. In this case the cluster global state should be already
- * "down" but it is fragile to rely on the update of the global state, so
- * we also handle it here. */
+ * REDIS_CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
+ * not bound to any node. In this case the cluster global state should be
+ * already "down" but it is fragile to rely on the update of the global state,
+ * so we also handle it here. */
 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
     clusterNode *n = NULL;
     robj *firstkey = NULL;
@@ -4833,7 +4833,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
                 if (n == NULL) {
                     getKeysFreeResult(keyindex);
                     if (error_code)
-                        *error_code = REDIS_CLUSTER_REDIR_DOWN;
+                        *error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
                     return NULL;
                 }
 
@@ -4925,3 +4925,86 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
     if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
     return n;
 }
+
+/* Send the client the right redirection code, according to error_code
+ * that should be set to one of REDIS_CLUSTER_REDIR_* macros.
+ *
+ * If REDIS_CLUSTER_REDIR_ASK or REDIS_CLUSTER_REDIR_MOVED error codes
+ * are used, then the node 'n' should not be NULL, but should be the
+ * node we want to mention in the redirection. Moreover hashslot should
+ * be set to the hash slot that caused the redirection. */
+void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code) {
+    if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
+        addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
+    } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
+        /* The request spans multiple keys in the same slot,
+         * but the slot is not "stable" currently as there is
+         * a migration or import in progress. */
+        addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
+    } else if (error_code == REDIS_CLUSTER_REDIR_DOWN_STATE) {
+        addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
+    } else if (error_code == REDIS_CLUSTER_REDIR_DOWN_UNBOUND) {
+        addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
+    } else if (error_code == REDIS_CLUSTER_REDIR_MOVED ||
+               error_code == REDIS_CLUSTER_REDIR_ASK)
+    {
+        addReplySds(c,sdscatprintf(sdsempty(),
+            "-%s %d %s:%d\r\n",
+            (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
+            hashslot,n->ip,n->port));
+    } else {
+        redisPanic("getNodeByQuery() unknown error.");
+    }
+}
+
+/* This function is called by the function processing clients incrementally
+ * to detect timeouts, in order to handle the following case:
+ *
+ * 1) A client blocks with BLPOP or similar blocking operation.
+ * 2) The master migrates the hash slot elsewhere or turns into a slave.
+ * 3) The client may remain blocked forever (or up to the max timeout time)
+ *    waiting for a key change that will never happen.
+ *
+ * If the client is found to be blocked in a hash slot this node no
+ * longer handles, the client is sent a redirection error, and the function
+ * returns 1. Otherwise 0 is returned and no operation is performed. */
+int clusterRedirectBlockedClientIfNeeded(redisClient *c) {
+    if (c->flags & REDIS_BLOCKED && c->btype == REDIS_BLOCKED_LIST) {
+        dictEntry *de;
+        dictIterator *di;
+
+        /* If the cluster is down, unblock the client with the right error. */
+        if (server.cluster->state == REDIS_CLUSTER_FAIL) {
+            clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
+            return 1;
+        }
+
+        di = dictGetIterator(c->bpop.keys);
+        while((de = dictNext(di)) != NULL) {
+            robj *key = dictGetKey(de);
+            int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
+            clusterNode *node = server.cluster->slots[slot];
+
+            /* We send an error and unblock the client if:
+             * 1) The slot is unassigned, emitting a cluster down error.
+             * 2) The slot is not handled by this node, nor being imported. */
+            if (node != myself &&
+                server.cluster->importing_slots_from[slot] == NULL)
+            {
+                if (node == NULL) {
+                    clusterRedirectClient(c,NULL,0,
+                        REDIS_CLUSTER_REDIR_DOWN_UNBOUND);
+                } else {
+                    clusterRedirectClient(c,node,slot,
+                        REDIS_CLUSTER_REDIR_MOVED);
+                }
+                /* Release the iterator before the early return, otherwise
+                 * it is leaked every time a client gets redirected. */
+                dictReleaseIterator(di);
+                return 1;
+            }
+        }
+        dictReleaseIterator(di);
+    }
+    return 0;
+}
diff --git a/src/cluster.h b/src/cluster.h
index 8eaa0ab9..bf442a22 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -26,11 +26,12 @@
 
 /* Redirection errors returned by getNodeByQuery(). */
 #define REDIS_CLUSTER_REDIR_NONE 0          /* Node can serve the request. */
-#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1    /* Keys in different slots. */
-#define REDIS_CLUSTER_REDIR_UNSTABLE 2      /* Keys in slot resharding. */
+#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1    /* -CROSSSLOT request. */
+#define REDIS_CLUSTER_REDIR_UNSTABLE 2      /* -TRYAGAIN redirection required. */
 #define REDIS_CLUSTER_REDIR_ASK 3           /* -ASK redirection required. */
 #define REDIS_CLUSTER_REDIR_MOVED 4         /* -MOVED redirection required. */
-#define REDIS_CLUSTER_REDIR_DOWN 5          /* -CLUSTERDOWN error. */
+#define REDIS_CLUSTER_REDIR_DOWN_STATE 5    /* -CLUSTERDOWN, global state. */
+#define REDIS_CLUSTER_REDIR_DOWN_UNBOUND 6  /* -CLUSTERDOWN, unbound slot. */
 
 struct clusterNode;
 
@@ -249,5 +250,7 @@ typedef struct {
 
 /* ---------------------- API exported outside cluster.c -------------------- */
 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
+int clusterRedirectBlockedClientIfNeeded(redisClient *c);
+void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code);
 
 #endif /* __REDIS_CLUSTER_H */
diff --git a/src/redis.c b/src/redis.c
index 81fa7be2..787663e4 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -926,8 +926,14 @@ int clientsCronHandleTimeout(redisClient *c) {
         mstime_t now_ms = mstime();
 
         if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
+            /* Handle blocking operation specific timeout. */
             replyToBlockedClientTimedOut(c);
             unblockClient(c);
+        } else if (server.cluster_enabled) {
+            /* Cluster: handle unblock & redirect of clients blocked
+             * on keys no longer served by this server. */
+            if (clusterRedirectBlockedClientIfNeeded(c))
+                unblockClient(c);
         }
     }
     return 0;
@@ -2207,32 +2213,14 @@ int processCommand(redisClient *c) {
 
         if (server.cluster->state != REDIS_CLUSTER_OK) {
             flagTransaction(c);
-            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
+            clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
             return REDIS_OK;
         } else {
             int error_code;
             clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
-            if (n == NULL) {
+            if (n == NULL || n != server.cluster->myself) {
                 flagTransaction(c);
-                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
-                    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
-                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
-                    /* The request spawns mutliple keys in the same slot,
-                     * but the slot is not "stable" currently as there is
-                     * a migration or import in progress. */
-                    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
-                } else if (error_code == REDIS_CLUSTER_REDIR_DOWN) {
-                    addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Hash slot is unbound\r\n"));
-                } else {
-                    redisPanic("getNodeByQuery() unknown error.");
-                }
-                return REDIS_OK;
-            } else if (n != server.cluster->myself) {
-                flagTransaction(c);
-                addReplySds(c,sdscatprintf(sdsempty(),
-                    "-%s %d %s:%d\r\n",
-                    (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
-                    hashslot,n->ip,n->port));
+                clusterRedirectClient(c,n,hashslot,error_code);
                 return REDIS_OK;
             }
         }