Mirror of https://github.com/fluencelabs/redis (synced 2025-03-18 16:40:50 +00:00)
Merge pull request #5681 from artix75/cluster_manager_fix_cmd

Cluster manager fix cmd

commit 086363babf

src/redis-cli.c (429 lines changed)
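
Summary (reconstructed from the diff below; the PR's original description is not preserved in this mirror): single CLUSTER subcommands (SETSLOT STABLE, DELSLOTS, ADDSLOTS, COUNTKEYSINSLOT, BUMPEPOCH) are wrapped in small helper functions, and a MULTI/EXEC pair lets clusterManagerSetSlotOwner assign slot ownership atomically. The fix command gains a new "Case 3" for slots left migrating towards multiple importing nodes, plus a clusterManagerFixMultipleSlotOwners routine; check and fix accept a new --cluster-search-multiple-owners flag, while the reshard and rebalance help strings now advertise the replace option.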
@@ -117,6 +117,7 @@
 #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
 #define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
+#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9
 
 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
 #define CLUSTER_MANAGER_OPT_COLD 1 << 1
@@ -1378,6 +1379,9 @@ static int parseOptions(int argc, char **argv) {
         } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
             config.cluster_manager_command.flags |=
                 CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
+        } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) {
+            config.cluster_manager_command.flags |=
+                CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
         } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
             sds version = cliVersion();
             printf("redis-cli %s\n", version);
@@ -1846,7 +1850,7 @@ static int evalMode(int argc, char **argv) {
     if (eval_ldb) {
         if (!config.eval_ldb) {
             /* If the debugging session ended immediately, there was an
-             * error compiling the script. Show it and don't enter
+             * error compiling the script. Show it and they don't enter
              * the REPL at all. */
             printf("Eval debugging session can't start:\n");
             cliReadReply(0);
@@ -1929,6 +1933,7 @@ static dictType clusterManagerDictType = {
 };
 
 typedef int clusterManagerCommandProc(int argc, char **argv);
+typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx);
 
 /* Cluster Manager helper functions */
 
@@ -1990,14 +1995,17 @@ typedef struct clusterManagerCommandDef {
 clusterManagerCommandDef clusterManagerCommands[] = {
     {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
      "replicas <arg>"},
-    {"check", clusterManagerCommandCheck, -1, "host:port", NULL},
+    {"check", clusterManagerCommandCheck, -1, "host:port",
+     "search-multiple-owners"},
     {"info", clusterManagerCommandInfo, -1, "host:port", NULL},
-    {"fix", clusterManagerCommandFix, -1, "host:port", NULL},
+    {"fix", clusterManagerCommandFix, -1, "host:port",
+     "search-multiple-owners"},
     {"reshard", clusterManagerCommandReshard, -1, "host:port",
-     "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
+     "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
+     "replace"},
     {"rebalance", clusterManagerCommandRebalance, -1, "host:port",
      "weight <node1=w1...nodeN=wN>,use-empty-masters,"
-     "timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
+     "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
     {"add-node", clusterManagerCommandAddNode, 2,
      "new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
     {"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
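
Assuming the cluster manager's usual convention that each per-command option in this table maps to a --cluster-prefixed command-line flag, the new entries correspond to invocations such as these (illustrative only; addresses are placeholders):

    redis-cli --cluster check 127.0.0.1:7000 --cluster-search-multiple-owners
    redis-cli --cluster rebalance 127.0.0.1:7000 --cluster-replace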
@@ -2188,6 +2196,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
     return 1;
 }
 
+/* Execute MULTI command on a cluster node. */
+static int clusterManagerStartTransaction(clusterManagerNode *node) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+/* Execute EXEC command on a cluster node. */
+static int clusterManagerExecTransaction(clusterManagerNode *node,
+                                         clusterManagerOnReplyError onerror)
+{
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (success) {
+        if (reply->type != REDIS_REPLY_ARRAY) {
+            success = 0;
+            goto cleanup;
+        }
+        size_t i;
+        for (i = 0; i < reply->elements; i++) {
+            redisReply *r = reply->element[i];
+            char *err = NULL;
+            success = clusterManagerCheckRedisReply(node, r, &err);
+            if (!success && onerror) success = onerror(r, i);
+            if (err) {
+                if (!success)
+                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
+                zfree(err);
+            }
+            if (!success) break;
+        }
+    }
+cleanup:
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
 static int clusterManagerNodeConnect(clusterManagerNode *node) {
     if (node->context) redisFree(node->context);
     node->context = redisConnect(node->ip, node->port);
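
These two wrappers are composed later in this same diff (see clusterManagerSetSlotOwner below) to group several slot commands into one MULTI/EXEC transaction so that ownership changes apply atomically. A minimal sketch of the pattern, assuming a connected clusterManagerNode *node and a valid slot number, and using the helpers added in the next hunk:

    /* Commands issued between MULTI and EXEC only return +QUEUED; the real
     * replies come back as the EXEC array, which clusterManagerExecTransaction
     * walks, letting the onerror callback downgrade expected failures such
     * as "already unassigned". */
    if (clusterManagerStartTransaction(node)) {
        clusterManagerDelSlot(node, slot, 1);   /* queued */
        clusterManagerAddSlot(node, slot);      /* queued */
        clusterManagerBumpEpoch(node);          /* queued */
        clusterManagerExecTransaction(node, clusterManagerIgnoreUnassignedErr);
    }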
@@ -2746,6 +2792,84 @@ cleanup:
     return success;
 }
 
+static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER SETSLOT %d %s", slot, "STABLE");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
+                                 int ignore_unassigned_err)
+{
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER DELSLOTS %d", slot);
+    char *err = NULL;
+    int success = clusterManagerCheckRedisReply(node, reply, &err);
+    if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
+        ignore_unassigned_err &&
+        strstr(reply->str, "already unassigned") != NULL) success = 1;
+    if (!success && err != NULL) {
+        CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
+        zfree(err);
+    }
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER ADDSLOTS %d", slot);
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
+                                                int slot)
+{
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+        "CLUSTER COUNTKEYSINSLOT %d", slot);
+    int count = -1;
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
+    if (reply) freeReplyObject(reply);
+    return count;
+}
+
+static int clusterManagerBumpEpoch(clusterManagerNode *node) {
+    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
+    int success = clusterManagerCheckRedisReply(node, reply, NULL);
+    if (reply) freeReplyObject(reply);
+    return success;
+}
+
+static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) {
+    if (bulk_idx == 0 && reply) {
+        if (reply->type == REDIS_REPLY_ERROR)
+            return strstr(reply->str, "already unassigned") != NULL;
+    }
+    return 0;
+}
+
+static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
+                                      int slot,
+                                      int do_clear)
+{
+    int success = clusterManagerStartTransaction(owner);
+    if (!success) return 0;
+    /* Ensure the slot is not already assigned. */
+    clusterManagerDelSlot(owner, slot, 1);
+    /* Add the slot and bump epoch. */
+    clusterManagerAddSlot(owner, slot);
+    if (do_clear) clusterManagerClearSlotStatus(owner, slot);
+    clusterManagerBumpEpoch(owner);
+    success = clusterManagerExecTransaction(owner,
+        clusterManagerIgnoreUnassignedErr);
+    return success;
+}
+
 /* Migrate keys taken from reply->elements. It returns the reply from the
  * MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
  * is not NULL, a dot will be printed for every migrated key. */
@@ -3623,24 +3747,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
             listRewind(none, &li);
             while ((ln = listNext(&li)) != NULL) {
                 sds slot = ln->value;
+                int s = atoi(slot);
                 clusterManagerNode *n = clusterManagerNodeMasterRandom();
                 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
                                       slot, n->ip, n->port);
-                /* Ensure the slot is not already assigned. */
-                redisReply *r = CLUSTER_MANAGER_COMMAND(n,
-                    "CLUSTER DELSLOTS %s", slot);
-                if (r) freeReplyObject(r);
-                r = CLUSTER_MANAGER_COMMAND(n,
-                    "CLUSTER ADDSLOTS %s", slot);
-                if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
-                if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                if (fixed < 0) goto cleanup;
+                if (!clusterManagerSetSlotOwner(n, s, 0)) {
+                    fixed = -1;
+                    goto cleanup;
+                }
                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
                  * info into the node struct, in order to keep it synced */
-                n->slots[atoi(slot)] = 1;
+                n->slots[s] = 1;
                 fixed++;
             }
         }
@@ -3648,7 +3765,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
 
     /* Handle case "2": keys only in one node. */
     if (listLength(single) > 0) {
-        printf("The following uncovered slots have keys in just one node:\n");
+        printf("The following uncovered slots have keys in just one node:\n");
         clusterManagerPrintSlotsList(single);
         if (confirmWithYes("Fix these slots by covering with those nodes?")){
             listIter li;
@@ -3656,6 +3773,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
             listRewind(single, &li);
             while ((ln = listNext(&li)) != NULL) {
                 sds slot = ln->value;
+                int s = atoi(slot);
                 dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
                 assert(entry != NULL);
                 list *nodes = (list *) dictGetVal(entry);
@@ -3664,18 +3782,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                 clusterManagerNode *n = fn->value;
                 clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
                                       slot, n->ip, n->port);
-                /* Ensure the slot is not already assigned. */
-                redisReply *r = CLUSTER_MANAGER_COMMAND(n,
-                    "CLUSTER DELSLOTS %s", slot);
-                if (r) freeReplyObject(r);
-                r = CLUSTER_MANAGER_COMMAND(n,
-                    "CLUSTER ADDSLOTS %s", slot);
-                if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
-                if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                if (fixed < 0) goto cleanup;
+                if (!clusterManagerSetSlotOwner(n, s, 0)) {
+                    fixed = -1;
+                    goto cleanup;
+                }
                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
                  * info into the node struct, in order to keep it synced */
                 n->slots[atoi(slot)] = 1;
@@ -3708,23 +3818,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                 clusterManagerLogInfo(">>> Covering slot %s moving keys "
                                       "to %s:%d\n", slot,
                                       target->ip, target->port);
-                /* Ensure the slot is not already assigned. */
-                redisReply *r = CLUSTER_MANAGER_COMMAND(target,
-                    "CLUSTER DELSLOTS %s", slot);
-                if (r) freeReplyObject(r);
-                r = CLUSTER_MANAGER_COMMAND(target,
-                    "CLUSTER ADDSLOTS %s", slot);
-                if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                if (fixed < 0) goto cleanup;
-                r = CLUSTER_MANAGER_COMMAND(target,
-                    "CLUSTER SETSLOT %s %s", slot, "STABLE");
-                if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
-                if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
-                if (r) freeReplyObject(r);
-                if (fixed < 0) goto cleanup;
+                if (!clusterManagerSetSlotOwner(target, s, 1)) {
+                    fixed = -1;
+                    goto cleanup;
+                }
                 /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
                  * info into the node struct, in order to keep it synced */
                 target->slots[atoi(slot)] = 1;
@@ -3735,23 +3832,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                     clusterManagerNode *src = nln->value;
                     if (src == target) continue;
                     /* Assign the slot to target node in the source node. */
-                    redisReply *r = CLUSTER_MANAGER_COMMAND(src,
-                        "CLUSTER SETSLOT %s %s %s", slot,
-                        "NODE", target->name);
-                    if (!clusterManagerCheckRedisReply(src, r, NULL))
+                    if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
                         fixed = -1;
-                    if (r) freeReplyObject(r);
                     if (fixed < 0) goto cleanup;
                     /* Set the source node in 'importing' state
                      * (even if we will actually migrate keys away)
                      * in order to avoid receiving redirections
                      * for MIGRATE. */
-                    r = CLUSTER_MANAGER_COMMAND(src,
-                        "CLUSTER SETSLOT %s %s %s", slot,
-                        "IMPORTING", target->name);
-                    if (!clusterManagerCheckRedisReply(src, r, NULL))
-                        fixed = -1;
-                    if (r) freeReplyObject(r);
+                    if (!clusterManagerSetSlot(src, target, s,
+                                               "IMPORTING", NULL)) fixed = -1;
                     if (fixed < 0) goto cleanup;
                     int opts = CLUSTER_MANAGER_OPT_VERBOSE |
                                CLUSTER_MANAGER_OPT_COLD;
@@ -3759,12 +3848,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
                         fixed = -1;
                         goto cleanup;
                     }
-                    r = CLUSTER_MANAGER_COMMAND(src,
-                        "CLUSTER SETSLOT %s %s", slot,
-                        "STABLE");
-                    if (!clusterManagerCheckRedisReply(src, r, NULL))
+                    if (!clusterManagerClearSlotStatus(src, s))
                         fixed = -1;
-                    if (r) freeReplyObject(r);
                     if (fixed < 0) goto cleanup;
                 }
                 fixed++;
@@ -3888,24 +3973,9 @@ static int clusterManagerFixOpenSlot(int slot) {
         // Use ADDSLOTS to assign the slot.
         clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
                               owner->ip, owner->port);
-        redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
-                                                           "SETSLOT %d %s",
-                                                    slot, "STABLE");
-        success = clusterManagerCheckRedisReply(owner, reply, NULL);
-        if (reply) freeReplyObject(reply);
+        success = clusterManagerClearSlotStatus(owner, slot);
         if (!success) goto cleanup;
-        /* Ensure that the slot is unassigned before assigning it to the
-         * owner. */
-        reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
-        success = clusterManagerCheckRedisReply(owner, reply, NULL);
-        /* Ignore "already unassigned" error. */
-        if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
-            strstr(reply->str, "already unassigned") != NULL) success = 1;
-        if (reply) freeReplyObject(reply);
-        if (!success) goto cleanup;
-        reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
-        success = clusterManagerCheckRedisReply(owner, reply, NULL);
-        if (reply) freeReplyObject(reply);
+        success = clusterManagerSetSlotOwner(owner, slot, 0);
         if (!success) goto cleanup;
         /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
          * info into the node struct, in order to keep it synced */
@@ -3913,9 +3983,7 @@ static int clusterManagerFixOpenSlot(int slot) {
         /* Make sure this information will propagate. Not strictly needed
          * since there is no past owner, so all the other nodes will accept
          * whatever epoch this node will claim the slot with. */
-        reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
-        success = clusterManagerCheckRedisReply(owner, reply, NULL);
-        if (reply) freeReplyObject(reply);
+        success = clusterManagerBumpEpoch(owner);
         if (!success) goto cleanup;
         /* Remove the owner from the list of migrating/importing
          * nodes. */
@@ -3935,16 +4003,10 @@ static int clusterManagerFixOpenSlot(int slot) {
      * the owner has been set in the previous condition (owner == NULL). */
     assert(owner != NULL);
     listRewind(owners, &li);
-    redisReply *reply = NULL;
     while ((ln = listNext(&li)) != NULL) {
         clusterManagerNode *n = ln->value;
         if (n == owner) continue;
-        reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
-        success = clusterManagerCheckRedisReply(n, reply, NULL);
-        /* Ignore "already unassigned" error. */
-        if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
-            strstr(reply->str, "already unassigned") != NULL) success = 1;
-        if (reply) freeReplyObject(reply);
+        success = clusterManagerDelSlot(n, slot, 1);
         if (!success) goto cleanup;
         n->slots[slot] = 0;
         /* Assign the slot to the owner in the node 'n' configuration.' */
@@ -3968,6 +4030,7 @@ static int clusterManagerFixOpenSlot(int slot) {
         clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
                               "%s:%d to %s:%d\n", slot,
                               src->ip, src->port, dst->ip, dst->port);
+        move_opts |= CLUSTER_MANAGER_OPT_UPDATE;
         success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
     }
     /* Case 2: There are multiple nodes that claim the slot as importing,
@@ -3986,11 +4049,7 @@ static int clusterManagerFixOpenSlot(int slot) {
             if (!success) goto cleanup;
             clusterManagerLogInfo(">>> Setting %d as STABLE in "
                                   "%s:%d\n", slot, n->ip, n->port);
-
-            redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
-                                                    slot, "STABLE");
-            success = clusterManagerCheckRedisReply(n, r, NULL);
-            if (r) freeReplyObject(r);
+            success = clusterManagerClearSlotStatus(n, slot);
             if (!success) goto cleanup;
         }
         /* Since the slot has been moved in "cold" mode, ensure that all the
@@ -4000,12 +4059,76 @@ static int clusterManagerFixOpenSlot(int slot) {
             clusterManagerNode *n = ln->value;
             if (n == owner) continue;
             if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
-            redisReply *r = CLUSTER_MANAGER_COMMAND(n,
-                "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
-            success = clusterManagerCheckRedisReply(n, r, NULL);
-            if (r) freeReplyObject(r);
+            success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
             if (!success) goto cleanup;
         }
+    }
+    /* Case 3: The slot is in migrating state in one node but multiple
+     * other nodes claim to be in importing state and don't have any key in
+     * the slot. We search for the importing node having the same ID as
+     * the destination node of the migrating node.
+     * In that case we move the slot from the migrating node to this node and
+     * we close the importing states on all the other importing nodes.
+     * If no importing node has the same ID as the destination node of the
+     * migrating node, the slot's state is closed on both the migrating node
+     * and the importing nodes. */
+    else if (listLength(migrating) == 1 && listLength(importing) > 1) {
+        int try_to_fix = 1;
+        clusterManagerNode *src = listFirst(migrating)->value;
+        clusterManagerNode *dst = NULL;
+        sds target_id = NULL;
+        for (int i = 0; i < src->migrating_count; i += 2) {
+            sds migrating_slot = src->migrating[i];
+            if (atoi(migrating_slot) == slot) {
+                target_id = src->migrating[i + 1];
+                break;
+            }
+        }
+        assert(target_id != NULL);
+        listIter li;
+        listNode *ln;
+        listRewind(importing, &li);
+        while ((ln = listNext(&li)) != NULL) {
+            clusterManagerNode *n = ln->value;
+            int count = clusterManagerCountKeysInSlot(n, slot);
+            if (count > 0) {
+                try_to_fix = 0;
+                break;
+            }
+            if (strcmp(n->name, target_id) == 0) dst = n;
+        }
+        if (!try_to_fix) goto unhandled_case;
+        if (dst != NULL) {
+            clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to "
+                                  "%s:%d and closing it on all the other "
+                                  "importing nodes.\n",
+                                  slot, src->ip, src->port,
+                                  dst->ip, dst->port);
+            /* Move the slot to the destination node. */
+            success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
+            if (!success) goto cleanup;
+            /* Close slot on all the other importing nodes. */
+            listRewind(importing, &li);
+            while ((ln = listNext(&li)) != NULL) {
+                clusterManagerNode *n = ln->value;
+                if (dst == n) continue;
+                success = clusterManagerClearSlotStatus(n, slot);
+                if (!success) goto cleanup;
+            }
+        } else {
+            clusterManagerLogInfo(">>> Case 3: Closing slot %d on both "
+                                  "migrating and importing nodes.\n", slot);
+            /* Close the slot on both the migrating node and the importing
+             * nodes. */
+            success = clusterManagerClearSlotStatus(src, slot);
+            if (!success) goto cleanup;
+            listRewind(importing, &li);
+            while ((ln = listNext(&li)) != NULL) {
+                clusterManagerNode *n = ln->value;
+                success = clusterManagerClearSlotStatus(n, slot);
+                if (!success) goto cleanup;
+            }
+        }
     } else {
         int try_to_close_slot = (listLength(importing) == 0 &&
                                  listLength(migrating) == 1);
@@ -4022,13 +4145,13 @@ static int clusterManagerFixOpenSlot(int slot) {
                 if (!success) goto cleanup;
             }
         }
-        /* Case 3: There are no slots claiming to be in importing state, but
-         * there is a migrating node that actually don't have any key or is the
-         * slot owner. We can just close the slot, probably a reshard interrupted
-         * in the middle. */
+        /* Case 4: There are no slots claiming to be in importing state, but
+         * there is a migrating node that actually don't have any key or is the
+         * slot owner. We can just close the slot, probably a reshard
+         * interrupted in the middle. */
         if (try_to_close_slot) {
             clusterManagerNode *n = listFirst(migrating)->value;
-            clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
+            clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n",
                                   slot, n->ip, n->port);
             redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
                                                     slot, "STABLE");
@@ -4036,6 +4159,7 @@ static int clusterManagerFixOpenSlot(int slot) {
             if (r) freeReplyObject(r);
             if (!success) goto cleanup;
         } else {
+unhandled_case:
             success = 0;
             clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
                                  "yet (work in progress). Slot is set as "
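
Taken together, clusterManagerFixOpenSlot now distinguishes four scenarios: Case 1, one migrating and one importing node, where the slot is simply moved (now with CLUSTER_MANAGER_OPT_UPDATE set); Case 2, multiple importing nodes with the keys already at the owner, where the slot is reset to the owner on every master; the new Case 3, one migrating node and several keyless importing nodes, where the slot moves to the importing node whose ID matches the migration target and is closed on the rest (or closed everywhere when no target matches); and Case 4 (previously numbered 3), a single keyless or owning migrating node with no importers, where the slot is just closed, as after an interrupted reshard.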
@@ -4053,17 +4177,55 @@ cleanup:
     return success;
 }
 
+static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
+    clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
+    int success = 0;
+    assert(listLength(owners) > 1);
+    clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
+                                                                        slot,
+                                                                        NULL);
+    if (!owner) owner = listFirst(owners)->value;
+    clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
+                          slot, owner->ip, owner->port);
+    /* Set the slot owner. */
+    if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
+    listIter li;
+    listNode *ln;
+    listRewind(cluster_manager.nodes, &li);
+    /* Update configuration in all the other master nodes by assigning the slot
+     * itself to the new owner, and by eventually migrating keys if the node
+     * has keys for the slot. */
+    while ((ln = listNext(&li)) != NULL) {
+        clusterManagerNode *n = ln->value;
+        if (n == owner) continue;
+        if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+        int count = clusterManagerCountKeysInSlot(n, slot);
+        success = (count >= 0);
+        if (!success) break;
+        clusterManagerDelSlot(n, slot, 1);
+        if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
+        if (count > 0) {
+            int opts = CLUSTER_MANAGER_OPT_VERBOSE |
+                       CLUSTER_MANAGER_OPT_COLD;
+            success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
+            if (!success) break;
+        }
+    }
+    return success;
+}
+
 static int clusterManagerCheckCluster(int quiet) {
     listNode *ln = listFirst(cluster_manager.nodes);
     if (!ln) return 0;
-    int result = 1;
-    int do_fix = config.cluster_manager_command.flags &
-                 CLUSTER_MANAGER_CMD_FLAG_FIX;
     clusterManagerNode *node = ln->value;
     clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
                           node->ip, node->port);
+    int result = 1, consistent = 0;
+    int do_fix = config.cluster_manager_command.flags &
+                 CLUSTER_MANAGER_CMD_FLAG_FIX;
     if (!quiet) clusterManagerShowNodes();
-    if (!clusterManagerIsConfigConsistent()) {
+    consistent = clusterManagerIsConfigConsistent();
+    if (!consistent) {
         sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
         clusterManagerOnError(err);
         result = 0;
@@ -4071,7 +4233,7 @@ static int clusterManagerCheckCluster(int quiet) {
         clusterManagerLogOk("[OK] All nodes agree about slots "
                             "configuration.\n");
     }
-    // Check open slots
+    /* Check open slots */
     clusterManagerLogInfo(">>> Check for open slots...\n");
     listIter li;
     listRewind(cluster_manager.nodes, &li);
@@ -4130,7 +4292,7 @@ static int clusterManagerCheckCluster(int quiet) {
         clusterManagerLogErr("%s.\n", (char *) errstr);
         sdsfree(errstr);
         if (do_fix) {
-            // Fix open slots.
+            /* Fix open slots. */
            dictReleaseIterator(iter);
            iter = dictGetIterator(open_slots);
            while ((entry = dictNext(iter)) != NULL) {
@@ -4165,6 +4327,51 @@ static int clusterManagerCheckCluster(int quiet) {
             if (fixed > 0) result = 1;
         }
     }
+    int search_multiple_owners = config.cluster_manager_command.flags &
+                                 CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
+    if (search_multiple_owners) {
+        /* Check whether there are multiple owners, even when slots are
+         * fully covered and there are no open slots. */
+        clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
+        int slot = 0;
+        for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
+            listIter li;
+            listNode *ln;
+            listRewind(cluster_manager.nodes, &li);
+            list *owners = listCreate();
+            while ((ln = listNext(&li)) != NULL) {
+                clusterManagerNode *n = ln->value;
+                if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+                if (n->slots[slot]) listAddNodeTail(owners, n);
+                else {
+                    /* Nodes having keys for the slot will be considered
+                     * owners too. */
+                    int count = clusterManagerCountKeysInSlot(n, slot);
+                    if (count > 0) listAddNodeTail(owners, n);
+                }
+            }
+            if (listLength(owners) > 1) {
+                result = 0;
+                clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
+                                     slot, listLength(owners));
+                listRewind(owners, &li);
+                while ((ln = listNext(&li)) != NULL) {
+                    clusterManagerNode *n = ln->value;
+                    clusterManagerLogErr("    %s:%d\n", n->ip, n->port);
+                }
+                if (do_fix) {
+                    result = clusterManagerFixMultipleSlotOwners(slot, owners);
+                    if (!result) {
+                        clusterManagerLogErr("Failed to fix multiple owners "
+                                             "for slot %d\n", slot);
+                        listRelease(owners);
+                        break;
+                    }
+                }
+            }
+            listRelease(owners);
+        }
+    }
     return result;
 }
 
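
When the scan above finds a slot with more than one owner and the fix command is in effect (CLUSTER_MANAGER_CMD_FLAG_FIX), clusterManagerFixMultipleSlotOwners elects the candidate with the most keys in the slot (falling back to the first owner), assigns ownership atomically through clusterManagerSetSlotOwner, points every other master at the new owner via CLUSTER SETSLOT ... NODE, and migrates stray keys to it in "cold" mode. An illustrative invocation (the address is a placeholder):

    redis-cli --cluster fix 127.0.0.1:7000 --cluster-search-multiple-owners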