From 5bf13eaaf8f444bb547b00ed5ecd727a93cd8399 Mon Sep 17 00:00:00 2001 From: artix Date: Thu, 22 Nov 2018 11:47:59 +0100 Subject: [PATCH 1/6] Cluster Manager: check/fix commands now handle multiple owners even if all slots are covered and not open. --- src/redis-cli.c | 135 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 6 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 7e558a30..9d8c0bca 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2746,6 +2746,41 @@ cleanup: return success; } +static int clusterManagerDelSlot(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER DELSLOTS %d", slot); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerAddSlot(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER ADDSLOTS %d", slot); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node, + int slot) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER COUNTKEYSINSLOT %d", slot); + int count = -1; + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer; + if (reply) freeReplyObject(reply); + return count; +} + +static int clusterManagerBumpEpoch(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. 
*/ @@ -4053,17 +4088,62 @@ cleanup: return success; } +static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { + clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot); + int success = 0; + assert(listLength(owners) > 1); + clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners, + slot, + NULL); + if (!owner) owner = listFirst(owners)->value; + clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", + slot, owner->ip, owner->port); + /* Set the owner node by calling DELSLOTS in order to unassign the slot + * in case it's already assigned to another node and by finally calling + * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it + * could reply with an "already unassigned" error and if it should fail + * for other reasons, it would lead to a failure in the follwing ADDSLOTS + * command. */ + clusterManagerDelSlot(owner, slot); + if (!clusterManagerAddSlot(owner, slot)) return 0; + if (!clusterManagerBumpEpoch(owner)) return 0; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + /* Update configuration in all the other master nodes by assigning the slot + * itself to the new owner, and by eventually migrating keys if the node + * has keys for the slot. 
*/ + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + int count = clusterManagerCountKeysInSlot(n, slot); + success = (count >= 0); + if (!success) break; + clusterManagerDelSlot(n, slot); + if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; + if (count > 0) { + int opts = CLUSTER_MANAGER_OPT_VERBOSE | + CLUSTER_MANAGER_OPT_COLD; + success = clusterManagerMoveSlot(n, owner, slot, opts, NULL); + if (!success) break; + } + } + return success; +} + static int clusterManagerCheckCluster(int quiet) { listNode *ln = listFirst(cluster_manager.nodes); if (!ln) return 0; - int result = 1; - int do_fix = config.cluster_manager_command.flags & - CLUSTER_MANAGER_CMD_FLAG_FIX; clusterManagerNode *node = ln->value; clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n", node->ip, node->port); + int result = 1, consistent = 0; + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; if (!quiet) clusterManagerShowNodes(); - if (!clusterManagerIsConfigConsistent()) { + consistent = clusterManagerIsConfigConsistent(); + if (!consistent) { sds err = sdsnew("[ERR] Nodes don't agree about configuration!"); clusterManagerOnError(err); result = 0; @@ -4071,7 +4151,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogOk("[OK] All nodes agree about slots " "configuration.\n"); } - // Check open slots + /* Check open slots */ clusterManagerLogInfo(">>> Check for open slots...\n"); listIter li; listRewind(cluster_manager.nodes, &li); @@ -4130,7 +4210,7 @@ static int clusterManagerCheckCluster(int quiet) { clusterManagerLogErr("%s.\n", (char *) errstr); sdsfree(errstr); if (do_fix) { - // Fix open slots. + /* Fix open slots. 
*/ dictReleaseIterator(iter); iter = dictGetIterator(open_slots); while ((entry = dictNext(iter)) != NULL) { @@ -4165,6 +4245,49 @@ static int clusterManagerCheckCluster(int quiet) { if (fixed > 0) result = 1; } } + if (!consistent) { + /* Check whether there are multiple owners, even when slots are + * fully covered and there are no open slots. */ + clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); + int slot = 0; + for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + list *owners = listCreate(); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + /* Nodes having keys for the slot will be considered + * owners too. */ + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) listAddNodeTail(owners, n); + } + } + if (listLength(owners) > 1) { + result = 0; + clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n", + slot, listLength(owners)); + listRewind(owners, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + clusterManagerLogErr(" %s:%d\n", n->ip, n->port); + } + if (do_fix) { + result = clusterManagerFixMultipleSlotOwners(slot, owners); + if (!result) { + clusterManagerLogErr("Failed to fix multiple owners " + "for slot %d\n", slot); + listRelease(owners); + break; + } + } + } + listRelease(owners); + } + } return result; } From eaac9f9e930d8959d681b7d03d7411bfb18db3a7 Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 27 Nov 2018 12:26:56 +0100 Subject: [PATCH 2/6] Cluster Manager: code cleanup. 
--- src/redis-cli.c | 128 ++++++++++++++++-------------------------------- 1 file changed, 41 insertions(+), 87 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 9d8c0bca..a3eadf36 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2746,10 +2746,23 @@ cleanup: return success; } -static int clusterManagerDelSlot(clusterManagerNode *node, int slot) { +static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER SETSLOT %d %s", slot, "STABLE"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerDelSlot(clusterManagerNode *node, int slot, + int ignore_unassigned_err) +{ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER DELSLOTS %d", slot); int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + ignore_unassigned_err && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); return success; } @@ -3658,24 +3671,18 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. 
*/ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(n, s, 1); + if (!clusterManagerAddSlot(n, s)) fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ - n->slots[atoi(slot)] = 1; + n->slots[s] = 1; fixed++; } } @@ -3691,6 +3698,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(single, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); assert(entry != NULL); list *nodes = (list *) dictGetVal(entry); @@ -3700,16 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. 
*/ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(n, s, 1); + if (!clusterManagerAddSlot(n, s)) fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3744,21 +3745,12 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "to %s:%d\n", slot, target->ip, target->port); /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(target, s, 1); + if (!clusterManagerAddSlot(target, s)) fixed = -1; if (fixed < 0) goto cleanup; - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER SETSLOT %s %s", slot, "STABLE"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerClearSlotStatus(target, s)) + fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3770,23 +3762,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode 
*src = nln->value; if (src == target) continue; /* Assign the slot to target node in the source node. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "NODE", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerSetSlot(src, target, s, "NODE", NULL)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) - fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerSetSlot(src, target, s, + "IMPORTING", NULL)) fixed = -1; if (fixed < 0) goto cleanup; int opts = CLUSTER_MANAGER_OPT_VERBOSE | CLUSTER_MANAGER_OPT_COLD; @@ -3794,12 +3778,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s", slot, - "STABLE"); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerClearSlotStatus(src, s)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; } fixed++; @@ -3923,24 +3903,13 @@ static int clusterManagerFixOpenSlot(int slot) { // Use ADDSLOTS to assign the slot. clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", owner->ip, owner->port); - redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER " - "SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; /* Ensure that the slot is unassigned before assigning it to the * owner. 
*/ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(owner, slot, 1); if (!success) goto cleanup; - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerAddSlot(owner, slot); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3948,9 +3917,7 @@ static int clusterManagerFixOpenSlot(int slot) { /* Make sure this information will propagate. Not strictly needed * since there is no past owner, so all the other nodes will accept * whatever epoch this node will claim the slot with. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerBumpEpoch(owner); if (!success) goto cleanup; /* Remove the owner from the list of migrating/importing * nodes. */ @@ -3970,16 +3937,10 @@ static int clusterManagerFixOpenSlot(int slot) { * the owner has been set in the previous condition (owner == NULL). */ assert(owner != NULL); listRewind(owners, &li); - redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(n, reply, NULL); - /* Ignore "already unassigned" error. 
*/ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(n, slot, 1); if (!success) goto cleanup; n->slots[slot] = 0; /* Assign the slot to the owner in the node 'n' configuration.' */ @@ -4021,11 +3982,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; clusterManagerLogInfo(">>> Setting %d as STABLE in " "%s:%d\n", slot, n->ip, n->port); - - redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerClearSlotStatus(n, slot); if (!success) goto cleanup; } /* Since the slot has been moved in "cold" mode, ensure that all the @@ -4035,10 +3992,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerNode *n = ln->value; if (n == owner) continue; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } } else { @@ -4104,7 +4058,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { * could reply with an "already unassigned" error and if it should fail * for other reasons, it would lead to a failure in the follwing ADDSLOTS * command. 
*/ - clusterManagerDelSlot(owner, slot); + clusterManagerDelSlot(owner, slot, 1); if (!clusterManagerAddSlot(owner, slot)) return 0; if (!clusterManagerBumpEpoch(owner)) return 0; listIter li; @@ -4120,7 +4074,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { int count = clusterManagerCountKeysInSlot(n, slot); success = (count >= 0); if (!success) break; - clusterManagerDelSlot(n, slot); + clusterManagerDelSlot(n, slot, 1); if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; if (count > 0) { int opts = CLUSTER_MANAGER_OPT_VERBOSE | From d5f7703367c83f46683d220fed785c18504dd5ca Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 28 Nov 2018 16:59:16 +0100 Subject: [PATCH 3/6] Cluster Manager: setting new slot owner is now handled atomically in 'fix' command. --- src/redis-cli.c | 103 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 31 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a3eadf36..b66ea9e6 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1929,6 +1929,7 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); /* Cluster Manager helper functions */ @@ -2188,6 +2189,38 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } +/* Execute MULTI command on a cluster node. */ +static int clusterManagerStartTransaction(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +/* Execute EXEC command on a cluster node. 
*/ +static int clusterManagerExecTransaction(clusterManagerNode *node, + clusterManagerOnReplyError onerror) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success) { + if (reply->type != REDIS_REPLY_ARRAY) { + success = 0; + goto cleanup; + } + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + success = clusterManagerCheckRedisReply(node, r, NULL); + if (!success && onerror) success = onerror(r, i); + if (!success) break; + } + } +cleanup: + if (reply) freeReplyObject(reply); + return success; +} + static int clusterManagerNodeConnect(clusterManagerNode *node) { if (node->context) redisFree(node->context); node->context = redisConnect(node->ip, node->port); @@ -2794,6 +2827,31 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } +static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { + if (bulk_idx == 0 && reply) { + if (reply->type == REDIS_REPLY_ERROR) + return strstr(reply->str, "already unassigned") != NULL; + } + return 0; +} + +static int clusterManagerSetSlotOwner(clusterManagerNode *owner, + int slot, + int do_clear) +{ + int success = clusterManagerStartTransaction(owner); + if (!success) return 0; + /* Ensure the slot is not already assigned. */ + clusterManagerDelSlot(owner, slot, 0); + /* Add the slot and bump epoch. */ + clusterManagerAddSlot(owner, slot); + if (do_clear) clusterManagerClearSlotStatus(owner, slot); + clusterManagerBumpEpoch(owner); + success = clusterManagerExecTransaction(owner, + clusterManagerIgnoreUnassignedErr); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. 
*/ @@ -3675,11 +3733,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(n, s, 1); - if (!clusterManagerAddSlot(n, s)) fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[s] = 1; @@ -3707,11 +3764,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(n, s, 1); - if (!clusterManagerAddSlot(n, s)) fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[atoi(slot)] = 1; @@ -3744,14 +3800,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); - /* Ensure the slot is not already assigned. 
*/ - clusterManagerDelSlot(target, s, 1); - if (!clusterManagerAddSlot(target, s)) fixed = -1; - if (fixed < 0) goto cleanup; - if (!clusterManagerClearSlotStatus(target, s)) + if (!clusterManagerSetSlotOwner(target, s, 1)) { fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; - if (fixed < 0) goto cleanup; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ target->slots[atoi(slot)] = 1; @@ -3905,11 +3957,7 @@ static int clusterManagerFixOpenSlot(int slot) { owner->ip, owner->port); success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; - /* Ensure that the slot is unassigned before assigning it to the - * owner. */ - success = clusterManagerDelSlot(owner, slot, 1); - if (!success) goto cleanup; - success = clusterManagerAddSlot(owner, slot); + success = clusterManagerSetSlotOwner(owner, slot, 0); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -4052,15 +4100,8 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { if (!owner) owner = listFirst(owners)->value; clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", slot, owner->ip, owner->port); - /* Set the owner node by calling DELSLOTS in order to unassign the slot - * in case it's already assigned to another node and by finally calling - * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it - * could reply with an "already unassigned" error and if it should fail - * for other reasons, it would lead to a failure in the follwing ADDSLOTS - * command. */ - clusterManagerDelSlot(owner, slot, 1); - if (!clusterManagerAddSlot(owner, slot)) return 0; - if (!clusterManagerBumpEpoch(owner)) return 0; + /* Set the slot owner. 
*/ + if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; listIter li; listNode *ln; listRewind(cluster_manager.nodes, &li); From 1a56fc913e70d61449c79b643f4ab241ece3cbec Mon Sep 17 00:00:00 2001 From: artix Date: Fri, 30 Nov 2018 20:48:52 +0100 Subject: [PATCH 4/6] Cluster Manager: 'fix' command now handles open slots with migrating state in one node and importing state in multiple nodes. --- src/redis-cli.c | 80 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index b66ea9e6..56d2ee0e 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1846,7 +1846,7 @@ static int evalMode(int argc, char **argv) { if (eval_ldb) { if (!config.eval_ldb) { /* If the debugging session ended immediately, there was an - * error compiling the script. Show it and don't enter + * error compiling the script. Show it and then don't enter * the REPL at all. */ printf("Eval debugging session can't start:\n"); cliReadReply(0); @@ -4043,6 +4043,73 @@ static int clusterManagerFixOpenSlot(int slot) { success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } + } + /* Case 3: The slot is in migrating state in one node but multiple + * other nodes claim to be in importing state and don't have any key in + * the slot. We search for the importing node having the same ID as + * the destination node of the migrating node. + * In that case we move the slot from the migrating node to this node and + * we close the importing states on all the other importing nodes. + * If no importing node has the same ID as the destination node of the + * migrating node, the slot's state is closed on both the migrating node + * and the importing nodes. 
*/ + else if (listLength(migrating) == 1 && listLength(importing) > 1) { + int try_to_fix = 1; + clusterManagerNode *src = listFirst(migrating)->value; + clusterManagerNode *dst = NULL; + sds target_id = NULL; + for (int i = 0; i < src->migrating_count; i += 2) { + sds migrating_slot = src->migrating[i]; + if (atoi(migrating_slot) == slot) { + target_id = src->migrating[i + 1]; + break; + } + } + assert(target_id != NULL); + listIter li; + listNode *ln; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + int count = clusterManagerCountKeysInSlot(n, slot); + if (count > 0) { + try_to_fix = 0; + break; + } + if (strcmp(n->name, target_id) == 0) dst = n; + } + if (!try_to_fix) goto unhandled_case; + if (dst != NULL) { + clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to " + "%s:%d and closing it on all the other " + "importing nodes.\n", + slot, src->ip, src->port, + dst->ip, dst->port); + /* Move the slot to the destination node. */ + success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); + if (!success) goto cleanup; + /* Close slot on all the other importing nodes. */ + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (dst == n) continue; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } else { + clusterManagerLogInfo(">>> Case 3: Closing slot %d on both " + "migrating and importing nodes.\n", slot); + /* Close the slot on both the migrating node and the importing + * nodes. 
*/ + success = clusterManagerClearSlotStatus(src, slot); + if (!success) goto cleanup; + listRewind(importing, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + success = clusterManagerClearSlotStatus(n, slot); + if (!success) goto cleanup; + } + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); @@ -4059,13 +4126,13 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; } } - /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key or is the - * slot owner. We can just close the slot, probably a reshard interrupted - * in the middle. */ + /* Case 4: There are no nodes claiming to be in importing state, but + * there is a migrating node that actually doesn't have any key or is the + * slot owner. We can just close the slot, probably a reshard + * interrupted in the middle. */ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n", slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); @@ -4073,6 +4140,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } else { +unhandled_case: success = 0; clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot " "yet (work in progress). Slot is set as " From 0c1336caf4ba34ee7a41c21fb12756b542a0d2e8 Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 5 Dec 2018 20:09:09 +0100 Subject: [PATCH 5/6] Cluster Manager: - FixOpenSlot now correctly updates in-memory cluster configuration. - Improved output messages. 
--- src/redis-cli.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 56d2ee0e..eeeaddc6 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2211,8 +2211,14 @@ static int clusterManagerExecTransaction(clusterManagerNode *node, size_t i; for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; - success = clusterManagerCheckRedisReply(node, r, NULL); + char *err = NULL; + success = clusterManagerCheckRedisReply(node, r, &err); if (!success && onerror) success = onerror(r, i); + if (err) { + if (!success) + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } if (!success) break; } } @@ -2792,10 +2798,15 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot, { redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER DELSLOTS %d", slot); - int success = clusterManagerCheckRedisReply(node, reply, NULL); + char *err = NULL; + int success = clusterManagerCheckRedisReply(node, reply, &err); if (!success && reply && reply->type == REDIS_REPLY_ERROR && ignore_unassigned_err && strstr(reply->str, "already unassigned") != NULL) success = 1; + if (!success && err != NULL) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); + zfree(err); + } if (reply) freeReplyObject(reply); return success; } @@ -2842,7 +2853,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, int success = clusterManagerStartTransaction(owner); if (!success) return 0; /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(owner, slot, 0); + clusterManagerDelSlot(owner, slot, 1); /* Add the slot and bump epoch. */ clusterManagerAddSlot(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot); @@ -3747,7 +3758,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { /* Handle case "2": keys only in one node. 
*/ if (listLength(single) > 0) { - printf("The following uncovered slots have keys in just one node:\n"); + printf("The following uncovered slots have keys in just one node:\n"); clusterManagerPrintSlotsList(single); if (confirmWithYes("Fix these slots by covering with those nodes?")){ listIter li; @@ -4012,6 +4023,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogInfo(">>> Case 1: Moving slot %d from " "%s:%d to %s:%d\n", slot, src->ip, src->port, dst->ip, dst->port); + move_opts |= CLUSTER_MANAGER_OPT_UPDATE; success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -4166,7 +4178,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { slot, NULL); if (!owner) owner = listFirst(owners)->value; - clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", + clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n", slot, owner->ip, owner->port); /* Set the slot owner. */ if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; From 27ddb2ba3a8759b306501882bd76760640e6705a Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 10 Dec 2018 18:01:13 +0100 Subject: [PATCH 6/6] Cluster Manager: - Multiple owners checking in 'fix'/'check' commands is now optional (using --cluster-search-multiple-owners). - Updated help. 
--- src/redis-cli.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index eeeaddc6..a3fb065d 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -117,6 +117,7 @@ #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6 #define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8 +#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 #define CLUSTER_MANAGER_OPT_COLD 1 << 1 @@ -1378,6 +1379,9 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; + } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -1991,14 +1995,17 @@ typedef struct clusterManagerCommandDef { clusterManagerCommandDef clusterManagerCommands[] = { {"create", clusterManagerCommandCreate, -2, "host1:port1 ... 
hostN:portN", "replicas <arg>"}, - {"check", clusterManagerCommandCheck, -1, "host:port", NULL}, + {"check", clusterManagerCommandCheck, -1, "host:port", + "search-multiple-owners"}, {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, - {"fix", clusterManagerCommandFix, -1, "host:port", NULL}, + {"fix", clusterManagerCommandFix, -1, "host:port", + "search-multiple-owners"}, {"reshard", clusterManagerCommandReshard, -1, "host:port", - "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"}, + "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>," + "replace"}, {"rebalance", clusterManagerCommandRebalance, -1, "host:port", "weight <node1=w1...nodeN=wN>,use-empty-masters," - "timeout <arg>,simulate,pipeline <arg>,threshold <arg>"}, + "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"}, {"add-node", clusterManagerCommandAddNode, 2, "new_host:new_port existing_host:existing_port", "slave,master-id <arg>"}, {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL}, @@ -4320,7 +4327,9 @@ static int clusterManagerCheckCluster(int quiet) { if (fixed > 0) result = 1; } } - if (!consistent) { + int search_multiple_owners = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS; + if (search_multiple_owners) { /* Check whether there are multiple owners, even when slots are * fully covered and there are no open slots. */ clusterManagerLogInfo(">>> Check for multiple slot owners...\n");