mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 16:40:50 +00:00
Cluster Manager: check/fix commands now handle multiple owners even if
all slots are covered and not open.
This commit is contained in:
parent 5bfd8ae253
commit 5bf13eaaf8
src/redis-cli.c (135 lines changed)
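For context: the paths touched here back the cluster manager's check and fix subcommands, e.g. (illustrative invocation, assuming a redis-cli build that includes the cluster manager):

    redis-cli --cluster check 127.0.0.1:7000
    redis-cli --cluster fix 127.0.0.1:7000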
@@ -2746,6 +2746,41 @@ cleanup:
     return success;
 }
 
+static int clusterManagerDelSlot(clusterManagerNode *node, int slot) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER DELSLOTS %d", slot);
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER ADDSLOTS %d", slot);
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
+                                                int slot)
+{
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER COUNTKEYSINSLOT %d", slot);
+    int count = -1;
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
+    if (reply) freeReplyObject(reply);
+    return count;
+}
+
+static int clusterManagerBumpEpoch(clusterManagerNode *node) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
 /* Migrate keys taken from reply->elements. It returns the reply from the
  * MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
  * is not NULL, a dot will be printed for every migrated key. */
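The four helpers added above all follow the same send/validate/free pattern around a CLUSTER subcommand. A minimal standalone sketch of that pattern using plain hiredis (not the cluster manager's CLUSTER_MANAGER_COMMAND macro; host, port, and slot are illustrative placeholders):

    #include <stdio.h>
    #include <hiredis/hiredis.h>

    int main(void) {
        /* Connect to one cluster node; address is a placeholder. */
        redisContext *c = redisConnect("127.0.0.1", 7000);
        if (c == NULL || c->err) {
            fprintf(stderr, "connection error\n");
            return 1;
        }
        int slot = 866; /* arbitrary example slot */
        /* Send the command, validate the reply type, then free it,
         * mirroring clusterManagerCountKeysInSlot() above. */
        redisReply *reply = redisCommand(c, "CLUSTER COUNTKEYSINSLOT %d", slot);
        long long count = -1;
        if (reply && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
        if (reply) freeReplyObject(reply);
        printf("slot %d holds %lld keys on this node\n", slot, count);
        redisFree(c);
        return 0;
    }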
@@ -4053,17 +4088,62 @@ cleanup:
     return success;
 }
 
+static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
+    clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
+    int success = 0;
+    assert(listLength(owners) > 1);
+    clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
+                                                                        slot,
+                                                                        NULL);
+    if (!owner) owner = listFirst(owners)->value;
+    clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
+                          slot, owner->ip, owner->port);
+    /* Set the owner node by calling DELSLOTS in order to unassign the slot
+     * in case it's already assigned to another node, and by finally calling
+     * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it
+     * could reply with an "already unassigned" error and, if it should fail
+     * for other reasons, it would lead to a failure in the following ADDSLOTS
+     * command. */
+    clusterManagerDelSlot(owner, slot);
+    if (!clusterManagerAddSlot(owner, slot)) return 0;
+    if (!clusterManagerBumpEpoch(owner)) return 0;
+    listIter li;
+    listNode *ln;
+    listRewind(cluster_manager.nodes, &li);
+    /* Update configuration in all the other master nodes by assigning the
+     * slot itself to the new owner, and by eventually migrating keys if the
+     * node has keys for the slot. */
+    while ((ln = listNext(&li)) != NULL) {
+        clusterManagerNode *n = ln->value;
+        if (n == owner) continue;
+        if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+        int count = clusterManagerCountKeysInSlot(n, slot);
+        success = (count >= 0);
+        if (!success) break;
+        clusterManagerDelSlot(n, slot);
+        if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
+        if (count > 0) {
+            int opts = CLUSTER_MANAGER_OPT_VERBOSE |
+                       CLUSTER_MANAGER_OPT_COLD;
+            success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
+            if (!success) break;
+        }
+    }
+    return success;
+}
+
 static int clusterManagerCheckCluster(int quiet) {
     listNode *ln = listFirst(cluster_manager.nodes);
     if (!ln) return 0;
-    int result = 1;
-    int do_fix = config.cluster_manager_command.flags &
-                 CLUSTER_MANAGER_CMD_FLAG_FIX;
     clusterManagerNode *node = ln->value;
     clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
                           node->ip, node->port);
+    int result = 1, consistent = 0;
+    int do_fix = config.cluster_manager_command.flags &
+                 CLUSTER_MANAGER_CMD_FLAG_FIX;
     if (!quiet) clusterManagerShowNodes();
-    if (!clusterManagerIsConfigConsistent()) {
+    consistent = clusterManagerIsConfigConsistent();
+    if (!consistent) {
        sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
        clusterManagerOnError(err);
        result = 0;
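The clusterManagerFixMultipleSlotOwners() hunk above elects a single owner before touching the other masters: the candidate holding the most keys in the slot wins, falling back to the first candidate when key counts are unavailable. A toy sketch of that tie-break rule, with a plain array standing in for the list of clusterManagerNode candidates (addresses and key counts are made up):

    #include <stdio.h>

    typedef struct { const char *addr; int keys_in_slot; } candidate;

    /* Pick the candidate with the most keys in the slot; a count of -1
     * means "unknown". If every count is unknown, fall back to the first
     * candidate, like the listFirst(owners) fallback above. */
    static const candidate *pick_owner(const candidate *c, int n) {
        const candidate *best = NULL;
        for (int i = 0; i < n; i++)
            if (c[i].keys_in_slot >= 0 &&
                (best == NULL || c[i].keys_in_slot > best->keys_in_slot))
                best = &c[i];
        return best ? best : &c[0];
    }

    int main(void) {
        candidate owners[] = {
            {"127.0.0.1:7000", 12},
            {"127.0.0.1:7001", 340},
            {"127.0.0.1:7002", 0},
        };
        printf("elected owner: %s\n", pick_owner(owners, 3)->addr);
        return 0;
    }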
@@ -4071,7 +4151,7 @@ static int clusterManagerCheckCluster(int quiet) {
         clusterManagerLogOk("[OK] All nodes agree about slots "
                             "configuration.\n");
     }
-    // Check open slots
+    /* Check open slots */
     clusterManagerLogInfo(">>> Check for open slots...\n");
     listIter li;
     listRewind(cluster_manager.nodes, &li);
@@ -4130,7 +4210,7 @@ static int clusterManagerCheckCluster(int quiet) {
             clusterManagerLogErr("%s.\n", (char *) errstr);
             sdsfree(errstr);
             if (do_fix) {
-                // Fix open slots.
+                /* Fix open slots. */
                 dictReleaseIterator(iter);
                 iter = dictGetIterator(open_slots);
                 while ((entry = dictNext(iter)) != NULL) {
@@ -4165,6 +4245,49 @@ static int clusterManagerCheckCluster(int quiet) {
             if (fixed > 0) result = 1;
         }
     }
+    if (!consistent) {
+        /* Check whether there are multiple owners, even when slots are
+         * fully covered and there are no open slots. */
+        clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
+        int slot = 0;
+        for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
+            listIter li;
+            listNode *ln;
+            listRewind(cluster_manager.nodes, &li);
+            list *owners = listCreate();
+            while ((ln = listNext(&li)) != NULL) {
+                clusterManagerNode *n = ln->value;
+                if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+                if (n->slots[slot]) listAddNodeTail(owners, n);
+                else {
+                    /* Nodes having keys for the slot will be considered
+                     * owners too. */
+                    int count = clusterManagerCountKeysInSlot(n, slot);
+                    if (count > 0) listAddNodeTail(owners, n);
+                }
+            }
+            if (listLength(owners) > 1) {
+                result = 0;
+                clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
+                                     slot, listLength(owners));
+                listRewind(owners, &li);
+                while ((ln = listNext(&li)) != NULL) {
+                    clusterManagerNode *n = ln->value;
+                    clusterManagerLogErr("    %s:%d\n", n->ip, n->port);
+                }
+                if (do_fix) {
+                    result = clusterManagerFixMultipleSlotOwners(slot, owners);
+                    if (!result) {
+                        clusterManagerLogErr("Failed to fix multiple owners "
+                                             "for slot %d\n", slot);
+                        listRelease(owners);
+                        break;
+                    }
+                }
+            }
+            listRelease(owners);
+        }
+    }
     return result;
 }
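The detection pass added above scans every slot and flags any slot claimed by more than one master (a node merely holding keys for the slot counts as an owner too). A compact sketch of the same scan over toy data, with a small slot table standing in for the full CLUSTER_MANAGER_SLOTS (16384) range; sizes and claims are illustrative, not from the commit:

    #include <stdio.h>

    #define SLOTS 16   /* stand-in for CLUSTER_MANAGER_SLOTS (16384) */
    #define MASTERS 3

    int main(void) {
        /* claims[m][s] == 1 means master m claims slot s. In the real
         * check, holding keys for s also puts m in the owners list. */
        int claims[MASTERS][SLOTS] = {0};
        claims[0][5] = claims[1][5] = 1;   /* conflict on slot 5 */
        claims[2][9] = 1;                  /* slot 9 has one owner */
        for (int s = 0; s < SLOTS; s++) {
            int owners = 0;
            for (int m = 0; m < MASTERS; m++) owners += claims[m][s];
            if (owners > 1)
                printf("[WARNING] Slot %d has %d owners\n", s, owners);
        }
        return 0;
    }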