mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Merge pull request #5528 from artix75/cluster_manager_fix_cmd
Cluster manager fix cmd
This commit is contained in:
commit
0e2f56d0a1
148
src/redis-cli.c
148
src/redis-cli.c
@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1,
|
|||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
*err = zmalloc((reply->len + 1) * sizeof(char));
|
*err = zmalloc((reply->len + 1) * sizeof(char));
|
||||||
strcpy(*err, reply->str);
|
strcpy(*err, reply->str);
|
||||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err);
|
} else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str);
|
||||||
}
|
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
cleanup:
|
cleanup:
|
||||||
@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
if (migrate_reply == NULL) goto next;
|
if (migrate_reply == NULL) goto next;
|
||||||
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
||||||
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
|
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
|
||||||
|
/* If the key already exists, try to migrate keys
|
||||||
|
* adding REPLACE option.
|
||||||
|
* If the key's slot is not served, try to assign slot
|
||||||
|
* to the target node. */
|
||||||
|
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
|
||||||
|
if (strstr(migrate_reply->str, "slot not served") != NULL)
|
||||||
|
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||||
clusterManagerLogWarn("*** Target key exists. "
|
clusterManagerLogWarn("*** Target key exists. "
|
||||||
"Replacing it for FIX.\n");
|
"Replacing it for FIX.\n");
|
||||||
freeReplyObject(migrate_reply);
|
freeReplyObject(migrate_reply);
|
||||||
/* Try to migrate keys adding REPLACE option. */
|
|
||||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||||
target,
|
target,
|
||||||
reply,
|
reply,
|
||||||
1, timeout,
|
is_busy,
|
||||||
|
timeout,
|
||||||
NULL);
|
NULL);
|
||||||
success = (migrate_reply != NULL &&
|
success = (migrate_reply != NULL &&
|
||||||
migrate_reply->type != REDIS_REPLY_ERROR);
|
migrate_reply->type != REDIS_REPLY_ERROR);
|
||||||
@ -3301,8 +3307,8 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
|||||||
nodename = token;
|
nodename = token;
|
||||||
tot_size = (p - token);
|
tot_size = (p - token);
|
||||||
name_len = tot_size++; // Make room for ':' in tot_size
|
name_len = tot_size++; // Make room for ':' in tot_size
|
||||||
} else if (i == 8) break;
|
}
|
||||||
i++;
|
if (++i == 8) break;
|
||||||
}
|
}
|
||||||
if (i != 8) continue;
|
if (i != 8) continue;
|
||||||
if (nodename == NULL) continue;
|
if (nodename == NULL) continue;
|
||||||
@ -3341,7 +3347,7 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
|||||||
char *sp = cfg + name_len;
|
char *sp = cfg + name_len;
|
||||||
*(sp++) = ':';
|
*(sp++) = ':';
|
||||||
for (i = 0; i < c; i++) {
|
for (i = 0; i < c; i++) {
|
||||||
if (i > 0) *(sp++) = '|';
|
if (i > 0) *(sp++) = ',';
|
||||||
int slen = strlen(slots[i]);
|
int slen = strlen(slots[i]);
|
||||||
memcpy(sp, slots[i], slen);
|
memcpy(sp, slots[i], slen);
|
||||||
sp += slen;
|
sp += slen;
|
||||||
@ -3583,10 +3589,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||||||
clusterManagerNode *n = node_n->value;
|
clusterManagerNode *n = node_n->value;
|
||||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||||
slot, n->ip, n->port);
|
slot, n->ip, n->port);
|
||||||
|
/* Ensure the slot is not already assigned. */
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER DELSLOTS %s", slot);
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
"CLUSTER ADDSLOTS %s", slot);
|
"CLUSTER ADDSLOTS %s", slot);
|
||||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||||
|
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
@ -3614,10 +3627,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||||||
clusterManagerNode *n = fn->value;
|
clusterManagerNode *n = fn->value;
|
||||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||||
slot, n->ip, n->port);
|
slot, n->ip, n->port);
|
||||||
|
/* Ensure the slot is not already assigned. */
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER DELSLOTS %s", slot);
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
"CLUSTER ADDSLOTS %s", slot);
|
"CLUSTER ADDSLOTS %s", slot);
|
||||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||||
|
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
@ -3651,7 +3671,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||||||
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
||||||
"to %s:%d\n", slot,
|
"to %s:%d\n", slot,
|
||||||
target->ip, target->port);
|
target->ip, target->port);
|
||||||
|
/* Ensure the slot is not already assigned. */
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
||||||
|
"CLUSTER DELSLOTS %s", slot);
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(target,
|
||||||
"CLUSTER ADDSLOTS %s", slot);
|
"CLUSTER ADDSLOTS %s", slot);
|
||||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
@ -3660,6 +3684,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||||||
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
||||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
|
||||||
|
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||||
* info into the node struct, in order to keep it synced */
|
* info into the node struct, in order to keep it synced */
|
||||||
@ -3670,14 +3697,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||||||
while ((nln = listNext(&nli)) != NULL) {
|
while ((nln = listNext(&nli)) != NULL) {
|
||||||
clusterManagerNode *src = nln->value;
|
clusterManagerNode *src = nln->value;
|
||||||
if (src == target) continue;
|
if (src == target) continue;
|
||||||
|
/* Assign the slot to target node in the source node. */
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
||||||
|
"CLUSTER SETSLOT %s %s %s", slot,
|
||||||
|
"NODE", target->name);
|
||||||
|
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||||
|
fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
/* Set the source node in 'importing' state
|
/* Set the source node in 'importing' state
|
||||||
* (even if we will actually migrate keys away)
|
* (even if we will actually migrate keys away)
|
||||||
* in order to avoid receiving redirections
|
* in order to avoid receiving redirections
|
||||||
* for MIGRATE. */
|
* for MIGRATE. */
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
r = CLUSTER_MANAGER_COMMAND(src,
|
||||||
"CLUSTER SETSLOT %s %s %s", slot,
|
"CLUSTER SETSLOT %s %s %s", slot,
|
||||||
"IMPORTING", target->name);
|
"IMPORTING", target->name);
|
||||||
if (!clusterManagerCheckRedisReply(target, r, NULL))
|
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||||
fixed = -1;
|
fixed = -1;
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
if (fixed < 0) goto cleanup;
|
if (fixed < 0) goto cleanup;
|
||||||
@ -3687,6 +3722,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||||||
fixed = -1;
|
fixed = -1;
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(src,
|
||||||
|
"CLUSTER SETSLOT %s %s", slot,
|
||||||
|
"STABLE");
|
||||||
|
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||||
|
fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
}
|
}
|
||||||
fixed++;
|
fixed++;
|
||||||
}
|
}
|
||||||
@ -3720,11 +3762,22 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
clusterManagerNode *n = ln->value;
|
clusterManagerNode *n = ln->value;
|
||||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||||
if (n->slots[slot]) {
|
if (n->slots[slot]) listAddNodeTail(owners, n);
|
||||||
if (owner == NULL) owner = n;
|
else {
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER COUNTKEYSINSLOT %d", slot);
|
||||||
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
|
if (success && r->integer > 0) {
|
||||||
|
clusterManagerLogWarn("*** Found keys about slot %d "
|
||||||
|
"in non-owner node %s:%d!\n", slot,
|
||||||
|
n->ip, n->port);
|
||||||
listAddNodeTail(owners, n);
|
listAddNodeTail(owners, n);
|
||||||
}
|
}
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (listLength(owners) == 1) owner = listFirst(owners)->value;
|
||||||
listRewind(cluster_manager.nodes, &li);
|
listRewind(cluster_manager.nodes, &li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
clusterManagerNode *n = ln->value;
|
clusterManagerNode *n = ln->value;
|
||||||
@ -3735,7 +3788,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
sds migrating_slot = n->migrating[i];
|
sds migrating_slot = n->migrating[i];
|
||||||
if (atoi(migrating_slot) == slot) {
|
if (atoi(migrating_slot) == slot) {
|
||||||
char *sep = (listLength(migrating) == 0 ? "" : ",");
|
char *sep = (listLength(migrating) == 0 ? "" : ",");
|
||||||
migrating_str = sdscatfmt(migrating_str, "%s%S:%u",
|
migrating_str = sdscatfmt(migrating_str, "%s%s:%u",
|
||||||
sep, n->ip, n->port);
|
sep, n->ip, n->port);
|
||||||
listAddNodeTail(migrating, n);
|
listAddNodeTail(migrating, n);
|
||||||
is_migrating = 1;
|
is_migrating = 1;
|
||||||
@ -3748,7 +3801,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
sds importing_slot = n->importing[i];
|
sds importing_slot = n->importing[i];
|
||||||
if (atoi(importing_slot) == slot) {
|
if (atoi(importing_slot) == slot) {
|
||||||
char *sep = (listLength(importing) == 0 ? "" : ",");
|
char *sep = (listLength(importing) == 0 ? "" : ",");
|
||||||
importing_str = sdscatfmt(importing_str, "%s%S:%u",
|
importing_str = sdscatfmt(importing_str, "%s%s:%u",
|
||||||
sep, n->ip, n->port);
|
sep, n->ip, n->port);
|
||||||
listAddNodeTail(importing, n);
|
listAddNodeTail(importing, n);
|
||||||
is_importing = 1;
|
is_importing = 1;
|
||||||
@ -3767,15 +3820,20 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
clusterManagerLogWarn("*** Found keys about slot %d "
|
clusterManagerLogWarn("*** Found keys about slot %d "
|
||||||
"in node %s:%d!\n", slot, n->ip,
|
"in node %s:%d!\n", slot, n->ip,
|
||||||
n->port);
|
n->port);
|
||||||
|
char *sep = (listLength(importing) == 0 ? "" : ",");
|
||||||
|
importing_str = sdscatfmt(importing_str, "%s%S:%u",
|
||||||
|
sep, n->ip, n->port);
|
||||||
listAddNodeTail(importing, n);
|
listAddNodeTail(importing, n);
|
||||||
}
|
}
|
||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (sdslen(migrating_str) > 0)
|
||||||
printf("Set as migrating in: %s\n", migrating_str);
|
printf("Set as migrating in: %s\n", migrating_str);
|
||||||
|
if (sdslen(importing_str) > 0)
|
||||||
printf("Set as importing in: %s\n", importing_str);
|
printf("Set as importing in: %s\n", importing_str);
|
||||||
/* If there is no slot owner, set as owner the slot with the biggest
|
/* If there is no slot owner, set as owner the node with the biggest
|
||||||
* number of keys, among the set of migrating / importing nodes. */
|
* number of keys, among the set of migrating / importing nodes. */
|
||||||
if (owner == NULL) {
|
if (owner == NULL) {
|
||||||
clusterManagerLogInfo(">>> Nobody claims ownership, "
|
clusterManagerLogInfo(">>> Nobody claims ownership, "
|
||||||
@ -3799,6 +3857,15 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
if (reply) freeReplyObject(reply);
|
if (reply) freeReplyObject(reply);
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
|
/* Ensure that the slot is unassigned before assigning it to the
|
||||||
|
* owner. */
|
||||||
|
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
|
||||||
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
|
/* Ignore "already unassigned" error. */
|
||||||
|
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||||
|
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
if (!success) goto cleanup;
|
||||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
if (reply) freeReplyObject(reply);
|
if (reply) freeReplyObject(reply);
|
||||||
@ -3827,32 +3894,43 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
* in migrating state, since migrating is a valid state only for
|
* in migrating state, since migrating is a valid state only for
|
||||||
* slot owners. */
|
* slot owners. */
|
||||||
if (listLength(owners) > 1) {
|
if (listLength(owners) > 1) {
|
||||||
owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL);
|
/* Owner cannot be NULL at this point, since if there are more owners,
|
||||||
|
* the owner has been set in the previous condition (owner == NULL). */
|
||||||
|
assert(owner != NULL);
|
||||||
listRewind(owners, &li);
|
listRewind(owners, &li);
|
||||||
redisReply *reply = NULL;
|
redisReply *reply = NULL;
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
clusterManagerNode *n = ln->value;
|
clusterManagerNode *n = ln->value;
|
||||||
if (n == owner) continue;
|
if (n == owner) continue;
|
||||||
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot);
|
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
|
||||||
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
||||||
|
/* Ignore "already unassigned" error. */
|
||||||
|
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||||
|
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||||
if (reply) freeReplyObject(reply);
|
if (reply) freeReplyObject(reply);
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
|
n->slots[slot] = 0;
|
||||||
|
/* Assign the slot to the owner in the node 'n' configuration.' */
|
||||||
|
success = clusterManagerSetSlot(n, owner, slot, "node", NULL);
|
||||||
|
if (!success) goto cleanup;
|
||||||
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
|
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates
|
/* Avoid duplicates. */
|
||||||
|
clusterManagerRemoveNodeFromList(importing, n);
|
||||||
listAddNodeTail(importing, n);
|
listAddNodeTail(importing, n);
|
||||||
|
/* Ensure that the node is not in the migrating list. */
|
||||||
|
clusterManagerRemoveNodeFromList(migrating, n);
|
||||||
}
|
}
|
||||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
|
||||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (!success) goto cleanup;
|
|
||||||
}
|
}
|
||||||
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
|
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
|
||||||
/* Case 1: The slot is in migrating state in one slot, and in
|
/* Case 1: The slot is in migrating state in one node, and in
|
||||||
* importing state in 1 slot. That's trivial to address. */
|
* importing state in 1 node. That's trivial to address. */
|
||||||
if (listLength(migrating) == 1 && listLength(importing) == 1) {
|
if (listLength(migrating) == 1 && listLength(importing) == 1) {
|
||||||
clusterManagerNode *src = listFirst(migrating)->value;
|
clusterManagerNode *src = listFirst(migrating)->value;
|
||||||
clusterManagerNode *dst = listFirst(importing)->value;
|
clusterManagerNode *dst = listFirst(importing)->value;
|
||||||
|
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
|
||||||
|
"%s:%d to %s:%d\n", slot,
|
||||||
|
src->ip, src->port, dst->ip, dst->port);
|
||||||
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||||
}
|
}
|
||||||
/* Case 2: There are multiple nodes that claim the slot as importing,
|
/* Case 2: There are multiple nodes that claim the slot as importing,
|
||||||
@ -3860,7 +3938,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
* the slot. In this case we just move all the keys to the owner
|
* the slot. In this case we just move all the keys to the owner
|
||||||
* according to the configuration. */
|
* according to the configuration. */
|
||||||
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
|
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
|
||||||
clusterManagerLogInfo(">>> Moving all the %d slot keys to its "
|
clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its "
|
||||||
"owner %s:%d\n", slot, owner->ip, owner->port);
|
"owner %s:%d\n", slot, owner->ip, owner->port);
|
||||||
move_opts |= CLUSTER_MANAGER_OPT_COLD;
|
move_opts |= CLUSTER_MANAGER_OPT_COLD;
|
||||||
listRewind(importing, &li);
|
listRewind(importing, &li);
|
||||||
@ -3878,11 +3956,24 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
if (r) freeReplyObject(r);
|
if (r) freeReplyObject(r);
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
|
/* Since the slot has been moved in "cold" mode, ensure that all the
|
||||||
|
* other nodes update their own configuration about the slot itself. */
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n == owner) continue;
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
|
||||||
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
int try_to_close_slot = (listLength(importing) == 0 &&
|
int try_to_close_slot = (listLength(importing) == 0 &&
|
||||||
listLength(migrating) == 1);
|
listLength(migrating) == 1);
|
||||||
if (try_to_close_slot) {
|
if (try_to_close_slot) {
|
||||||
clusterManagerNode *n = listFirst(migrating)->value;
|
clusterManagerNode *n = listFirst(migrating)->value;
|
||||||
|
if (!owner || owner != n) {
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
|
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
|
||||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
@ -3892,11 +3983,15 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||||||
}
|
}
|
||||||
if (!success) goto cleanup;
|
if (!success) goto cleanup;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
/* Case 3: There are no slots claiming to be in importing state, but
|
/* Case 3: There are no slots claiming to be in importing state, but
|
||||||
* there is a migrating node that actually don't have any key. We
|
* there is a migrating node that actually don't have any key or is the
|
||||||
* can just close the slot, probably a reshard interrupted in the middle. */
|
* slot owner. We can just close the slot, probably a reshard interrupted
|
||||||
|
* in the middle. */
|
||||||
if (try_to_close_slot) {
|
if (try_to_close_slot) {
|
||||||
clusterManagerNode *n = listFirst(migrating)->value;
|
clusterManagerNode *n = listFirst(migrating)->value;
|
||||||
|
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
|
||||||
|
slot, n->ip, n->port);
|
||||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||||
slot, "STABLE");
|
slot, "STABLE");
|
||||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
@ -4025,6 +4120,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||||||
result = 0;
|
result = 0;
|
||||||
if (do_fix/* && result*/) {
|
if (do_fix/* && result*/) {
|
||||||
dictType dtype = clusterManagerDictType;
|
dictType dtype = clusterManagerDictType;
|
||||||
|
dtype.keyDestructor = dictSdsDestructor;
|
||||||
dtype.valDestructor = dictListDestructor;
|
dtype.valDestructor = dictListDestructor;
|
||||||
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
|
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
|
||||||
int fixed = clusterManagerFixSlotsCoverage(slots);
|
int fixed = clusterManagerFixSlotsCoverage(slots);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user