mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 16:40:50 +00:00
Cluster Manager: code cleanup.
This commit is contained in:
parent
5bf13eaaf8
commit
eaac9f9e93
128
src/redis-cli.c
128
src/redis-cli.c
@ -2746,10 +2746,23 @@ cleanup:
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerDelSlot(clusterManagerNode *node, int slot) {
|
||||
static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||
"CLUSTER SETSLOT %d %s", slot, "STABLE");
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
|
||||
int ignore_unassigned_err)
|
||||
{
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||
"CLUSTER DELSLOTS %d", slot);
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
ignore_unassigned_err &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
@ -3658,24 +3671,18 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
listRewind(none, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
sds slot = ln->value;
|
||||
int s = atoi(slot);
|
||||
clusterManagerNode *n = clusterManagerNodeMasterRandom();
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
clusterManagerDelSlot(n, s, 1);
|
||||
if (!clusterManagerAddSlot(n, s)) fixed = -1;
|
||||
if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1;
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
n->slots[atoi(slot)] = 1;
|
||||
n->slots[s] = 1;
|
||||
fixed++;
|
||||
}
|
||||
}
|
||||
@ -3691,6 +3698,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
listRewind(single, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
sds slot = ln->value;
|
||||
int s = atoi(slot);
|
||||
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
|
||||
assert(entry != NULL);
|
||||
list *nodes = (list *) dictGetVal(entry);
|
||||
@ -3700,16 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
clusterManagerDelSlot(n, s, 1);
|
||||
if (!clusterManagerAddSlot(n, s)) fixed = -1;
|
||||
if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1;
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
@ -3744,21 +3745,12 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
"to %s:%d\n", slot,
|
||||
target->ip, target->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
clusterManagerDelSlot(target, s, 1);
|
||||
if (!clusterManagerAddSlot(target, s)) fixed = -1;
|
||||
if (fixed < 0) goto cleanup;
|
||||
r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (!clusterManagerClearSlotStatus(target, s))
|
||||
fixed = -1;
|
||||
if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1;
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
@ -3770,23 +3762,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
clusterManagerNode *src = nln->value;
|
||||
if (src == target) continue;
|
||||
/* Assign the slot to target node in the source node. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s %s", slot,
|
||||
"NODE", target->name);
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Set the source node in 'importing' state
|
||||
* (even if we will actually migrate keys away)
|
||||
* in order to avoid receiving redirections
|
||||
* for MIGRATE. */
|
||||
r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s %s", slot,
|
||||
"IMPORTING", target->name);
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (!clusterManagerSetSlot(src, target, s,
|
||||
"IMPORTING", NULL)) fixed = -1;
|
||||
if (fixed < 0) goto cleanup;
|
||||
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||
CLUSTER_MANAGER_OPT_COLD;
|
||||
@ -3794,12 +3778,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
fixed = -1;
|
||||
goto cleanup;
|
||||
}
|
||||
r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s", slot,
|
||||
"STABLE");
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
if (!clusterManagerClearSlotStatus(src, s))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
}
|
||||
fixed++;
|
||||
@ -3923,24 +3903,13 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
// Use ADDSLOTS to assign the slot.
|
||||
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
|
||||
owner->ip, owner->port);
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
|
||||
"SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerClearSlotStatus(owner, slot);
|
||||
if (!success) goto cleanup;
|
||||
/* Ensure that the slot is unassigned before assigning it to the
|
||||
* owner. */
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
/* Ignore "already unassigned" error. */
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerDelSlot(owner, slot, 1);
|
||||
if (!success) goto cleanup;
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerAddSlot(owner, slot);
|
||||
if (!success) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
@ -3948,9 +3917,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
/* Make sure this information will propagate. Not strictly needed
|
||||
* since there is no past owner, so all the other nodes will accept
|
||||
* whatever epoch this node will claim the slot with. */
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerBumpEpoch(owner);
|
||||
if (!success) goto cleanup;
|
||||
/* Remove the owner from the list of migrating/importing
|
||||
* nodes. */
|
||||
@ -3970,16 +3937,10 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
* the owner has been set in the previous condition (owner == NULL). */
|
||||
assert(owner != NULL);
|
||||
listRewind(owners, &li);
|
||||
redisReply *reply = NULL;
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
||||
/* Ignore "already unassigned" error. */
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerDelSlot(n, slot, 1);
|
||||
if (!success) goto cleanup;
|
||||
n->slots[slot] = 0;
|
||||
/* Assign the slot to the owner in the node 'n' configuration.' */
|
||||
@ -4021,11 +3982,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
if (!success) goto cleanup;
|
||||
clusterManagerLogInfo(">>> Setting %d as STABLE in "
|
||||
"%s:%d\n", slot, n->ip, n->port);
|
||||
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) freeReplyObject(r);
|
||||
success = clusterManagerClearSlotStatus(n, slot);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
/* Since the slot has been moved in "cold" mode, ensure that all the
|
||||
@ -4035,10 +3992,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) freeReplyObject(r);
|
||||
success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
} else {
|
||||
@ -4104,7 +4058,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
|
||||
* could reply with an "already unassigned" error and if it should fail
|
||||
* for other reasons, it would lead to a failure in the follwing ADDSLOTS
|
||||
* command. */
|
||||
clusterManagerDelSlot(owner, slot);
|
||||
clusterManagerDelSlot(owner, slot, 1);
|
||||
if (!clusterManagerAddSlot(owner, slot)) return 0;
|
||||
if (!clusterManagerBumpEpoch(owner)) return 0;
|
||||
listIter li;
|
||||
@ -4120,7 +4074,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
|
||||
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||
success = (count >= 0);
|
||||
if (!success) break;
|
||||
clusterManagerDelSlot(n, slot);
|
||||
clusterManagerDelSlot(n, slot, 1);
|
||||
if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
|
||||
if (count > 0) {
|
||||
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||
|
Loading…
x
Reference in New Issue
Block a user