diff --git a/src/redis-cli.c b/src/redis-cli.c index 7e558a30..a3fb065d 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -117,6 +117,7 @@ #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6 #define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8 +#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 #define CLUSTER_MANAGER_OPT_COLD 1 << 1 @@ -1378,6 +1379,9 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; + } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -1846,7 +1850,7 @@ static int evalMode(int argc, char **argv) { if (eval_ldb) { if (!config.eval_ldb) { /* If the debugging session ended immediately, there was an - * error compiling the script. Show it and don't enter + * error compiling the script. Show it and they don't enter * the REPL at all. */ printf("Eval debugging session can't start:\n"); cliReadReply(0); @@ -1929,6 +1933,7 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); /* Cluster Manager helper functions */ @@ -1990,14 +1995,17 @@ typedef struct clusterManagerCommandDef { clusterManagerCommandDef clusterManagerCommands[] = { {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN", "replicas "}, - {"check", clusterManagerCommandCheck, -1, "host:port", NULL}, + {"check", clusterManagerCommandCheck, -1, "host:port", + "search-multiple-owners"}, {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, - {"fix", clusterManagerCommandFix, -1, "host:port", NULL}, + {"fix", clusterManagerCommandFix, -1, "host:port", + "search-multiple-owners"}, {"reshard", clusterManagerCommandReshard, -1, "host:port", - "from ,to ,slots ,yes,timeout ,pipeline "}, + "from ,to ,slots ,yes,timeout ,pipeline ," + "replace"}, {"rebalance", clusterManagerCommandRebalance, -1, "host:port", "weight ,use-empty-masters," - "timeout ,simulate,pipeline ,threshold "}, + "timeout ,simulate,pipeline ,threshold ,replace"}, {"add-node", clusterManagerCommandAddNode, 2, "new_host:new_port existing_host:existing_port", "slave,master-id "}, {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL}, @@ -2188,6 +2196,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } +/* Execute MULTI command on a cluster node. */ +static int clusterManagerStartTransaction(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +/* Execute EXEC command on a cluster node. */ +static int clusterManagerExecTransaction(clusterManagerNode *node, + clusterManagerOnReplyError onerror) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success) { + if (reply->type != REDIS_REPLY_ARRAY) { + success = 0; + goto cleanup; + } + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + char *err = NULL; + success = clusterManagerCheckRedisReply(node, r, &err); + if (!success && onerror) success = onerror(r, i); + if (err) { + if (!success) + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } + if (!success) break; + } + } +cleanup: + if (reply) freeReplyObject(reply); + return success; +} + static int clusterManagerNodeConnect(clusterManagerNode *node) { if (node->context) redisFree(node->context); node->context = redisConnect(node->ip, node->port); @@ -2746,6 +2792,84 @@ cleanup: return success; } +static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER SETSLOT %d %s", slot, "STABLE"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerDelSlot(clusterManagerNode *node, int slot, + int ignore_unassigned_err) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER DELSLOTS %d", slot); + char *err = NULL; + int success = clusterManagerCheckRedisReply(node, reply, &err); + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + ignore_unassigned_err && + strstr(reply->str, "already unassigned") != NULL) success = 1; + if (!success && err != NULL) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerAddSlot(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER ADDSLOTS %d", slot); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node, + int slot) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER COUNTKEYSINSLOT %d", slot); + int count = -1; + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer; + if (reply) freeReplyObject(reply); + return count; +} + +static int clusterManagerBumpEpoch(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { + if (bulk_idx == 0 && reply) { + if (reply->type == REDIS_REPLY_ERROR) + return strstr(reply->str, "already unassigned") != NULL; + } + return 0; +} + +static int clusterManagerSetSlotOwner(clusterManagerNode *owner, + int slot, + int do_clear) +{ + int success = clusterManagerStartTransaction(owner); + if (!success) return 0; + /* Ensure the slot is not already assigned. */ + clusterManagerDelSlot(owner, slot, 1); + /* Add the slot and bump epoch. */ + clusterManagerAddSlot(owner, slot); + if (do_clear) clusterManagerClearSlotStatus(owner, slot); + clusterManagerBumpEpoch(owner); + success = clusterManagerExecTransaction(owner, + clusterManagerIgnoreUnassignedErr); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -3623,24 +3747,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ - n->slots[atoi(slot)] = 1; + n->slots[s] = 1; fixed++; } } @@ -3648,7 +3765,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { /* Handle case "2": keys only in one node. */ if (listLength(single) > 0) { - printf("The following uncovered slots have keys in just one node:\n"); + printf("The following uncovered slots have keys in just one node:\n"); clusterManagerPrintSlotsList(single); if (confirmWithYes("Fix these slots by covering with those nodes?")){ listIter li; @@ -3656,6 +3773,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(single, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); assert(entry != NULL); list *nodes = (list *) dictGetVal(entry); @@ -3664,18 +3782,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[atoi(slot)] = 1; @@ -3708,23 +3818,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); - /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER SETSLOT %s %s", slot, "STABLE"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(target, s, 1)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ target->slots[atoi(slot)] = 1; @@ -3735,23 +3832,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *src = nln->value; if (src == target) continue; /* Assign the slot to target node in the source node. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "NODE", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerSetSlot(src, target, s, "NODE", NULL)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) - fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerSetSlot(src, target, s, + "IMPORTING", NULL)) fixed = -1; if (fixed < 0) goto cleanup; int opts = CLUSTER_MANAGER_OPT_VERBOSE | CLUSTER_MANAGER_OPT_COLD; @@ -3759,12 +3848,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s", slot, - "STABLE"); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerClearSlotStatus(src, s)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; } fixed++; @@ -3888,24 +3973,9 @@ static int clusterManagerFixOpenSlot(int slot) { // Use ADDSLOTS to assign the slot. clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", owner->ip, owner->port); - redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER " - "SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; - /* Ensure that the slot is unassigned before assigning it to the - * owner. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); - if (!success) goto cleanup; - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerSetSlotOwner(owner, slot, 0); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3913,9 +3983,7 @@ static int clusterManagerFixOpenSlot(int slot) { /* Make sure this information will propagate. Not strictly needed * since there is no past owner, so all the other nodes will accept * whatever epoch this node will claim the slot with. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerBumpEpoch(owner); if (!success) goto cleanup; /* Remove the owner from the list of migrating/importing * nodes. */ @@ -3935,16 +4003,10 @@ static int clusterManagerFixOpenSlot(int slot) { * the owner has been set in the previous condition (owner == NULL). */ assert(owner != NULL); listRewind(owners, &li); - redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(n, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(n, slot, 1); if (!success) goto cleanup; n->slots[slot] = 0; /* Assign the slot to the owner in the node 'n' configuration.' */ @@ -3968,6 +4030,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Case 1: Moving slot %d from " "%s:%d to %s:%d\n", slot, src->ip, src->port, dst->ip, dst->port); + move_opts |= CLUSTER_MANAGER_OPT_UPDATE; success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -3986,11 +4049,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; clusterManagerLogInfo(">>> Setting %d as STABLE in " "%s:%d\n", slot, n->ip, n->port); - - redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerClearSlotStatus(n, slot); if (!success) goto cleanup; } /* Since the slot has been moved in "cold" mode, ensure that all the @@ -4000,12 +4059,76 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerNode *n = ln->value; if (n == owner) continue; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } + } + /* Case 3: The slot is in migrating state in one node but multiple + * other nodes claim to be in importing state and don't have any key in + * the slot. We search for the importing node having the same ID as + * the destination node of the migrating node. + * In that case we move the slot from the migrating node to this node and + * we close the importing states on all the other importing nodes. + * If no importing node has the same ID as the destination node of the + * migrating node, the slot's state is closed on both the migrating node + * and the importing nodes. */ + else if (listLength(migrating) == 1 && listLength(importing) > 1) { + int try_to_fix = 1; + clusterManagerNode *src = listFirst(migrating)->value; + clusterManagerNode *dst = NULL; + sds target_id = NULL; + for (int i = 0; i < src->migrating_count; i += 2) { + sds migrating_slot = src->migrating[i]; + if (atoi(migrating_slot) == slot) { + target_id = src->migrating[i + 1]; + break; + } + } + assert(target_id != NULL); + listIter li; + listNode *ln; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) { + try_to_fix = 0; + break; + } + if (strcmp(n->name, target_id) == 0) dst = n; + } + if (!try_to_fix) goto unhandled_case; + if (dst != NULL) { + clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to " + "%s:%d and closing it on all the other " + "importing nodes.\n", + slot, src->ip, src->port, + dst->ip, dst->port); + /* Move the slot to the destination node. */ + success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); + if (!success) goto cleanup; + /* Close slot on all the other importing nodes. */ + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (dst == n) continue; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } else { + clusterManagerLogInfo(">>> Case 3: Closing slot %d on both " + "migrating and importing nodes.\n", slot); + /* Close the slot on both the migrating node and the importing + * nodes. */ + success = clusterManagerClearSlotStatus(src, slot); + if (!success) goto cleanup; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); @@ -4022,13 +4145,13 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } } - /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key or is the - * slot owner. We can just close the slot, probably a reshard interrupted - * in the middle. */ + /* Case 4: There are no slots claiming to be in importing state, but + * there is a migrating node that actually don't have any key or is the + * slot owner. We can just close the slot, probably a reshard + * interrupted in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n", slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); @@ -4036,6 +4159,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } else { +unhandled_case: success = 0; clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot " "yet (work in progress). Slot is set as " @@ -4053,17 +4177,55 @@ cleanup: return success; } +static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { + clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot); + int success = 0; + assert(listLength(owners) > 1); + clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners, + slot, + NULL); + if (!owner) owner = listFirst(owners)->value; + clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n", + slot, owner->ip, owner->port); + /* Set the slot owner. */ + if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + /* Update configuration in all the other master nodes by assigning the slot + * itself to the new owner, and by eventually migrating keys if the node + * has keys for the slot. */ + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + int count = clusterManagerCountKeysInSlot(n, slot); + success = (count >= 0); + if (!success) break; + clusterManagerDelSlot(n, slot, 1); + if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; + if (count > 0) { + int opts = CLUSTER_MANAGER_OPT_VERBOSE | + CLUSTER_MANAGER_OPT_COLD; + success = clusterManagerMoveSlot(n, owner, slot, opts, NULL); + if (!success) break; + } + } + return success; +} + static int clusterManagerCheckCluster(int quiet) { listNode *ln = listFirst(cluster_manager.nodes); if (!ln) return 0; - int result = 1; - int do_fix = config.cluster_manager_command.flags & - CLUSTER_MANAGER_CMD_FLAG_FIX; clusterManagerNode *node = ln->value; clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n", node->ip, node->port); + int result = 1, consistent = 0; + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; if (!quiet) clusterManagerShowNodes(); - if (!clusterManagerIsConfigConsistent()) { + consistent = clusterManagerIsConfigConsistent(); + if (!consistent) { sds err = sdsnew("[ERR] Nodes don't agree about configuration!"); clusterManagerOnError(err); result = 0; @@ -4071,7 +4233,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogOk("[OK] All nodes agree about slots " "configuration.\n"); } - // Check open slots + /* Check open slots */ clusterManagerLogInfo(">>> Check for open slots...\n"); listIter li; listRewind(cluster_manager.nodes, &li); @@ -4130,7 +4292,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogErr("%s.\n", (char *) errstr); sdsfree(errstr); if (do_fix) { - // Fix open slots. + /* Fix open slots. */ dictReleaseIterator(iter); iter = dictGetIterator(open_slots); while ((entry = dictNext(iter)) != NULL) { @@ -4165,6 +4327,51 @@ static int clusterManagerCheckCluster(int quiet) { if (fixed > 0) result = 1; } } + int search_multiple_owners = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; + if (search_multiple_owners) { + /* Check whether there are multiple owners, even when slots are + * fully covered and there are no open slots. */ + clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); + int slot = 0; + for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + list *owners = listCreate(); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + /* Nodes having keys for the slot will be considered + * owners too. */ + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) listAddNodeTail(owners, n); + } + } + if (listLength(owners) > 1) { + result = 0; + clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n", + slot, listLength(owners)); + listRewind(owners, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + clusterManagerLogErr(" %s:%d\n", n->ip, n->port); + } + if (do_fix) { + result = clusterManagerFixMultipleSlotOwners(slot, owners); + if (!result) { + clusterManagerLogErr("Failed to fix multiple owners " + "for slot %d\n", slot); + listRelease(owners); + break; + } + } + } + listRelease(owners); + } + } return result; }