From d935cfcb89fe70a0d9c039605d1df38d0be59db7 Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 12 Dec 2018 13:23:08 +0100 Subject: [PATCH 1/4] Cluster Manager: avoid using reply error messages to check slot status. Slot assignment status is now checked by using CLUSTER SLOTS. Furthermore, one memory leak has been fixed. --- src/redis-cli.c | 136 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 111 insertions(+), 25 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a3fb065d..a93bd9b1 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); -typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, + clusterManagerNode *n, int bulk_idx); /* Cluster Manager helper functions */ @@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } -/* Execute MULTI command on a cluster node. */ +/* Call MULTI command on a cluster node. */ static int clusterManagerStartTransaction(clusterManagerNode *node) { redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); int success = clusterManagerCheckRedisReply(node, reply, NULL); @@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) { return success; } -/* Execute EXEC command on a cluster node. */ +/* Call EXEC command on a cluster node. */ static int clusterManagerExecTransaction(clusterManagerNode *node, clusterManagerOnReplyError onerror) { @@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, redisReply *r = reply->element[i]; char *err = NULL; success = clusterManagerCheckRedisReply(node, r, &err); - if (!success && onerror) success = onerror(r, i); + if (!success && onerror) success = onerror(r, node, i); if (err) { if (!success) CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); @@ -2768,6 +2769,55 @@ cleanup: return success; } +/* Get the node the slot is assigned to from the point of view of node *n. + * If the slot is unassigned or if the reply is an error, return NULL. + * Use the **err argument in order to check wether the slot is unassigned + * or the reply resulted in an error. */ +static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n, + int slot, char **err) +{ + assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS); + clusterManagerNode *owner = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS"); + if (clusterManagerCheckRedisReply(n, reply, err)) { + assert(reply->type == REDIS_REPLY_ARRAY); + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3); + int from, to; + from = r->element[0]->integer; + to = r->element[1]->integer; + if (slot < from || slot > to) continue; + redisReply *nr = r->element[2]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2); + char *name = NULL; + if (nr->elements >= 3) + name = nr->element[2]->str; + if (name != NULL) + owner = clusterManagerNodeByName(name); + else { + char *ip = nr->element[0]->str; + assert(ip != NULL); + int port = (int) nr->element[1]->integer; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *nd = ln->value; + if (strcmp(nd->ip, ip) == 0 && port == nd->port) { + owner = nd; + break; + } + } + } + if (owner) break; + } + } + if (reply) freeReplyObject(reply); + return owner; +} + /* Set slot status to "importing" or "migrating" */ static int clusterManagerSetSlot(clusterManagerNode *node1, clusterManagerNode *node2, @@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot, char *err = NULL; int success = clusterManagerCheckRedisReply(node, reply, &err); if (!success && reply && reply->type == REDIS_REPLY_ERROR && - ignore_unassigned_err && - strstr(reply->str, "already unassigned") != NULL) success = 1; + ignore_unassigned_err) + { + char *get_owner_err = NULL; + clusterManagerNode *assigned_to = + clusterManagerGetSlotOwner(node, slot, &get_owner_err); + if (!assigned_to) { + if (get_owner_err == NULL) success = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err); + zfree(get_owner_err); + } + } + } if (!success && err != NULL) { CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); zfree(err); @@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } -static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { - if (bulk_idx == 0 && reply) { - if (reply->type == REDIS_REPLY_ERROR) - return strstr(reply->str, "already unassigned") != NULL; - } - return 0; +/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore + * errors except for ADDSLOTS errors. + * Return 1 if the error should be ignored. */ +static int clusterManagerOnSetOwnerErr(redisReply *reply, + clusterManagerNode *n, int bulk_idx) +{ + UNUSED(reply); + UNUSED(n); + /* Only raise error when ADDSLOTS fail (bulk_idx == 1). */ + return (bulk_idx != 1); } static int clusterManagerSetSlotOwner(clusterManagerNode *owner, @@ -2865,8 +2930,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, clusterManagerAddSlot(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot); clusterManagerBumpEpoch(owner); - success = clusterManagerExecTransaction(owner, - clusterManagerIgnoreUnassignedErr); + success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr); return success; } @@ -2950,8 +3014,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int replace_existing_keys = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int retry = (config.cluster_manager_command.flags & + (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -2983,16 +3047,35 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; - int not_served = strstr(migrate_reply->str, "slot not served") != NULL; - if (replace_existing_keys && (is_busy || not_served)) { + int not_served = 0; + if (!is_busy) { + char *get_owner_err = NULL; + clusterManagerNode *served_by = + clusterManagerGetSlotOwner(source, slot, &get_owner_err); + if (!served_by) { + if (get_owner_err == NULL) not_served = 1; + else { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, + get_owner_err); + zfree(get_owner_err); + } + } + } + if (retry && (is_busy || not_served)) { /* If the key already exists, try to migrate keys * adding REPLACE option. * If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) + if (not_served) { + clusterManagerLogWarn("*** Slot was not served, setting " + "owner to node %s:%d.\n", + target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + } + if (is_busy) { + clusterManagerLogWarn("*** Target key exists. " + "Replacing it for FIX.\n"); + } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, target, @@ -4252,7 +4335,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->migrating_count; i += 2) { sds slot = n->migrating[i]; - dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4270,7 +4353,7 @@ static int clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->importing_count; i += 2) { sds slot = n->importing[i]; - dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); + dictReplace(open_slots, slot, sdsdup(n->importing[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -4333,7 +4416,7 @@ static int clusterManagerCheckCluster(int quiet) { /* Check whether there are multiple owners, even when slots are * fully covered and there are no open slots. */ clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); - int slot = 0; + int slot = 0, slots_with_multiple_owners = 0; for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { listIter li; listNode *ln; @@ -4359,6 +4442,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerNode *n = ln->value; clusterManagerLogErr(" %s:%d\n", n->ip, n->port); } + slots_with_multiple_owners++; if (do_fix) { result = clusterManagerFixMultipleSlotOwners(slot, owners); if (!result) { @@ -4366,11 +4450,13 @@ static int clusterManagerCheckCluster(int quiet) { "for slot %d\n", slot); listRelease(owners); break; - } + } else slots_with_multiple_owners--; } } listRelease(owners); } + if (slots_with_multiple_owners == 0) + clusterManagerLogOk("[OK] No multiple owners found.\n"); } return result; } From 143bfa1e6e65cf8be1eaad0b8169e2d95ca62f9a Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 18 Dec 2018 17:38:57 +0100 Subject: [PATCH 2/4] Cluster Manager: compare key values after BUSYKEY error (migration). If a key exists in the target node during a migration (BUSYKEY), the value of the key on both nodes (source and target) will be compared. If the key has the same value on both keys, the migration will be automatically retried with the REPLACE argument in order to override the target's key. If the key has different values, the behaviour will depend on such cases: - In case of 'fix' command, the migration will stop and the user will be warned to manually check the key(s). - In other cases (ie. reshard), if the user launched the command with the --cluster-replace option, the migration will be retried with the REPLACE argument, elsewhere the migration will stop and the user will be warned. --- src/redis-cli.c | 133 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 124 insertions(+), 9 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a93bd9b1..b0a12ebb 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2934,6 +2934,68 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, return success; } +/* Get the hash for the values of the specified keys in *keys_reply for the + * specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command + * on both nodes. Every key with same name on both nodes but having different + * values will be added to the *diffs list. Return 0 in case of reply + * error. */ +static int clusterManagerCompareKeysValues(clusterManagerNode *n1, + clusterManagerNode *n2, + redisReply *keys_reply, + list *diffs) +{ + size_t i, argc = keys_reply->elements + 2; + static const char *hash_zero = "0000000000000000000000000000000000000000"; + char **argv = zcalloc(argc * sizeof(char *)); + size_t *argv_len = zcalloc(argc * sizeof(size_t)); + argv[0] = "DEBUG"; + argv_len[0] = 5; + argv[1] = "DIGEST-VALUE"; + argv_len[1] = 12; + for (i = 0; i < keys_reply->elements; i++) { + redisReply *entry = keys_reply->element[i]; + int idx = i + 2; + argv[idx] = entry->str; + argv_len[idx] = entry->len; + } + int success = 0; + void *_reply1 = NULL, *_reply2 = NULL; + redisReply *r1 = NULL, *r2 = NULL; + redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n1->context, &_reply1) == REDIS_OK); + if (!success) goto cleanup; + r1 = (redisReply *) _reply1; + redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n2->context, &_reply2) == REDIS_OK); + if (!success) goto cleanup; + r2 = (redisReply *) _reply2; + success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR); + if (r1->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str); + success = 0; + } + if (r2->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str); + success = 0; + } + if (!success) goto cleanup; + assert(keys_reply->elements == r1->elements && + keys_reply->elements == r2->elements); + for (i = 0; i < keys_reply->elements; i++) { + char *key = keys_reply->element[i]->str; + char *hash1 = r1->element[i]->str; + char *hash2 = r2->element[i]->str; + /* Ignore keys that don't exist in both nodes. */ + if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0) + continue; + if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key); + } +cleanup: + if (r1) freeReplyObject(r1); + if (r2) freeReplyObject(r2); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -3014,8 +3076,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int retry = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; + int do_replace = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_REPLACE; while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -3049,6 +3113,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; int not_served = 0; if (!is_busy) { + /* Check if the slot is unassigned (not served) in the + * source node's configuration. */ char *get_owner_err = NULL; clusterManagerNode *served_by = clusterManagerGetSlotOwner(source, slot, &get_owner_err); @@ -3061,20 +3127,69 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, } } } - if (retry && (is_busy || not_served)) { - /* If the key already exists, try to migrate keys - * adding REPLACE option. - * If the key's slot is not served, try to assign slot + /* Try to handle errors. */ + if (is_busy || not_served) { + /* If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) { + if (do_fix && not_served) { clusterManagerLogWarn("*** Slot was not served, setting " "owner to node %s:%d.\n", target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); } + /* If the key already exists in the target node (BUSYKEY), + * check whether its value is the same in both nodes. + * In case of equal values, retry migration with the + * REPLACE option. + * In case of different values: + * - If the migration is requested by the fix command, stop + * and warn the user. + * - In other cases (ie. reshard), proceed only if the user + * launched the command with the --cluster-replace option.*/ if (is_busy) { - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + clusterManagerLogWarn("\n*** Target key exists, " + "checking values...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success && (do_fix || !do_replace)) { + listRelease(diffs); + clusterManagerLogErr("*** Value check failed!\n"); + goto next; + } + if (listLength(diffs) > 0 && (do_fix || !do_replace)) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually "); + if (do_fix) + clusterManagerLogErr("and try again!\n"); + else { + clusterManagerLogErr("or relaunch the command " + "with --cluster-replace " + "option to force key " + "overriding.\n"); + } + listRelease(diffs); + goto next; + } + listRelease(diffs); + clusterManagerLogWarn("*** Replacing target keys...\n"); } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source, From cc29590188a22eb73cfbbef39fce73c7467b1edf Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 18 Dec 2018 18:39:21 +0100 Subject: [PATCH 3/4] Fixed memory leak in clusterManagerCompareKeysValues. --- src/redis-cli.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index b0a12ebb..705c7483 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2993,6 +2993,8 @@ static int clusterManagerCompareKeysValues(clusterManagerNode *n1, cleanup: if (r1) freeReplyObject(r1); if (r2) freeReplyObject(r2); + zfree(argv); + zfree(argv_len); return success; } From 503fd229e4181e932ba74b3ca8a222712d80ebca Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 19 Dec 2018 17:27:58 +0100 Subject: [PATCH 4/4] Cluster Manager: enable --cluster-replace also for 'fix' command. --- src/redis-cli.c | 69 ++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 705c7483..6fe93e66 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3149,48 +3149,47 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, * - In other cases (ie. reshard), proceed only if the user * launched the command with the --cluster-replace option.*/ if (is_busy) { - clusterManagerLogWarn("\n*** Target key exists, " - "checking values...\n"); - list *diffs = listCreate(); - success = clusterManagerCompareKeysValues(source, - target, reply, diffs); - if (!success && (do_fix || !do_replace)) { - listRelease(diffs); - clusterManagerLogErr("*** Value check failed!\n"); - goto next; - } - if (listLength(diffs) > 0 && (do_fix || !do_replace)) { - success = 0; - clusterManagerLogErr( - "*** Found %d key(s) in both source node and " - "target node having different values.\n" - " Source node: %s:%d\n" - " Target node: %s:%d\n" - " Keys(s):\n", - listLength(diffs), - source->ip, source->port, - target->ip, target->port); - listIter dli; - listNode *dln; - listRewind(diffs, &dli); - while((dln = listNext(&dli)) != NULL) { - char *k = dln->value; - clusterManagerLogErr(" - %s\n", k); + clusterManagerLogWarn("\n*** Target key exists\n"); + if (!do_replace) { + clusterManagerLogWarn("*** Checking key values on " + "both nodes...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success) { + clusterManagerLogErr("*** Value check failed!\n"); + listRelease(diffs); + goto next; } - clusterManagerLogErr("Please fix the above key(s) " - "manually "); - if (do_fix) - clusterManagerLogErr("and try again!\n"); - else { - clusterManagerLogErr("or relaunch the command " + if (listLength(diffs) > 0) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually and try again " + "or relaunch the command \n" "with --cluster-replace " "option to force key " "overriding.\n"); + listRelease(diffs); + goto next; } listRelease(diffs); - goto next; } - listRelease(diffs); clusterManagerLogWarn("*** Replacing target keys...\n"); } freeReplyObject(migrate_reply);