diff --git a/src/redis-cli.c b/src/redis-cli.c
index 7e558a30..9d8c0bca 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -2746,6 +2746,41 @@ cleanup:
     return success;
 }
 
+static int clusterManagerDelSlot(clusterManagerNode *node, int slot) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER DELSLOTS %d", slot);
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER ADDSLOTS %d", slot);
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
+                                                int slot)
+{
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER COUNTKEYSINSLOT %d", slot);
+    int count = -1;
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
+    if (reply) freeReplyObject(reply);
+    return count;
+}
+
+static int clusterManagerBumpEpoch(clusterManagerNode *node) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
 /* Migrate keys taken from reply->elements. It returns the reply from the
  * MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
  * is not NULL, a dot will be printed for every migrated key. */
@@ -4053,17 +4088,62 @@ cleanup:
     return success;
 }
 
+static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
+    clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
+    int success = 0;
+    assert(listLength(owners) > 1);
+    clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
+                                                                        slot,
+                                                                        NULL);
+    if (!owner) owner = listFirst(owners)->value;
+    clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
+                          slot, owner->ip, owner->port);
+    /* Set the owner node by calling DELSLOTS, in order to unassign the slot
+     * in case it's already assigned to another node, and then by calling
+     * ADDSLOTS and BUMPEPOCH. The reply to DELSLOTS is not checked since it
+     * could be an "already unassigned" error; if the command failed for any
+     * other reason, it would lead to a failure in the following ADDSLOTS
+     * command as well. */
+    clusterManagerDelSlot(owner, slot);
+    if (!clusterManagerAddSlot(owner, slot)) return 0;
+    if (!clusterManagerBumpEpoch(owner)) return 0;
+    listIter li;
+    listNode *ln;
+    listRewind(cluster_manager.nodes, &li);
+    /* Update the configuration on all the other master nodes by assigning
+     * the slot to the new owner, and by migrating keys to the owner if the
+     * node still has keys for the slot. */
+    while ((ln = listNext(&li)) != NULL) {
+        clusterManagerNode *n = ln->value;
+        if (n == owner) continue;
+        if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+        int count = clusterManagerCountKeysInSlot(n, slot);
+        success = (count >= 0);
+        if (!success) break;
+        clusterManagerDelSlot(n, slot);
+        if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
+        if (count > 0) {
+            int opts = CLUSTER_MANAGER_OPT_VERBOSE |
+                       CLUSTER_MANAGER_OPT_COLD;
+            success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
+            if (!success) break;
+        }
+    }
+    return success;
+}
+
 static int clusterManagerCheckCluster(int quiet) {
     listNode *ln = listFirst(cluster_manager.nodes);
     if (!ln) return 0;
-    int result = 1;
-    int do_fix = config.cluster_manager_command.flags &
-                 CLUSTER_MANAGER_CMD_FLAG_FIX;
     clusterManagerNode *node = ln->value;
     clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
                           node->ip, node->port);
+    int result = 1, consistent = 0;
+    int do_fix = config.cluster_manager_command.flags &
+                 CLUSTER_MANAGER_CMD_FLAG_FIX;
     if (!quiet) clusterManagerShowNodes();
-    if (!clusterManagerIsConfigConsistent()) {
+    consistent = clusterManagerIsConfigConsistent();
+    if (!consistent) {
         sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
         clusterManagerOnError(err);
         result = 0;
@@ -4071,7 +4151,7 @@ static int clusterManagerCheckCluster(int quiet) {
         clusterManagerLogOk("[OK] All nodes agree about slots "
                             "configuration.\n");
     }
-    // Check open slots
+    /* Check open slots */
     clusterManagerLogInfo(">>> Check for open slots...\n");
     listIter li;
     listRewind(cluster_manager.nodes, &li);
@@ -4130,7 +4210,7 @@ static int clusterManagerCheckCluster(int quiet) {
         clusterManagerLogErr("%s.\n", (char *) errstr);
         sdsfree(errstr);
         if (do_fix) {
-            // Fix open slots.
+            /* Fix open slots. */
             dictReleaseIterator(iter);
             iter = dictGetIterator(open_slots);
             while ((entry = dictNext(iter)) != NULL) {
@@ -4165,6 +4245,49 @@ static int clusterManagerCheckCluster(int quiet) {
             if (fixed > 0) result = 1;
         }
     }
+    if (!consistent) {
+        /* Check whether there are multiple owners, even when slots are
+         * fully covered and there are no open slots. */
+        clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
+        int slot = 0;
+        for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
+            listIter li;
+            listNode *ln;
+            listRewind(cluster_manager.nodes, &li);
+            list *owners = listCreate();
+            while ((ln = listNext(&li)) != NULL) {
+                clusterManagerNode *n = ln->value;
+                if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+                if (n->slots[slot]) listAddNodeTail(owners, n);
+                else {
+                    /* Nodes having keys for the slot will be considered
+                     * owners too. */
+                    int count = clusterManagerCountKeysInSlot(n, slot);
+                    if (count > 0) listAddNodeTail(owners, n);
+                }
+            }
+            if (listLength(owners) > 1) {
+                result = 0;
+                clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
+                                     slot, listLength(owners));
+                listRewind(owners, &li);
+                while ((ln = listNext(&li)) != NULL) {
+                    clusterManagerNode *n = ln->value;
+                    clusterManagerLogErr("    %s:%d\n", n->ip, n->port);
+                }
+                if (do_fix) {
+                    result = clusterManagerFixMultipleSlotOwners(slot, owners);
+                    if (!result) {
+                        clusterManagerLogErr("Failed to fix multiple owners "
+                                             "for slot %d\n", slot);
+                        listRelease(owners);
+                        break;
+                    }
+                }
+            }
+            listRelease(owners);
+        }
+    }
     return result;
 }
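
For reviewers who want to see the slot-reassignment sequence in isolation: the sketch below reproduces, against plain hiredis, the same CLUSTER command flow that clusterManagerFixMultipleSlotOwners drives through the new helpers (DELSLOTS, ADDSLOTS and BUMPEPOCH on the elected owner, then DELSLOTS plus SETSLOT <slot> NODE <owner-id> on another master). It is a minimal illustration only, not code from this patch: the reply_ok() and force_slot_owner() helpers, the node addresses and the owner node ID are assumptions made for the example, and key migration is deliberately left out.

/*
 * slot_owner_sketch.c -- hypothetical example, NOT part of redis-cli.c.
 * Shows the command sequence used by clusterManagerFixMultipleSlotOwners:
 *   owner:  CLUSTER DELSLOTS -> CLUSTER ADDSLOTS -> CLUSTER BUMPEPOCH
 *   other:  CLUSTER DELSLOTS -> CLUSTER SETSLOT <slot> NODE <owner-id>
 * Addresses, ports and the owner node ID below are placeholders.
 * Build (assumed): gcc slot_owner_sketch.c -lhiredis
 */
#include <stdio.h>
#include <stdlib.h>
#include <hiredis/hiredis.h>

/* Free the reply and report whether it was a non-error reply, similar in
 * spirit to clusterManagerCheckRedisReply() in redis-cli. */
static int reply_ok(redisReply *r) {
    int ok = (r != NULL && r->type != REDIS_REPLY_ERROR);
    if (!ok && r) fprintf(stderr, "Error: %s\n", r->str);
    if (r) freeReplyObject(r);
    return ok;
}

/* Hypothetical helper: make `owner` the only owner of `slot` and update one
 * other master to agree. Key migration is intentionally omitted. */
static int force_slot_owner(redisContext *owner, redisContext *other,
                            const char *owner_id, int slot) {
    /* DELSLOTS is deliberately unchecked: it may legitimately reply with an
     * "already unassigned" error, as noted in the patch comment. */
    redisReply *r = redisCommand(owner, "CLUSTER DELSLOTS %d", slot);
    if (r) freeReplyObject(r);
    if (!reply_ok(redisCommand(owner, "CLUSTER ADDSLOTS %d", slot))) return 0;
    if (!reply_ok(redisCommand(owner, "CLUSTER BUMPEPOCH"))) return 0;
    /* Reconcile the other master: unassign locally, then point it at the
     * new owner by node ID. */
    r = redisCommand(other, "CLUSTER DELSLOTS %d", slot);
    if (r) freeReplyObject(r);
    return reply_ok(redisCommand(other, "CLUSTER SETSLOT %d NODE %s",
                                 slot, owner_id));
}

int main(void) {
    /* Example endpoints: adjust to your cluster. */
    redisContext *owner = redisConnect("127.0.0.1", 7000);
    redisContext *other = redisConnect("127.0.0.1", 7001);
    if (!owner || owner->err || !other || other->err) {
        fprintf(stderr, "Connection failed\n");
        return EXIT_FAILURE;
    }
    const char *owner_id = "<owner-node-id>"; /* e.g. from CLUSTER MYID */
    int ok = force_slot_owner(owner, other, owner_id, 1234);
    printf("Slot 1234 owner fix %s\n", ok ? "succeeded" : "failed");
    redisFree(owner);
    redisFree(other);
    return ok ? EXIT_SUCCESS : EXIT_FAILURE;
}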