From d5f7703367c83f46683d220fed785c18504dd5ca Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 28 Nov 2018 16:59:16 +0100 Subject: [PATCH] Cluster Manager: setting new slot owner is now handled atomically in 'fix' command. --- src/redis-cli.c | 103 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 31 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index a3eadf36..b66ea9e6 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1929,6 +1929,7 @@ static dictType clusterManagerDictType = { }; typedef int clusterManagerCommandProc(int argc, char **argv); +typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); /* Cluster Manager helper functions */ @@ -2188,6 +2189,38 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n, return 1; } +/* Execute MULTI command on a cluster node. */ +static int clusterManagerStartTransaction(clusterManagerNode *node) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +/* Execute EXEC command on a cluster node. */ +static int clusterManagerExecTransaction(clusterManagerNode *node, + clusterManagerOnReplyError onerror) +{ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (success) { + if (reply->type != REDIS_REPLY_ARRAY) { + success = 0; + goto cleanup; + } + size_t i; + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + success = clusterManagerCheckRedisReply(node, r, NULL); + if (!success && onerror) success = onerror(r, i); + if (!success) break; + } + } +cleanup: + if (reply) freeReplyObject(reply); + return success; +} + static int clusterManagerNodeConnect(clusterManagerNode *node) { if (node->context) redisFree(node->context); node->context = redisConnect(node->ip, node->port); @@ -2794,6 +2827,31 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) { return success; } +static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { + if (bulk_idx == 0 && reply) { + if (reply->type == REDIS_REPLY_ERROR) + return strstr(reply->str, "already unassigned") != NULL; + } + return 0; +} + +static int clusterManagerSetSlotOwner(clusterManagerNode *owner, + int slot, + int do_clear) +{ + int success = clusterManagerStartTransaction(owner); + if (!success) return 0; + /* Ensure the slot is not already assigned. */ + clusterManagerDelSlot(owner, slot, 0); + /* Add the slot and bump epoch. */ + clusterManagerAddSlot(owner, slot); + if (do_clear) clusterManagerClearSlotStatus(owner, slot); + clusterManagerBumpEpoch(owner); + success = clusterManagerExecTransaction(owner, + clusterManagerIgnoreUnassignedErr); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -3675,11 +3733,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(n, s, 1); - if (!clusterManagerAddSlot(n, s)) fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[s] = 1; @@ -3707,11 +3764,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(n, s, 1); - if (!clusterManagerAddSlot(n, s)) fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; - if (fixed < 0) goto cleanup; + if (!clusterManagerSetSlotOwner(n, s, 0)) { + fixed = -1; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ n->slots[atoi(slot)] = 1; @@ -3744,14 +3800,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); - /* Ensure the slot is not already assigned. */ - clusterManagerDelSlot(target, s, 1); - if (!clusterManagerAddSlot(target, s)) fixed = -1; - if (fixed < 0) goto cleanup; - if (!clusterManagerClearSlotStatus(target, s)) + if (!clusterManagerSetSlotOwner(target, s, 1)) { fixed = -1; - if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; - if (fixed < 0) goto cleanup; + goto cleanup; + } /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ target->slots[atoi(slot)] = 1; @@ -3905,11 +3957,7 @@ static int clusterManagerFixOpenSlot(int slot) { owner->ip, owner->port); success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; - /* Ensure that the slot is unassigned before assigning it to the - * owner. */ - success = clusterManagerDelSlot(owner, slot, 1); - if (!success) goto cleanup; - success = clusterManagerAddSlot(owner, slot); + success = clusterManagerSetSlotOwner(owner, slot, 0); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -4052,15 +4100,8 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { if (!owner) owner = listFirst(owners)->value; clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", slot, owner->ip, owner->port); - /* Set the owner node by calling DELSLOTS in order to unassign the slot - * in case it's already assigned to another node and by finally calling - * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it - * could reply with an "already unassigned" error and if it should fail - * for other reasons, it would lead to a failure in the follwing ADDSLOTS - * command. */ - clusterManagerDelSlot(owner, slot, 1); - if (!clusterManagerAddSlot(owner, slot)) return 0; - if (!clusterManagerBumpEpoch(owner)) return 0; + /* Set the slot owner. */ + if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0; listIter li; listNode *ln; listRewind(cluster_manager.nodes, &li);