diff --git a/src/redis-cli.c b/src/redis-cli.c index a3fb065d..a93bd9b1 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); -typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, + clusterManagerNode *n, int bulk_idx); /* Cluster Manager helper functions */ @@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } -/* Execute MULTI command on a cluster node. */ +/* Call MULTI command on a cluster node. */ static int clusterManagerStartTransaction(clusterManagerNode *node) { redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); int success = clusterManagerCheckRedisReply(node, reply, NULL); @@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) { return success; } -/* Execute EXEC command on a cluster node. */ +/* Call EXEC command on a cluster node. */ static int clusterManagerExecTransaction(clusterManagerNode *node, clusterManagerOnReplyError onerror) { @@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, redisReply *r = reply->element[i]; char *err = NULL; success = clusterManagerCheckRedisReply(node, r, &err); - if (!success && onerror) success = onerror(r, i); + if (!success && onerror) success = onerror(r, node, i); if (err) { if (!success) CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); @@ -2768,6 +2769,55 @@ cleanup: return success; } +/* Get the node the slot is assigned to from the point of view of node *n. + * If the slot is unassigned or if the reply is an error, return NULL. + * Use the **err argument in order to check whether the slot is unassigned + * or the reply resulted in an error. 
*/ +static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n, + int slot, char **err) +{ + assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS); + clusterManagerNode *owner = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS"); + if (clusterManagerCheckRedisReply(n, reply, err)) { + assert(reply->type == REDIS_REPLY_ARRAY); + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3); + int from, to; + from = r->element[0]->integer; + to = r->element[1]->integer; + if (slot < from || slot > to) continue; + redisReply *nr = r->element[2]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2); + char *name = NULL; + if (nr->elements >= 3) + name = nr->element[2]->str; + if (name != NULL) + owner = clusterManagerNodeByName(name); + else { + char *ip = nr->element[0]->str; + assert(ip != NULL); + int port = (int) nr->element[1]->integer; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *nd = ln->value; + if (strcmp(nd->ip, ip) == 0 && port == nd->port) { + owner = nd; + break; + } + } + } + if (owner) break; + } + } + if (reply) freeReplyObject(reply); + return owner; +} + /* Set slot status to "importing" or "migrating" */ static int clusterManagerSetSlot(clusterManagerNode *node1, clusterManagerNode *node2, @@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot, char *err = NULL; int success = clusterManagerCheckRedisReply(node, reply, &err); if (!success && reply && reply->type == REDIS_REPLY_ERROR && - ignore_unassigned_err && - strstr(reply->str, "already unassigned") != NULL) success = 1; + ignore_unassigned_err) + { + char *get_owner_err = NULL; + clusterManagerNode *assigned_to = + clusterManagerGetSlotOwner(node, slot, &get_owner_err); + if (!assigned_to) { + if (get_owner_err == NULL) success = 1; + else { + 
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err); + zfree(get_owner_err); + } + } + } if (!success && err != NULL) { CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); zfree(err); @@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } -static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { - if (bulk_idx == 0 && reply) { - if (reply->type == REDIS_REPLY_ERROR) - return strstr(reply->str, "already unassigned") != NULL; - } - return 0; +/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore + * errors except for ADDSLOTS errors. + * Return 1 if the error should be ignored. */ +static int clusterManagerOnSetOwnerErr(redisReply *reply, + clusterManagerNode *n, int bulk_idx) +{ + UNUSED(reply); + UNUSED(n); + /* Only raise error when ADDSLOTS fails (bulk_idx == 1). */ + return (bulk_idx != 1); } static int clusterManagerSetSlotOwner(clusterManagerNode *owner, @@ -2865,8 +2930,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, clusterManagerAddSlot(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot); clusterManagerBumpEpoch(owner); - success = clusterManagerExecTransaction(owner, - clusterManagerIgnoreUnassignedErr); + success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr); return success; } @@ -2950,8 +3014,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int replace_existing_keys = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int retry = (config.cluster_manager_command.flags & + (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -2983,16 +3047,35 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == 
REDIS_REPLY_ERROR) { int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; - int not_served = strstr(migrate_reply->str, "slot not served") != NULL; - if (replace_existing_keys && (is_busy || not_served)) { + int not_served = 0; + if (!is_busy) { + char *get_owner_err = NULL; + clusterManagerNode *served_by = + clusterManagerGetSlotOwner(source, slot, &get_owner_err); + if (!served_by) { + if (get_owner_err == NULL) not_served = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, + get_owner_err); + zfree(get_owner_err); + } + } + } + if (retry && (is_busy || not_served)) { /* If the key already exists, try to migrate keys * adding REPLACE option. * If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) + if (not_served) { + clusterManagerLogWarn("*** Slot was not served, setting " + "owner to node %s:%d.\n", + target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + } + if (is_busy) { + clusterManagerLogWarn("*** Target key exists. " + "Replacing it for FIX.\n"); + } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, target, @@ -4252,7 +4335,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->migrating_count; i += 2) { sds slot = n->migrating[i]; - dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4270,7 +4353,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->importing_count; i += 2) { sds slot = n->importing[i]; - dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->importing[i + 1])); char *fmt = (i > 0 ? 
",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4333,7 +4416,7 @@ static int clusterManagerCheckCluster(int quiet) { /* Check whether there are multiple owners, even when slots are * fully covered and there are no open slots. */ clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); - int slot = 0; + int slot = 0, slots_with_multiple_owners = 0; for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { listIter li; listNode *ln; @@ -4359,6 +4442,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerNode *n = ln->value; clusterManagerLogErr(" %s:%d\n", n->ip, n->port); } + slots_with_multiple_owners++; if (do_fix) { result = clusterManagerFixMultipleSlotOwners(slot, owners); if (!result) { @@ -4366,11 +4450,13 @@ static int clusterManagerCheckCluster(int quiet) { "for slot %d\n", slot); listRelease(owners); break; - } + } else slots_with_multiple_owners--; } } listRelease(owners); } + if (slots_with_multiple_owners == 0) + clusterManagerLogOk("[OK] No multiple owners found.\n"); } return result; }