Cluster Manager: setting new slot owner is now handled atomically in 'fix' command.
artix 2018-11-28 16:59:16 +01:00
parent eaac9f9e93
commit d5f7703367


@@ -1929,6 +1929,7 @@ static dictType clusterManagerDictType = {
 };
 
 typedef int clusterManagerCommandProc(int argc, char **argv);
+typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx);
 
 /* Cluster Manager helper functions */
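The new typedef defines the contract for the per-reply error handlers used by the EXEC helper added below: the handler receives an error reply together with its index in the EXEC reply array, and returns nonzero to mark that error as acceptable. A minimal conforming callback could look like this (a hypothetical handler, not part of this commit):

#include <string.h>
#include <hiredis/hiredis.h>

/* Hypothetical handler matching clusterManagerOnReplyError: tolerate one
 * specific error string and reject everything else. Returning nonzero
 * tells the transaction helper to keep scanning the remaining replies. */
static int exampleIgnoreBusyKeyErr(redisReply *reply, int bulk_idx) {
    (void) bulk_idx; /* Index of this reply inside the EXEC array. */
    if (reply && reply->type == REDIS_REPLY_ERROR)
        return strstr(reply->str, "BUSYKEY") != NULL;
    return 0;
}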
@@ -2188,6 +2189,38 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
     return 1;
 }
 
+/* Execute MULTI command on a cluster node. */
+static int clusterManagerStartTransaction(clusterManagerNode *node) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+/* Execute EXEC command on a cluster node. */
+static int clusterManagerExecTransaction(clusterManagerNode *node,
+                                         clusterManagerOnReplyError onerror)
+{
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (success) {
+        if (reply->type != REDIS_REPLY_ARRAY) {
+            success = 0;
+            goto cleanup;
+        }
+        size_t i;
+        for (i = 0; i < reply->elements; i++) {
+            redisReply *r = reply->element[i];
+            success = clusterManagerCheckRedisReply(node, r, NULL);
+            if (!success && onerror) success = onerror(r, i);
+            if (!success) break;
+        }
+    }
+cleanup:
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
 static int clusterManagerNodeConnect(clusterManagerNode *node) {
     if (node->context) redisFree(node->context);
     node->context = redisConnect(node->ip, node->port);
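For context, these helpers lean on standard MULTI/EXEC semantics: every command sent after MULTI is only queued (each one immediately replies +QUEUED), and EXEC executes the whole block without other clients' commands interleaving, returning a single array with one reply per queued command. A standalone hiredis sketch of that pattern against a plain instance (host, port and keys are placeholders, not part of the commit):

#include <stdio.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 6379);
    if (c == NULL || c->err) return 1;
    redisReply *r = redisCommand(c, "MULTI");
    if (r) freeReplyObject(r);
    r = redisCommand(c, "SET key val");  /* Queued: replies +QUEUED. */
    if (r) freeReplyObject(r);
    r = redisCommand(c, "INCR counter"); /* Queued as well. */
    if (r) freeReplyObject(r);
    r = redisCommand(c, "EXEC");         /* One reply per queued command. */
    if (r && r->type == REDIS_REPLY_ARRAY) {
        for (size_t i = 0; i < r->elements; i++)
            printf("reply %zu: type %d\n", i, r->element[i]->type);
    }
    if (r) freeReplyObject(r);
    redisFree(c);
    return 0;
}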
@@ -2794,6 +2827,31 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) {
     return success;
 }
 
+static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) {
+    if (bulk_idx == 0 && reply) {
+        if (reply->type == REDIS_REPLY_ERROR)
+            return strstr(reply->str, "already unassigned") != NULL;
+    }
+    return 0;
+}
+
+static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
+                                      int slot,
+                                      int do_clear)
+{
+    int success = clusterManagerStartTransaction(owner);
+    if (!success) return 0;
+    /* Ensure the slot is not already assigned. */
+    clusterManagerDelSlot(owner, slot, 0);
+    /* Add the slot and bump epoch. */
+    clusterManagerAddSlot(owner, slot);
+    if (do_clear) clusterManagerClearSlotStatus(owner, slot);
+    clusterManagerBumpEpoch(owner);
+    success = clusterManagerExecTransaction(owner,
+        clusterManagerIgnoreUnassignedErr);
+    return success;
+}
+
 /* Migrate keys taken from reply->elements. It returns the reply from the
  * MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
  * is not NULL, a dot will be printed for every migrated key. */
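Spelled out as raw commands, the transaction built by clusterManagerSetSlotOwner corresponds roughly to the sequence below (plain hiredis; 'ctx' and the function name are placeholders; DELSLOTS/ADDSLOTS/BUMPEPOCH are the commands the comments in this commit name, while the SETSLOT STABLE spelling for clusterManagerClearSlotStatus is an assumption):

#include <hiredis/hiredis.h>

/* Sketch of the transaction clusterManagerSetSlotOwner builds on the new
 * owner node. 'ctx' and this function's name are placeholders. */
static int setSlotOwnerSketch(redisContext *ctx, int slot, int do_clear) {
    redisReply *r = redisCommand(ctx, "MULTI");
    if (r) freeReplyObject(r);
    /* Queued; may fail at EXEC time with "already unassigned". */
    r = redisCommand(ctx, "CLUSTER DELSLOTS %d", slot);
    if (r) freeReplyObject(r);
    r = redisCommand(ctx, "CLUSTER ADDSLOTS %d", slot);
    if (r) freeReplyObject(r);
    if (do_clear) { /* Assumed equivalent of clusterManagerClearSlotStatus. */
        r = redisCommand(ctx, "CLUSTER SETSLOT %d STABLE", slot);
        if (r) freeReplyObject(r);
    }
    r = redisCommand(ctx, "CLUSTER BUMPEPOCH");
    if (r) freeReplyObject(r);
    /* EXEC returns an array holding one reply per queued command. */
    r = redisCommand(ctx, "EXEC");
    int ok = (r != NULL && r->type == REDIS_REPLY_ARRAY);
    if (r) freeReplyObject(r);
    return ok;
}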
@@ -3675,11 +3733,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                 clusterManagerNode *n = clusterManagerNodeMasterRandom();
                 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
                                       slot, n->ip, n->port);
-                /* Ensure the slot is not already assigned. */
-                clusterManagerDelSlot(n, s, 1);
-                if (!clusterManagerAddSlot(n, s)) fixed = -1;
-                if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1;
-                if (fixed < 0) goto cleanup;
+                if (!clusterManagerSetSlotOwner(n, s, 0)) {
+                    fixed = -1;
+                    goto cleanup;
+                }
                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
                  * info into the node struct, in order to keep it synced */
                 n->slots[s] = 1;
@@ -3707,11 +3764,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                 clusterManagerNode *n = fn->value;
                 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
                                       slot, n->ip, n->port);
-                /* Ensure the slot is not already assigned. */
-                clusterManagerDelSlot(n, s, 1);
-                if (!clusterManagerAddSlot(n, s)) fixed = -1;
-                if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1;
-                if (fixed < 0) goto cleanup;
+                if (!clusterManagerSetSlotOwner(n, s, 0)) {
+                    fixed = -1;
+                    goto cleanup;
+                }
                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
                  * info into the node struct, in order to keep it synced */
                 n->slots[atoi(slot)] = 1;
@@ -3744,14 +3800,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                 clusterManagerLogInfo(">>> Covering slot %s moving keys "
                                       "to %s:%d\n", slot,
                                       target->ip, target->port);
-                /* Ensure the slot is not already assigned. */
-                clusterManagerDelSlot(target, s, 1);
-                if (!clusterManagerAddSlot(target, s)) fixed = -1;
-                if (fixed < 0) goto cleanup;
-                if (!clusterManagerClearSlotStatus(target, s))
-                    fixed = -1;
-                if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1;
-                if (fixed < 0) goto cleanup;
+                if (!clusterManagerSetSlotOwner(target, s, 1)) {
+                    fixed = -1;
+                    goto cleanup;
+                }
                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
                  * info into the node struct, in order to keep it synced */
                 target->slots[atoi(slot)] = 1;
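All three coverage cases now funnel into the transactional helper. The gain is the atomicity the commit title refers to: issued as separate commands, the sequence leaves a gap after DELSLOTS during which the node claims no owner for the slot and other clients' commands can interleave; inside MULTI/EXEC the block applies as one unit. A sketch of the pre-commit, non-atomic shape (plain hiredis; names are placeholders):

#include <hiredis/hiredis.h>

/* Pre-commit shape: three independent round trips. Between DELSLOTS and
 * ADDSLOTS the slot is transiently unassigned on the node, and any other
 * client's command may run in that gap. */
static void nonAtomicOwnerChange(redisContext *ctx, int slot) {
    redisReply *r = redisCommand(ctx, "CLUSTER DELSLOTS %d", slot);
    if (r) freeReplyObject(r);
    /* <-- gap: the slot has no owner here */
    r = redisCommand(ctx, "CLUSTER ADDSLOTS %d", slot);
    if (r) freeReplyObject(r);
    r = redisCommand(ctx, "CLUSTER BUMPEPOCH");
    if (r) freeReplyObject(r);
}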
@@ -3905,11 +3957,7 @@ static int clusterManagerFixOpenSlot(int slot) {
                               owner->ip, owner->port);
         success = clusterManagerClearSlotStatus(owner, slot);
         if (!success) goto cleanup;
-        /* Ensure that the slot is unassigned before assigning it to the
-         * owner. */
-        success = clusterManagerDelSlot(owner, slot, 1);
-        if (!success) goto cleanup;
-        success = clusterManagerAddSlot(owner, slot);
+        success = clusterManagerSetSlotOwner(owner, slot, 0);
         if (!success) goto cleanup;
         /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
          * info into the node struct, in order to keep it synced */
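Here do_clear is 0 because the slot status was already reset by the clusterManagerClearSlotStatus call just above, so the transaction does not need to queue it again. Also worth spelling out: since DELSLOTS is the first queued command, the tolerated "already unassigned" error can only appear as element 0 of the EXEC reply, which is exactly what the bulk_idx == 0 check in clusterManagerIgnoreUnassignedErr is after. A sketch of that check against a raw EXEC reply (illustrative name):

#include <string.h>
#include <hiredis/hiredis.h>

/* Sketch: does the EXEC reply's first element carry the tolerated
 * "already unassigned" DELSLOTS error? */
static int firstReplyIsIgnorableErr(redisReply *exec) {
    if (exec == NULL || exec->type != REDIS_REPLY_ARRAY || exec->elements == 0)
        return 0;
    redisReply *first = exec->element[0];
    return first->type == REDIS_REPLY_ERROR &&
           strstr(first->str, "already unassigned") != NULL;
}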
@@ -4052,15 +4100,8 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
     if (!owner) owner = listFirst(owners)->value;
     clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d",
                           slot, owner->ip, owner->port);
-    /* Set the owner node by calling DELSLOTS in order to unassign the slot
-     * in case it's already assigned to another node and by finally calling
-     * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it
-     * could reply with an "already unassigned" error and if it should fail
-     * for other reasons, it would lead to a failure in the follwing ADDSLOTS
-     * command. */
-    clusterManagerDelSlot(owner, slot, 1);
-    if (!clusterManagerAddSlot(owner, slot)) return 0;
-    if (!clusterManagerBumpEpoch(owner)) return 0;
+    /* Set the slot owner. */
+    if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
     listIter li;
     listNode *ln;
     listRewind(cluster_manager.nodes, &li);