diff --git a/src/redis-cli.c b/src/redis-cli.c index a3fb065d..6fe93e66 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); -typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, + clusterManagerNode *n, int bulk_idx); /* Cluster Manager helper functions */ @@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } -/* Execute MULTI command on a cluster node. */ +/* Call MULTI command on a cluster node. */ static int clusterManagerStartTransaction(clusterManagerNode *node) { redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); int success = clusterManagerCheckRedisReply(node, reply, NULL); @@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) { return success; } -/* Execute EXEC command on a cluster node. */ +/* Call EXEC command on a cluster node. */ static int clusterManagerExecTransaction(clusterManagerNode *node, clusterManagerOnReplyError onerror) { @@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, redisReply *r = reply->element[i]; char *err = NULL; success = clusterManagerCheckRedisReply(node, r, &err); - if (!success && onerror) success = onerror(r, i); + if (!success && onerror) success = onerror(r, node, i); if (err) { if (!success) CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); @@ -2768,6 +2769,55 @@ cleanup: return success; } +/* Get the node the slot is assigned to from the point of view of node *n. + * If the slot is unassigned or if the reply is an error, return NULL. + * Use the **err argument in order to check wether the slot is unassigned + * or the reply resulted in an error. */ +static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n, + int slot, char **err) +{ + assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS); + clusterManagerNode *owner = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS"); + if (clusterManagerCheckRedisReply(n, reply, err)) { + assert(reply->type == REDIS_REPLY_ARRAY); + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3); + int from, to; + from = r->element[0]->integer; + to = r->element[1]->integer; + if (slot < from || slot > to) continue; + redisReply *nr = r->element[2]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2); + char *name = NULL; + if (nr->elements >= 3) + name = nr->element[2]->str; + if (name != NULL) + owner = clusterManagerNodeByName(name); + else { + char *ip = nr->element[0]->str; + assert(ip != NULL); + int port = (int) nr->element[1]->integer; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *nd = ln->value; + if (strcmp(nd->ip, ip) == 0 && port == nd->port) { + owner = nd; + break; + } + } + } + if (owner) break; + } + } + if (reply) freeReplyObject(reply); + return owner; +} + /* Set slot status to "importing" or "migrating" */ static int clusterManagerSetSlot(clusterManagerNode *node1, clusterManagerNode *node2, @@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot, char *err = NULL; int success = clusterManagerCheckRedisReply(node, reply, &err); if (!success && reply && reply->type == REDIS_REPLY_ERROR && - ignore_unassigned_err && - strstr(reply->str, "already unassigned") != NULL) success = 1; + ignore_unassigned_err) + { + char *get_owner_err = NULL; + clusterManagerNode *assigned_to = + clusterManagerGetSlotOwner(node, slot, &get_owner_err); + if (!assigned_to) { + if (get_owner_err == NULL) success = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err); + zfree(get_owner_err); + } + } + } if (!success && err != NULL) { CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); zfree(err); @@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } -static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { - if (bulk_idx == 0 && reply) { - if (reply->type == REDIS_REPLY_ERROR) - return strstr(reply->str, "already unassigned") != NULL; - } - return 0; +/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore + * errors except for ADDSLOTS errors. + * Return 1 if the error should be ignored. */ +static int clusterManagerOnSetOwnerErr(redisReply *reply, + clusterManagerNode *n, int bulk_idx) +{ + UNUSED(reply); + UNUSED(n); + /* Only raise error when ADDSLOTS fail (bulk_idx == 1). */ + return (bulk_idx != 1); } static int clusterManagerSetSlotOwner(clusterManagerNode *owner, @@ -2865,8 +2930,71 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, clusterManagerAddSlot(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot); clusterManagerBumpEpoch(owner); - success = clusterManagerExecTransaction(owner, - clusterManagerIgnoreUnassignedErr); + success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr); + return success; +} + +/* Get the hash for the values of the specified keys in *keys_reply for the + * specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command + * on both nodes. Every key with same name on both nodes but having different + * values will be added to the *diffs list. Return 0 in case of reply + * error. */ +static int clusterManagerCompareKeysValues(clusterManagerNode *n1, + clusterManagerNode *n2, + redisReply *keys_reply, + list *diffs) +{ + size_t i, argc = keys_reply->elements + 2; + static const char *hash_zero = "0000000000000000000000000000000000000000"; + char **argv = zcalloc(argc * sizeof(char *)); + size_t *argv_len = zcalloc(argc * sizeof(size_t)); + argv[0] = "DEBUG"; + argv_len[0] = 5; + argv[1] = "DIGEST-VALUE"; + argv_len[1] = 12; + for (i = 0; i < keys_reply->elements; i++) { + redisReply *entry = keys_reply->element[i]; + int idx = i + 2; + argv[idx] = entry->str; + argv_len[idx] = entry->len; + } + int success = 0; + void *_reply1 = NULL, *_reply2 = NULL; + redisReply *r1 = NULL, *r2 = NULL; + redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n1->context, &_reply1) == REDIS_OK); + if (!success) goto cleanup; + r1 = (redisReply *) _reply1; + redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n2->context, &_reply2) == REDIS_OK); + if (!success) goto cleanup; + r2 = (redisReply *) _reply2; + success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR); + if (r1->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str); + success = 0; + } + if (r2->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str); + success = 0; + } + if (!success) goto cleanup; + assert(keys_reply->elements == r1->elements && + keys_reply->elements == r2->elements); + for (i = 0; i < keys_reply->elements; i++) { + char *key = keys_reply->element[i]->str; + char *hash1 = r1->element[i]->str; + char *hash2 = r2->element[i]->str; + /* Ignore keys that don't exist in both nodes. */ + if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0) + continue; + if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key); + } +cleanup: + if (r1) freeReplyObject(r1); + if (r2) freeReplyObject(r2); + zfree(argv); + zfree(argv_len); return success; } @@ -2950,8 +3078,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int replace_existing_keys = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; + int do_replace = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_REPLACE; while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -2983,16 +3113,85 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; - int not_served = strstr(migrate_reply->str, "slot not served") != NULL; - if (replace_existing_keys && (is_busy || not_served)) { - /* If the key already exists, try to migrate keys - * adding REPLACE option. - * If the key's slot is not served, try to assign slot + int not_served = 0; + if (!is_busy) { + /* Check if the slot is unassigned (not served) in the + * source node's configuration. */ + char *get_owner_err = NULL; + clusterManagerNode *served_by = + clusterManagerGetSlotOwner(source, slot, &get_owner_err); + if (!served_by) { + if (get_owner_err == NULL) not_served = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, + get_owner_err); + zfree(get_owner_err); + } + } + } + /* Try to handle errors. */ + if (is_busy || not_served) { + /* If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) + if (do_fix && not_served) { + clusterManagerLogWarn("*** Slot was not served, setting " + "owner to node %s:%d.\n", + target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + } + /* If the key already exists in the target node (BUSYKEY), + * check whether its value is the same in both nodes. + * In case of equal values, retry migration with the + * REPLACE option. + * In case of different values: + * - If the migration is requested by the fix command, stop + * and warn the user. + * - In other cases (ie. reshard), proceed only if the user + * launched the command with the --cluster-replace option.*/ + if (is_busy) { + clusterManagerLogWarn("\n*** Target key exists\n"); + if (!do_replace) { + clusterManagerLogWarn("*** Checking key values on " + "both nodes...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success) { + clusterManagerLogErr("*** Value check failed!\n"); + listRelease(diffs); + goto next; + } + if (listLength(diffs) > 0) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually and try again " + "or relaunch the command \n" + "with --cluster-replace " + "option to force key " + "overriding.\n"); + listRelease(diffs); + goto next; + } + listRelease(diffs); + } + clusterManagerLogWarn("*** Replacing target keys...\n"); + } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, target, @@ -4252,7 +4451,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->migrating_count; i += 2) { sds slot = n->migrating[i]; - dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4270,7 +4469,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->importing_count; i += 2) { sds slot = n->importing[i]; - dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->importing[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4333,7 +4532,7 @@ static int clusterManagerCheckCluster(int quiet) { /* Check whether there are multiple owners, even when slots are * fully covered and there are no open slots. */ clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); - int slot = 0; + int slot = 0, slots_with_multiple_owners = 0; for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { listIter li; listNode *ln; @@ -4359,6 +4558,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerNode *n = ln->value; clusterManagerLogErr(" %s:%d\n", n->ip, n->port); } + slots_with_multiple_owners++; if (do_fix) { result = clusterManagerFixMultipleSlotOwners(slot, owners); if (!result) { @@ -4366,11 +4566,13 @@ static int clusterManagerCheckCluster(int quiet) { "for slot %d\n", slot); listRelease(owners); break; - } + } else slots_with_multiple_owners--; } } listRelease(owners); } + if (slots_with_multiple_owners == 0) + clusterManagerLogOk("[OK] No multiple owners found.\n"); } return result; }