diff --git a/src/redis-cli.c b/src/redis-cli.c index 57f812b9..61776c72 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1, if (err != NULL) { *err = zmalloc((reply->len + 1) * sizeof(char)); strcpy(*err, reply->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err); - } + } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str); goto cleanup; } cleanup: @@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) { + /* If the key already exists, try to migrate keys + * adding REPLACE option. + * If the key's slot is not served, try to assign slot + * to the target node. */ + int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL); + if (strstr(migrate_reply->str, "slot not served") != NULL) + clusterManagerSetSlot(source, target, slot, "node", NULL); clusterManagerLogWarn("*** Target key exists. " "Replacing it for FIX.\n"); freeReplyObject(migrate_reply); - /* Try to migrate keys adding REPLACE option. */ migrate_reply = clusterManagerMigrateKeysInReply(source, target, reply, - 1, timeout, + is_busy, + timeout, NULL); success = (migrate_reply != NULL && migrate_reply->type != REDIS_REPLY_ERROR); @@ -3301,8 +3307,8 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { nodename = token; tot_size = (p - token); name_len = tot_size++; // Make room for ':' in tot_size - } else if (i == 8) break; - i++; + } + if (++i == 8) break; } if (i != 8) continue; if (nodename == NULL) continue; @@ -3341,7 +3347,7 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { char *sp = cfg + name_len; *(sp++) = ':'; for (i = 0; i < c; i++) { - if (i > 0) *(sp++) = '|'; + if (i > 0) *(sp++) = ','; int slen = strlen(slots[i]); memcpy(sp, slots[i], slen); sp += slen; @@ -3583,10 +3589,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = node_n->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3614,10 +3627,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3651,7 +3671,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(target, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); @@ -3660,6 +3684,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "CLUSTER SETSLOT %s %s", slot, "STABLE"); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3670,14 +3697,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { while ((nln = listNext(&nli)) != NULL) { clusterManagerNode *src = nln->value; if (src == target) continue; + /* Assign the slot to target node in the source node. */ + redisReply *r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s %s", slot, + "NODE", target->name); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, + r = CLUSTER_MANAGER_COMMAND(src, "CLUSTER SETSLOT %s %s %s", slot, "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(target, r, NULL)) + if (!clusterManagerCheckRedisReply(src, r, NULL)) fixed = -1; if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; @@ -3687,6 +3722,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } + r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s", slot, + "STABLE"); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; } fixed++; } @@ -3720,11 +3762,22 @@ static int clusterManagerFixOpenSlot(int slot) { while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - if (n->slots[slot]) { - if (owner == NULL) owner = n; - listAddNodeTail(owners, n); + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER COUNTKEYSINSLOT %d", slot); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (success && r->integer > 0) { + clusterManagerLogWarn("*** Found keys about slot %d " + "in non-owner node %s:%d!\n", slot, + n->ip, n->port); + listAddNodeTail(owners, n); + } + if (r) freeReplyObject(r); + if (!success) goto cleanup; } } + if (listLength(owners) == 1) owner = listFirst(owners)->value; listRewind(cluster_manager.nodes, &li); while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; @@ -3735,7 +3788,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds migrating_slot = n->migrating[i]; if (atoi(migrating_slot) == slot) { char *sep = (listLength(migrating) == 0 ? "" : ","); - migrating_str = sdscatfmt(migrating_str, "%s%S:%u", + migrating_str = sdscatfmt(migrating_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(migrating, n); is_migrating = 1; @@ -3748,7 +3801,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds importing_slot = n->importing[i]; if (atoi(importing_slot) == slot) { char *sep = (listLength(importing) == 0 ? "" : ","); - importing_str = sdscatfmt(importing_str, "%s%S:%u", + importing_str = sdscatfmt(importing_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(importing, n); is_importing = 1; @@ -3767,15 +3820,20 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogWarn("*** Found keys about slot %d " "in node %s:%d!\n", slot, n->ip, n->port); + char *sep = (listLength(importing) == 0 ? "" : ","); + importing_str = sdscatfmt(importing_str, "%s%S:%u", + sep, n->ip, n->port); listAddNodeTail(importing, n); } if (r) freeReplyObject(r); if (!success) goto cleanup; } } - printf("Set as migrating in: %s\n", migrating_str); - printf("Set as importing in: %s\n", importing_str); - /* If there is no slot owner, set as owner the slot with the biggest + if (sdslen(migrating_str) > 0) + printf("Set as migrating in: %s\n", migrating_str); + if (sdslen(importing_str) > 0) + printf("Set as importing in: %s\n", importing_str); + /* If there is no slot owner, set as owner the node with the biggest * number of keys, among the set of migrating / importing nodes. */ if (owner == NULL) { clusterManagerLogInfo(">>> Nobody claims ownership, " @@ -3799,6 +3857,15 @@ static int clusterManagerFixOpenSlot(int slot) { success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); if (!success) goto cleanup; + /* Ensure that the slot is unassigned before assigning it to the + * owner. */ + reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); + success = clusterManagerCheckRedisReply(owner, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; + if (reply) freeReplyObject(reply); + if (!success) goto cleanup; reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); @@ -3827,32 +3894,43 @@ static int clusterManagerFixOpenSlot(int slot) { * in migrating state, since migrating is a valid state only for * slot owners. */ if (listLength(owners) > 1) { - owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL); + /* Owner cannot be NULL at this point, since if there are more owners, + * the owner has been set in the previous condition (owner == NULL). */ + assert(owner != NULL); listRewind(owners, &li); redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot); + reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); success = clusterManagerCheckRedisReply(n, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); if (!success) goto cleanup; + n->slots[slot] = 0; + /* Assign the slot to the owner in the node 'n' configuration.' */ + success = clusterManagerSetSlot(n, owner, slot, "node", NULL); + if (!success) goto cleanup; success = clusterManagerSetSlot(n, owner, slot, "importing", NULL); if (!success) goto cleanup; - clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates + /* Avoid duplicates. */ + clusterManagerRemoveNodeFromList(importing, n); listAddNodeTail(importing, n); + /* Ensure that the node is not in the migrating list. */ + clusterManagerRemoveNodeFromList(migrating, n); } - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); - if (!success) goto cleanup; } int move_opts = CLUSTER_MANAGER_OPT_VERBOSE; - /* Case 1: The slot is in migrating state in one slot, and in - * importing state in 1 slot. That's trivial to address. */ + /* Case 1: The slot is in migrating state in one node, and in + * importing state in 1 node. That's trivial to address. */ if (listLength(migrating) == 1 && listLength(importing) == 1) { clusterManagerNode *src = listFirst(migrating)->value; clusterManagerNode *dst = listFirst(importing)->value; + clusterManagerLogInfo(">>> Case 1: Moving slot %d from " + "%s:%d to %s:%d\n", slot, + src->ip, src->port, dst->ip, dst->port); success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -3860,7 +3938,7 @@ static int clusterManagerFixOpenSlot(int slot) { * the slot. In this case we just move all the keys to the owner * according to the configuration. */ else if (listLength(migrating) == 0 && listLength(importing) > 0) { - clusterManagerLogInfo(">>> Moving all the %d slot keys to its " + clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its " "owner %s:%d\n", slot, owner->ip, owner->port); move_opts |= CLUSTER_MANAGER_OPT_COLD; listRewind(importing, &li); @@ -3878,25 +3956,42 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } + /* Since the slot has been moved in "cold" mode, ensure that all the + * other nodes update their own configuration about the slot itself. */ + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) freeReplyObject(r); + if (!success) goto cleanup; + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER GETKEYSINSLOT %d %d", slot, 10); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) { - if (success) try_to_close_slot = (r->elements == 0); - freeReplyObject(r); + if (!owner || owner != n) { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER GETKEYSINSLOT %d %d", slot, 10); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) { + if (success) try_to_close_slot = (r->elements == 0); + freeReplyObject(r); + } + if (!success) goto cleanup; } - if (!success) goto cleanup; } /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key. We - * can just close the slot, probably a reshard interrupted in the middle. */ + * there is a migrating node that actually don't have any key or is the + * slot owner. We can just close the slot, probably a reshard interrupted + * in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; + clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); success = clusterManagerCheckRedisReply(n, r, NULL); @@ -4025,6 +4120,7 @@ static int clusterManagerCheckCluster(int quiet) { result = 0; if (do_fix/* && result*/) { dictType dtype = clusterManagerDictType; + dtype.keyDestructor = dictSdsDestructor; dtype.valDestructor = dictListDestructor; clusterManagerUncoveredSlots = dictCreate(&dtype, NULL); int fixed = clusterManagerFixSlotsCoverage(slots);