diff --git a/src/redis-cli.c b/src/redis-cli.c index ac021e85..e6e20b6f 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1, if (err != NULL) { *err = zmalloc((reply->len + 1) * sizeof(char)); strcpy(*err, reply->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err); - } + } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str); goto cleanup; } cleanup: @@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) { + /* If the key already exists, try to migrate keys + * adding REPLACE option. + * If the key's slot is not served, try to assign slot + * to the target node. */ + int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL); + if (strstr(migrate_reply->str, "slot not served") != NULL) + clusterManagerSetSlot(source, target, slot, "node", NULL); clusterManagerLogWarn("*** Target key exists. " "Replacing it for FIX.\n"); freeReplyObject(migrate_reply); - /* Try to migrate keys adding REPLACE option. */ migrate_reply = clusterManagerMigrateKeysInReply(source, target, reply, - 1, timeout, + is_busy, + timeout, NULL); success = (migrate_reply != NULL && migrate_reply->type != REDIS_REPLY_ERROR); @@ -3670,14 +3676,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { while ((nln = listNext(&nli)) != NULL) { clusterManagerNode *src = nln->value; if (src == target) continue; + /* Assign the slot to target node in the source node. 
*/ + redisReply *r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s %s", slot, + "NODE", target->name); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, + r = CLUSTER_MANAGER_COMMAND(src, "CLUSTER SETSLOT %s %s %s", slot, "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(target, r, NULL)) + if (!clusterManagerCheckRedisReply(src, r, NULL)) fixed = -1; if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; @@ -3687,6 +3701,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } + r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s", slot, + "STABLE"); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; } fixed++; } @@ -3707,7 +3728,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Fixing open slot %d\n", slot); /* Try to obtain the current slot owner, according to the current * nodes configuration. 
*/ - int success = 1; + int success = 1, keys_in_multiple_nodes = 0; list *owners = listCreate(); list *migrating = listCreate(); list *importing = listCreate(); @@ -3720,11 +3741,23 @@ static int clusterManagerFixOpenSlot(int slot) { while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - if (n->slots[slot]) { - if (owner == NULL) owner = n; - listAddNodeTail(owners, n); + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER COUNTKEYSINSLOT %d", slot); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (success && r->integer > 0) { + clusterManagerLogWarn("*** Found keys about slot %d " + "in non-owner node %s:%d!\n", slot, + n->ip, n->port); + listAddNodeTail(owners, n); + keys_in_multiple_nodes = 1; + } + if (r) freeReplyObject(r); + if (!success) goto cleanup; } } + if (listLength(owners) == 1) owner = listFirst(owners)->value; listRewind(cluster_manager.nodes, &li); while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; @@ -3767,6 +3800,9 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogWarn("*** Found keys about slot %d " "in node %s:%d!\n", slot, n->ip, n->port); + char *sep = (listLength(importing) == 0 ? "" : ","); + importing_str = sdscatfmt(importing_str, "%s%S:%u", + sep, n->ip, n->port); listAddNodeTail(importing, n); } if (r) freeReplyObject(r); @@ -3799,6 +3835,15 @@ static int clusterManagerFixOpenSlot(int slot) { success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); if (!success) goto cleanup; + /* Ensure that the slot is unassigned before assigning it to the + * owner. */ + reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); + success = clusterManagerCheckRedisReply(owner, reply, NULL); + /* Ignore "already unassigned" error. 
*/ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; + if (reply) freeReplyObject(reply); + if (!success) goto cleanup; reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); @@ -3835,12 +3880,22 @@ static int clusterManagerFixOpenSlot(int slot) { if (n == owner) continue; reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); success = clusterManagerCheckRedisReply(n, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); if (!success) goto cleanup; + n->slots[slot] = 0; + /* Assign the slot to the owner in the node 'n' configuration. */ + success = clusterManagerSetSlot(n, owner, slot, "node", NULL); + if (!success) goto cleanup; success = clusterManagerSetSlot(n, owner, slot, "importing", NULL); if (!success) goto cleanup; - clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates + /* Avoid duplicates. */ + clusterManagerRemoveNodeFromList(importing, n); listAddNodeTail(importing, n); + /* Ensure that the node is not in the migrating list. 
*/ + clusterManagerRemoveNodeFromList(migrating, n); } reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); success = clusterManagerCheckRedisReply(owner, reply, NULL); @@ -3883,18 +3938,21 @@ static int clusterManagerFixOpenSlot(int slot) { listLength(migrating) == 1); if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER GETKEYSINSLOT %d %d", slot, 10); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) { - if (success) try_to_close_slot = (r->elements == 0); - freeReplyObject(r); + if (!owner || owner != n) { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER GETKEYSINSLOT %d %d", slot, 10); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) { + if (success) try_to_close_slot = (r->elements == 0); + freeReplyObject(r); + } + if (!success) goto cleanup; } - if (!success) goto cleanup; } /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key. We - * can just close the slot, probably a reshard interrupted in the middle. */ + * there is a migrating node that actually doesn't have any key or is the + * slot owner. We can just close the slot, probably a reshard interrupted + * in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",