From 36676c23186617ff096d534c6faa353829c4e437 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Mar 2014 13:19:09 +0100 Subject: [PATCH] Redis Cluster: support for multi-key operations. --- src/cluster.c | 131 ++++++++++++++++++++++++++++++++++++-------------- src/cluster.h | 7 +++ src/redis.c | 18 +++++-- 3 files changed, 117 insertions(+), 39 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index f4ddeef4..0edfe0e2 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -3804,21 +3804,39 @@ void readwriteCommand(redisClient *c) { } /* Return the pointer to the cluster node that is able to serve the command. - * For the function to succeed the command should only target a single - * key (or the same key multiple times). + * For the function to succeed the command should only target either: * - * If the returned node should be used only for this request, the *ask - * integer is set to '1', otherwise to '0'. This is used in order to - * let the caller know if we should reply with -MOVED or with -ASK. + * 1) A single key (even multiple times like LPOPRPUSH mylist mylist). + * 2) Multiple keys in the same hash slot, while the slot is stable (no + * resharding in progress). * - * If the command contains multiple keys, and as a consequence it is not - * possible to handle the request in Redis Cluster, NULL is returned. */ -clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) { + * On success the function returns the node that is able to serve the request. + * If the node is not 'myself' a redirection must be perfomed. The kind of + * redirection is specified setting the integer passed by reference + * 'error_code', which will be set to REDIS_CLUSTER_REDIR_ASK or + * REDIS_CLUSTER_REDIR_MOVED. + * + * When the node is 'myself' 'error_code' is set to REDIS_CLUSTER_REDIR_NONE. + * + * If the command fails NULL is returned, and the reason of the failure is + * provided via 'error_code', which will be set to: + * + * REDIS_CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that + * don't belong to the same hash slot. + * + * REDIS_CLUSTER_REDIR_UNSTABLE if the request contains mutliple keys + * belonging to the same slot, but the slot is not stable (in migration or + * importing state, likely because a resharding is in progress). */ +clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { clusterNode *n = NULL; robj *firstkey = NULL; + int multiple_keys = 0; multiState *ms, _ms; multiCmd mc; - int i, slot = 0; + int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0; + + /* Set error code optimistically for the base case. */ + if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE; /* We handle all the cases as if they were EXEC commands, so we have * a common code path for everything */ @@ -3839,8 +3857,8 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg mc.cmd = cmd; } - /* Check that all the keys are the same key, and get the slot and - * node for this key. */ + /* Check that all the keys are in the same hash slot, and obtain this + * slot and the node associated. */ for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; @@ -3853,48 +3871,88 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys, REDIS_GETKEYS_ALL); for (j = 0; j < numkeys; j++) { + robj *thiskey = margv[keyindex[j]]; + int thisslot = keyHashSlot((char*)thiskey->ptr, + sdslen(thiskey->ptr)); + if (firstkey == NULL) { /* This is the first key we see. Check what is the slot * and node. */ - firstkey = margv[keyindex[j]]; - - slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); + firstkey = thiskey; + slot = thisslot; n = server.cluster->slots[slot]; redisAssertWithInfo(c,firstkey,n != NULL); + /* If we are migrating or importing this slot, we need to check + * if we have all the keys in the request (the only way we + * can safely serve the request, otherwise we return a TRYAGAIN + * error). To do so we set the importing/migrating state and + * increment a counter for every missing key. */ + if (n == myself && + server.cluster->migrating_slots_to[slot] != NULL) + { + migrating_slot = 1; + } else if (server.cluster->importing_slots_from[slot] != NULL) { + importing_slot = 1; + } } else { /* If it is not the first key, make sure it is exactly * the same key as the first we saw. */ - if (!equalStringObjects(firstkey,margv[keyindex[j]])) { - getKeysFreeResult(keyindex); - return NULL; + if (!equalStringObjects(firstkey,thiskey)) { + if (slot != thisslot) { + /* Error: multiple keys from different slots. */ + getKeysFreeResult(keyindex); + if (error_code) + *error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT; + return NULL; + } else { + /* Flag this request as one with multiple different + * keys. */ + multiple_keys = 1; + } } } + + /* Migarting / Improrting slot? Count keys we don't have. */ + if ((migrating_slot || importing_slot) && + lookupKeyRead(&server.db[0],thiskey) == NULL) + { + missing_keys++; + } } getKeysFreeResult(keyindex); } - if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */ + /* No key at all in command? then we can serve the request - * without redirections. */ + * without redirections or errors. */ if (n == NULL) return myself; + + /* Return the hashslot by reference. */ if (hashslot) *hashslot = slot; + /* This request is about a slot we are migrating into another instance? - * Then we need to check if we have the key. If we have it we can reply. - * If instead is a new key, we pass the request to the node that is - * receiving the slot. */ - if (n == myself && server.cluster->migrating_slots_to[slot] != NULL) { - if (lookupKeyRead(&server.db[0],firstkey) == NULL) { - if (ask) *ask = 1; - return server.cluster->migrating_slots_to[slot]; + * Then if we have all the keys. */ + + /* If we don't have all the keys and we are migrating the slot, send + * an ASK redirection. */ + if (migrating_slot && missing_keys) { + if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK; + return server.cluster->migrating_slots_to[slot]; + } + + /* If we are receiving the slot, we have all the keys, and the client + * correctly flagged the request as "ASKING", we can serve + * the request, otherwise the only option is to send a TRYAGAIN error. */ + if (importing_slot && + (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING)) + { + if (missing_keys) { + if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE; + return NULL; + } else { + return myself; } } - /* Handle the case in which we are receiving this hash slot from - * another instance, so we'll accept the query even if in the table - * it is assigned to a different node, but only if the client - * issued an ASKING command before. */ - if (server.cluster->importing_slots_from[slot] != NULL && - (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING)) { - return myself; - } + /* Handle the read-only client case reading from a slave: if this * node is a slave and the request is about an hash slot our master * is serving, we can reply without redirection. */ @@ -3905,6 +3963,9 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg { return myself; } - /* It's not a -ASK case. Base case: just return the right node. */ + + /* Base case: just return the right node. However if this node is not + * myself, set error_code to MOVED since we need to issue a rediretion. */ + if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED; return n; } diff --git a/src/cluster.h b/src/cluster.h index 8c440b9f..9581b575 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -24,6 +24,13 @@ #define REDIS_CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */ #define REDIS_CLUSTER_MF_PAUSE_MULT 2 /* Master pause manual failover mult. */ +/* Redirection errors returned by getNodeByQuery(). */ +#define REDIS_CLUSTER_REDIR_NONE 0 /* Node can serve the request. */ +#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* Keys in different slots. */ +#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* Keys in slot resharding. */ +#define REDIS_CLUSTER_REDIR_ASK 3 /* -ASK redirection required. */ +#define REDIS_CLUSTER_REDIR_MOVED 4 /* -MOVED redirection required. */ + struct clusterNode; /* clusterLink encapsulates everything needed to talk with a remote node. */ diff --git a/src/redis.c b/src/redis.c index 1219fdbd..72a907af 100644 --- a/src/redis.c +++ b/src/redis.c @@ -2021,15 +2021,25 @@ int processCommand(redisClient *c) { addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK; } else { - int ask; - clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask); + int error_code; + clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); if (n == NULL) { - addReplyError(c,"Multi keys request invalid in cluster"); + if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { + addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); + } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { + /* The request spawns mutliple keys in the same slot, + * but the slot is not "stable" currently as there is + * a migration or import in progress. */ + addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); + } else { + redisPanic("getNodeByQuery() unknown error."); + } return REDIS_OK; } else if (n != server.cluster->myself) { flagTransaction(c); addReplySds(c,sdscatprintf(sdsempty(), - "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED", + "-%s %d %s:%d\r\n", + (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot,n->ip,n->port)); return REDIS_OK; }