mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Cluster Manager: further improvements to "fix":
- clusterManagerFixOpenSlot: ensure that the slot is unassigned before ADDSLOTS - clusterManagerFixSlotsCoverage: after cold migration, the slot configuration is now updated on all the nodes.
This commit is contained in:
parent
d6f0a9ac72
commit
18ddbf0352
@ -3589,10 +3589,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
clusterManagerNode *n = node_n->value;
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
@ -3620,10 +3627,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
clusterManagerNode *n = fn->value;
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
@ -3657,7 +3671,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
||||
"to %s:%d\n", slot,
|
||||
target->ip, target->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
@ -3666,6 +3684,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
@ -3768,7 +3789,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
sds migrating_slot = n->migrating[i];
|
||||
if (atoi(migrating_slot) == slot) {
|
||||
char *sep = (listLength(migrating) == 0 ? "" : ",");
|
||||
migrating_str = sdscatfmt(migrating_str, "%s%S:%u",
|
||||
migrating_str = sdscatfmt(migrating_str, "%s%s:%u",
|
||||
sep, n->ip, n->port);
|
||||
listAddNodeTail(migrating, n);
|
||||
is_migrating = 1;
|
||||
@ -3781,7 +3802,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
sds importing_slot = n->importing[i];
|
||||
if (atoi(importing_slot) == slot) {
|
||||
char *sep = (listLength(importing) == 0 ? "" : ",");
|
||||
importing_str = sdscatfmt(importing_str, "%s%S:%u",
|
||||
importing_str = sdscatfmt(importing_str, "%s%s:%u",
|
||||
sep, n->ip, n->port);
|
||||
listAddNodeTail(importing, n);
|
||||
is_importing = 1;
|
||||
@ -3809,8 +3830,10 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
}
|
||||
printf("Set as migrating in: %s\n", migrating_str);
|
||||
printf("Set as importing in: %s\n", importing_str);
|
||||
if (sdslen(migrating_str) > 0)
|
||||
printf("Set as migrating in: %s\n", migrating_str);
|
||||
if (sdslen(importing_str) > 0)
|
||||
printf("Set as importing in: %s\n", importing_str);
|
||||
/* If there is no slot owner, set as owner the node with the biggest
|
||||
* number of keys, among the set of migrating / importing nodes. */
|
||||
if (owner == NULL) {
|
||||
@ -3872,7 +3895,9 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
* in migrating state, since migrating is a valid state only for
|
||||
* slot owners. */
|
||||
if (listLength(owners) > 1) {
|
||||
owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL);
|
||||
/* Owner cannot be NULL at this point, since if there are more owners,
|
||||
* the owner has been set in the previous condition (owner == NULL). */
|
||||
assert(owner != NULL);
|
||||
listRewind(owners, &li);
|
||||
redisReply *reply = NULL;
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
@ -3897,10 +3922,6 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
/* Ensure that the node is not in the migrating list. */
|
||||
clusterManagerRemoveNodeFromList(migrating, n);
|
||||
}
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
|
||||
/* Case 1: The slot is in migrating state in one node, and in
|
||||
@ -3908,6 +3929,9 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
if (listLength(migrating) == 1 && listLength(importing) == 1) {
|
||||
clusterManagerNode *src = listFirst(migrating)->value;
|
||||
clusterManagerNode *dst = listFirst(importing)->value;
|
||||
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
|
||||
"%s:%d to %s:%d\n", slot,
|
||||
src->ip, src->port, dst->ip, dst->port);
|
||||
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||
}
|
||||
/* Case 2: There are multiple nodes that claim the slot as importing,
|
||||
@ -3915,7 +3939,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
* the slot. In this case we just move all the keys to the owner
|
||||
* according to the configuration. */
|
||||
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
|
||||
clusterManagerLogInfo(">>> Moving all the %d slot keys to its "
|
||||
clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its "
|
||||
"owner %s:%d\n", slot, owner->ip, owner->port);
|
||||
move_opts |= CLUSTER_MANAGER_OPT_COLD;
|
||||
listRewind(importing, &li);
|
||||
@ -3933,6 +3957,18 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
/* Since the slot has been moved in "cold" mode, ensure that all the
|
||||
* other nodes update their own configuration about the slot itself. */
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
} else {
|
||||
int try_to_close_slot = (listLength(importing) == 0 &&
|
||||
listLength(migrating) == 1);
|
||||
@ -3955,6 +3991,8 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
* in the middle. */
|
||||
if (try_to_close_slot) {
|
||||
clusterManagerNode *n = listFirst(migrating)->value;
|
||||
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
@ -4083,6 +4121,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
||||
result = 0;
|
||||
if (do_fix/* && result*/) {
|
||||
dictType dtype = clusterManagerDictType;
|
||||
dtype.keyDestructor = dictSdsDestructor;
|
||||
dtype.valDestructor = dictListDestructor;
|
||||
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
|
||||
int fixed = clusterManagerFixSlotsCoverage(slots);
|
||||
|
Loading…
x
Reference in New Issue
Block a user