diff --git a/src/redis-cli.c b/src/redis-cli.c index 3db212f4..c9e306fa 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3589,10 +3589,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = node_n->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3620,10 +3627,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3657,7 +3671,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(target, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); @@ -3666,6 +3684,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "CLUSTER SETSLOT %s %s", slot, "STABLE"); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3768,7 +3789,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds migrating_slot = n->migrating[i]; if (atoi(migrating_slot) == slot) { char *sep = (listLength(migrating) == 0 ? "" : ","); - migrating_str = sdscatfmt(migrating_str, "%s%S:%u", + migrating_str = sdscatfmt(migrating_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(migrating, n); is_migrating = 1; @@ -3781,7 +3802,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds importing_slot = n->importing[i]; if (atoi(importing_slot) == slot) { char *sep = (listLength(importing) == 0 ? "" : ","); - importing_str = sdscatfmt(importing_str, "%s%S:%u", + importing_str = sdscatfmt(importing_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(importing, n); is_importing = 1; @@ -3809,8 +3830,10 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } } - printf("Set as migrating in: %s\n", migrating_str); - printf("Set as importing in: %s\n", importing_str); + if (sdslen(migrating_str) > 0) + printf("Set as migrating in: %s\n", migrating_str); + if (sdslen(importing_str) > 0) + printf("Set as importing in: %s\n", importing_str); /* If there is no slot owner, set as owner the node with the biggest * number of keys, among the set of migrating / importing nodes. */ if (owner == NULL) { @@ -3872,7 +3895,9 @@ static int clusterManagerFixOpenSlot(int slot) { * in migrating state, since migrating is a valid state only for * slot owners. */ if (listLength(owners) > 1) { - owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL); + /* Owner cannot be NULL at this point, since if there are more owners, + * the owner has been set in the previous condition (owner == NULL). */ + assert(owner != NULL); listRewind(owners, &li); redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { @@ -3897,10 +3922,6 @@ static int clusterManagerFixOpenSlot(int slot) { /* Ensure that the node is not in the migrating list. */ clusterManagerRemoveNodeFromList(migrating, n); } - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); - if (!success) goto cleanup; } int move_opts = CLUSTER_MANAGER_OPT_VERBOSE; /* Case 1: The slot is in migrating state in one node, and in @@ -3908,6 +3929,9 @@ static int clusterManagerFixOpenSlot(int slot) { if (listLength(migrating) == 1 && listLength(importing) == 1) { clusterManagerNode *src = listFirst(migrating)->value; clusterManagerNode *dst = listFirst(importing)->value; + clusterManagerLogInfo(">>> Case 1: Moving slot %d from " + "%s:%d to %s:%d\n", slot, + src->ip, src->port, dst->ip, dst->port); success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -3915,7 +3939,7 @@ static int clusterManagerFixOpenSlot(int slot) { * the slot. In this case we just move all the keys to the owner * according to the configuration. */ else if (listLength(migrating) == 0 && listLength(importing) > 0) { - clusterManagerLogInfo(">>> Moving all the %d slot keys to its " + clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its " "owner %s:%d\n", slot, owner->ip, owner->port); move_opts |= CLUSTER_MANAGER_OPT_COLD; listRewind(importing, &li); @@ -3933,6 +3957,18 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } + /* Since the slot has been moved in "cold" mode, ensure that all the + * other nodes update their own configuration about the slot itself. */ + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) freeReplyObject(r); + if (!success) goto cleanup; + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); @@ -3955,6 +3991,8 @@ static int clusterManagerFixOpenSlot(int slot) { * in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; + clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); success = clusterManagerCheckRedisReply(n, r, NULL); @@ -4083,6 +4121,7 @@ static int clusterManagerCheckCluster(int quiet) { result = 0; if (do_fix/* && result*/) { dictType dtype = clusterManagerDictType; + dtype.keyDestructor = dictSdsDestructor; dtype.valDestructor = dictListDestructor; clusterManagerUncoveredSlots = dictCreate(&dtype, NULL); int fixed = clusterManagerFixSlotsCoverage(slots);