Cluster Manager: fix command.

This commit is contained in:
artix 2018-04-06 18:02:40 +02:00
parent 6d1a7cec23
commit 3f8a4adb49

View File

@ -151,6 +151,7 @@ static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(void *privdata, const void *key1,
const void *key2);
static void dictSdsDestructor(void *privdata, void *val);
static void dictListDestructor(void *privdata, void *val);
/* Cluster Manager Command Info */
typedef struct clusterManagerCommand {
@ -406,6 +407,12 @@ static void dictSdsDestructor(void *privdata, void *val)
sdsfree(val);
}
void dictListDestructor(void *privdata, void *val)
{
DICT_NOTUSED(privdata);
listRelease((list*)val);
}
/* _serverAssert is needed by dict */
void _serverAssert(const char *estr, const char *file, int line) {
fprintf(stderr, "=== ASSERTION FAILED ===");
@ -1446,6 +1453,15 @@ static void usage(void) {
exit(1);
}
static int confirmWithYes(char *msg) {
printf("%s (type 'yes' to accept): ", msg);
fflush(stdout);
char buf[4];
int nread = read(fileno(stdin),buf,4);
buf[3] = '\0';
return (nread != 0 && !strcmp("yes", buf));
}
/* Turn the plain C strings into Sds strings */
static char **convertToSds(int count, char** args) {
int j;
@ -1751,7 +1767,7 @@ static int evalMode(int argc, char **argv) {
}
/*------------------------------------------------------------------------------
* Cluster Manager mode
* Cluster Manager
*--------------------------------------------------------------------------- */
/* The Cluster Manager global structure */
@ -1760,6 +1776,9 @@ static struct clusterManager {
list *errors;
} cluster_manager;
/* Used by clusterManagerFixSlotsCoverage */
dict *clusterManagerUncoveredSlots = NULL;
typedef struct clusterManagerNode {
redisContext *context;
sds name;
@ -1776,10 +1795,12 @@ typedef struct clusterManagerNode {
int slots_count;
int replicas_count;
list *friends;
sds *migrating;
sds *importing;
int migrating_count;
int importing_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
sds *importing; /* An array of sds where even strings are slots and odd
* strings are the source node IDs. */
int migrating_count; /* Length of the migrating array (migrating slots*2) */
int importing_count; /* Length of the importing array (importing slots*2) */
float weight; /* Weight used by rebalance */
int balance; /* Used by rebalance */
} clusterManagerNode;
@ -1829,7 +1850,7 @@ static void clusterManagerShowNodes(void);
static void clusterManagerShowInfo(void);
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
static void clusterManagerWaitForClusterJoin(void);
static void clusterManagerCheckCluster(int quiet);
static int clusterManagerCheckCluster(int quiet);
static void clusterManagerLog(int level, const char* fmt, ...);
static int clusterManagerIsConfigConsistent(void);
static void clusterManagerOnError(sds err);
@ -1846,6 +1867,7 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
static int clusterManagerCommandCreate(int argc, char **argv);
static int clusterManagerCommandInfo(int argc, char **argv);
static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandFix(int argc, char **argv);
static int clusterManagerCommandReshard(int argc, char **argv);
static int clusterManagerCommandRebalance(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv);
@ -1863,6 +1885,7 @@ clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"replicas <arg>"},
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
{"fix", clusterManagerCommandFix, -1, "host:port", NULL},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
{"reshard", clusterManagerCommandReshard, -1, "host:port",
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
@ -1988,6 +2011,8 @@ static void freeClusterManager(void) {
listRelease(cluster_manager.errors);
cluster_manager.errors = NULL;
}
if (clusterManagerUncoveredSlots != NULL)
dictRelease(clusterManagerUncoveredSlots);
}
static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
@ -2013,6 +2038,38 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
clusterManagerNodeResetSlots(node);
return node;
}
/* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the
* latest case, if 'err' arg is not NULL, it gets allocated with a copy
* of reply error (it's up to the caller function to free it), elsewhere
* the error is directly printed. */
static int clusterManagerCheckRedisReply(clusterManagerNode *n,
redisReply *r, char **err)
{
int is_err = 0;
if (!r || (is_err = (r->type == REDIS_REPLY_ERROR))) {
if (is_err) {
if (err != NULL) {
*err = zmalloc((r->len + 1) * sizeof(char));
strcpy(*err, r->str);
} else CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, r->str);
}
return 0;
}
return 1;
}
static void clusterManagerRemoveNodeFromList(list *nodelist,
clusterManagerNode *node) {
listIter li;
listNode *ln;
listRewind(nodelist, &li);
while ((ln = listNext(&li)) != NULL) {
if (node == ln->value) {
listDelNode(nodelist, ln);
break;
}
}
}
/* Return the node with the specified ID or NULL. */
static clusterManagerNode *clusterManagerNodeByName(const char *name) {
@ -2470,10 +2527,10 @@ cleanup:
/* Set slot status to "importing" or "migrating" */
static int clusterManagerSetSlot(clusterManagerNode *node1,
clusterManagerNode *node2,
int slot, const char *mode, char **err) {
int slot, const char *status, char **err) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER "
"SETSLOT %d %s %s",
slot, mode,
slot, status,
(char *) node2->name);
if (err != NULL) *err = NULL;
if (!reply) return 0;
@ -2492,6 +2549,70 @@ cleanup:
return success;
}
/* Migrate keys taken from reply->elements. It returns the reply from the
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
* is not NULL, a dot will be printed for every migrated key. */
static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
clusterManagerNode *target,
redisReply *reply,
int replace, int timeout,
char *dots)
{
redisReply *migrate_reply = NULL;
char **argv = NULL;
size_t *argv_len = NULL;
int c = (replace ? 8 : 7);
size_t argc = c + reply->elements;
size_t i, offset = 6; // Keys Offset
argv = zcalloc(argc * sizeof(char *));
argv_len = zcalloc(argc * sizeof(size_t));
char portstr[255];
char timeoutstr[255];
snprintf(portstr, 10, "%d", target->port);
snprintf(timeoutstr, 10, "%d", timeout);
argv[0] = "MIGRATE";
argv_len[0] = 7;
argv[1] = target->ip;
argv_len[1] = strlen(target->ip);
argv[2] = portstr;
argv_len[2] = strlen(portstr);
argv[3] = "";
argv_len[3] = 0;
argv[4] = "0";
argv_len[4] = 1;
argv[5] = timeoutstr;
argv_len[5] = strlen(timeoutstr);
if (replace) {
argv[offset] = "REPLACE";
argv_len[offset] = 7;
offset++;
}
argv[offset] = "KEYS";
argv_len[offset] = 4;
offset++;
for (i = 0; i < reply->elements; i++) {
redisReply *entry = reply->element[i];
size_t idx = i + offset;
assert(entry->type == REDIS_REPLY_STRING);
argv[idx] = (char *) sdsnew(entry->str);
argv_len[idx] = entry->len;
if (dots) dots[i] = '.';
}
if (dots) dots[reply->elements] = '\0';
void *_reply = NULL;
redisAppendCommandArgv(source->context,argc,
(const char**)argv,argv_len);
int success = (redisGetReply(source->context, &_reply) == REDIS_OK);
for (i = 0; i < reply->elements; i++) sdsfree(argv[i + offset]);
if (!success) goto cleanup;
migrate_reply = (redisReply *) _reply;
cleanup:
zfree(argv);
zfree(argv_len);
return migrate_reply;
}
/* Migrate all keys in the given slot from source to target.*/
static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
clusterManagerNode *target,
int slot, int timeout,
@ -2499,10 +2620,11 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
char **err)
{
int success = 1;
int do_fix = (config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX);
while (1) {
char *dots = NULL;
redisReply *reply = NULL, *migrate_reply = NULL;
char **argv = NULL;
size_t *argv_len = NULL;
reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
"GETKEYSINSLOT %d %d", slot,
pipeline);
@ -2523,57 +2645,37 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
freeReplyObject(reply);
break;
}
char *dots = (verbose ? zmalloc((count+1) * sizeof(char)) : NULL);
if (verbose) dots = zmalloc((count+1) * sizeof(char));
/* Calling MIGRATE command. */
size_t argc = count + 8;
argv = zcalloc(argc * sizeof(char *));
argv_len = zcalloc(argc * sizeof(size_t));
char portstr[255];
char timeoutstr[255];
snprintf(portstr, 10, "%d", target->port);
snprintf(timeoutstr, 10, "%d", timeout);
argv[0] = "MIGRATE";
argv_len[0] = 7;
argv[1] = target->ip;
argv_len[1] = strlen(target->ip);
argv[2] = portstr;
argv_len[2] = strlen(portstr);
argv[3] = "";
argv_len[3] = 0;
argv[4] = "0";
argv_len[4] = 1;
argv[5] = timeoutstr;
argv_len[5] = strlen(timeoutstr);
argv[6] = "REPLACE";
argv_len[6] = 7;
argv[7] = "KEYS";
argv_len[7] = 4;
for (size_t i = 0; i < count; i++) {
redisReply *entry = reply->element[i];
size_t idx = i + 8;
assert(entry->type == REDIS_REPLY_STRING);
argv[idx] = (char *) sdsnew(entry->str);
argv_len[idx] = entry->len;
if (verbose) dots[i] = '.';
}
if (verbose) dots[count] = '\0';
void *_reply = NULL;
redisAppendCommandArgv(source->context,argc,
(const char**)argv,argv_len);
success = (redisGetReply(source->context, &_reply) == REDIS_OK);
for (size_t i = 0; i < count; i++) sdsfree(argv[i + 8]);
if (!success) goto next;
migrate_reply = (redisReply *) _reply;
migrate_reply = clusterManagerMigrateKeysInReply(source, target,
reply, 0, timeout,
dots);
if (migrate_reply == NULL) goto next;
if (migrate_reply->type == REDIS_REPLY_ERROR) {
// TODO: Implement fix.
success = 0;
if (err != NULL) {
*err = zmalloc((migrate_reply->len + 1) * sizeof(char));
strcpy(*err, migrate_reply->str);
printf("\n");
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
clusterManagerLogWarn("*** Target key exists. "
"Replacing it for FIX.\n");
freeReplyObject(migrate_reply);
/* Try to migrate keys adding REPLACE option. */
migrate_reply = clusterManagerMigrateKeysInReply(source,
target,
reply,
1, timeout,
NULL);
success = (migrate_reply != NULL &&
migrate_reply->type != REDIS_REPLY_ERROR);
} else success = 0;
if (!success) {
if (migrate_reply != NULL) {
if (err) {
*err = zmalloc((migrate_reply->len + 1) * sizeof(char));
strcpy(*err, migrate_reply->str);
}
printf("\n");
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
}
goto next;
}
goto next;
}
if (verbose) {
printf("%s", dots);
@ -2582,8 +2684,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
next:
if (reply != NULL) freeReplyObject(reply);
if (migrate_reply != NULL) freeReplyObject(migrate_reply);
zfree(argv);
zfree(argv_len);
if (dots) zfree(dots);
if (!success) break;
}
return success;
@ -2729,6 +2830,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL,
*ping_sent = NULL, *ping_recv = NULL, *config_epoch = NULL,
*link_status = NULL;
UNUSED(link_status);
int i = 0;
while ((p = strchr(line, ' ')) != NULL) {
*p = '\0';
@ -2974,11 +3076,11 @@ int clusterManagerCompareNodeBalance(const void *n1, const void *n2) {
static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
sds signature = NULL;
int node_count = 0, i = 0, name_len = 0;
char **node_configs = NULL;
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
if (reply == NULL || reply->type == REDIS_REPLY_ERROR)
goto cleanup;
char *lines = reply->str, *p, *line;
char **node_configs = NULL;
while ((p = strstr(lines, "\n")) != NULL) {
i = 0;
*p = '\0';
@ -3057,8 +3159,10 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
}
cleanup:
if (reply != NULL) freeReplyObject(reply);
for (i = 0; i < node_count; i++) zfree(node_configs[i]);
zfree(node_configs);
if (node_configs != NULL) {
for (i = 0; i < node_count; i++) zfree(node_configs[i]);
zfree(node_configs);
}
return signature;
}
@ -3114,9 +3218,453 @@ static int clusterManagerGetCoveredSlots(char *all_slots) {
return totslots;
}
static void clusterManagerCheckCluster(int quiet) {
static void clusterManagerPrintSlotsList(list *slots) {
listIter li;
listNode *ln;
listRewind(slots, &li);
sds first = NULL;
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
if (!first) first = slot;
else printf(", ");
printf("%s", slot);
}
printf("\n");
}
/* Return the node, among 'nodes' with the greatest number of keys
* in the specified slot. */
static clusterManagerNode * clusterManagerGetNodeWithMostKeysInSlot(list *nodes,
int slot,
char **err)
{
clusterManagerNode *node = NULL;
int numkeys = 0;
listIter li;
listNode *ln;
listRewind(nodes, &li);
if (err) *err = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
continue;
redisReply *r =
CLUSTER_MANAGER_COMMAND(n, "CLUSTER COUNTKEYSINSLOTi %d", slot);
int success = clusterManagerCheckRedisReply(n, r, err);
if (success) {
if (r->integer > numkeys || node == NULL) {
numkeys = r->integer;
node = n;
}
}
if (r != NULL) freeReplyObject(r);
/* If the reply contains errors */
if (!success) {
if (err != NULL && *err != NULL)
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err);
node = NULL;
break;
}
}
return node;
}
static int clusterManagerFixSlotsCoverage(char *all_slots) {
int i, fixed = 0;
list *none = NULL, *single = NULL, *multi = NULL;
clusterManagerLogInfo(">>> Fixing slots coverage...\n");
printf("List of not covered slots: \n");
int uncovered_count = 0;
sds log = sdsempty();
for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
int covered = all_slots[i];
if (!covered) {
sds key = sdsfromlonglong((long long) i);
if (uncovered_count++ > 0) printf(",");
printf("%s", (char *) key);
list *slot_nodes = listCreate();
sds slot_nodes_str = sdsempty();
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
continue;
redisReply *reply = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER GETKEYSINSLOT %d %d", i, 1);
if (!clusterManagerCheckRedisReply(n, reply, NULL)) {
fixed = -1;
if (reply) freeReplyObject(reply);
goto cleanup;
}
assert(reply->type == REDIS_REPLY_ARRAY);
if (reply->elements > 0) {
listAddNodeTail(slot_nodes, n);
if (listLength(slot_nodes) > 1)
slot_nodes_str = sdscat(slot_nodes_str, ", ");
slot_nodes_str = sdscatfmt(slot_nodes_str,
"%s:%u", n->ip, n->port);
}
freeReplyObject(reply);
}
log = sdscatfmt(log, "\nSlot %S has keys in %u nodes: %S",
key, listLength(slot_nodes), slot_nodes_str);
sdsfree(slot_nodes_str);
dictAdd(clusterManagerUncoveredSlots, key, slot_nodes);
}
}
printf("\n%s\n", log);
/* For every slot, take action depending on the actual condition:
* 1) No node has keys for this slot.
* 2) A single node has keys for this slot.
* 3) Multiple nodes have keys for this slot. */
none = listCreate();
single = listCreate();
multi = listCreate();
dictIterator *iter = dictGetIterator(clusterManagerUncoveredSlots);
dictEntry *entry;
while ((entry = dictNext(iter)) != NULL) {
sds slot = (sds) dictGetKey(entry);
list *nodes = (list *) dictGetVal(entry);
switch (listLength(nodes)){
case 0: listAddNodeTail(none, slot); break;
case 1: listAddNodeTail(single, slot); break;
default: listAddNodeTail(multi, slot); break;
}
}
dictReleaseIterator(iter);
/* Handle case "1": keys in no node. */
if (listLength(none) > 0) {
printf("The following uncovered slots have no keys "
"across the cluster:\n");
clusterManagerPrintSlotsList(none);
if (confirmWithYes("Fix these slots by covering with a random node?")){
srand(time(NULL));
listIter li;
listNode *ln;
listRewind(none, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
long idx = (long) (rand() % listLength(cluster_manager.nodes));
listNode *node_n = listIndex(cluster_manager.nodes, idx);
assert(node_n != NULL);
clusterManagerNode *n = node_n->value;
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
fixed++;
}
}
}
/* Handle case "2": keys only in one node. */
if (listLength(single) > 0) {
printf("The following uncovered slots have keys in just one node:\n");
clusterManagerPrintSlotsList(single);
if (confirmWithYes("Fix these slots by covering with those nodes?")){
listIter li;
listNode *ln;
listRewind(single, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
assert(entry != NULL);
list *nodes = (list *) dictGetVal(entry);
listNode *fn = listFirst(nodes);
assert(fn != NULL);
clusterManagerNode *n = fn->value;
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
fixed++;
}
}
}
/* Handle case "3": keys in multiple nodes. */
if (listLength(multi) > 0) {
printf("The folowing uncovered slots have keys in multiple nodes:\n");
clusterManagerPrintSlotsList(multi);
if (confirmWithYes("Fix these slots by moving keys "
"into a single node?")) {
listIter li;
listNode *ln;
listRewind(multi, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
assert(entry != NULL);
list *nodes = (list *) dictGetVal(entry);
int s = atoi(slot);
clusterManagerNode *target =
clusterManagerGetNodeWithMostKeysInSlot(nodes, s, NULL);
if (target == NULL) {
fixed = -1;
goto cleanup;
}
clusterManagerLogInfo(">>> Covering slot %s moving keys "
"to %s:%d\n", slot,
target->ip, target->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER SETSLOT %s %s", slot, "STABLE");
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
listIter nli;
listNode *nln;
listRewind(nodes, &nli);
while ((nln = listNext(&nli)) != NULL) {
clusterManagerNode *src = nln->value;
if (src == target) continue;
/* Set the source node in 'importing' state
* (even if we will actually migrate keys away)
* in order to avoid receiving redirections
* for MIGRATE. */
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"IMPORTING", target->name);
if (!clusterManagerCheckRedisReply(target, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
CLUSTER_MANAGER_OPT_COLD;
if (!clusterManagerMoveSlot(src, target, s, opts, NULL)) {
fixed = -1;
goto cleanup;
}
}
fixed++;
}
}
}
cleanup:
sdsfree(log);
if (none) listRelease(none);
if (single) listRelease(single);
if (multi) listRelease(multi);
return fixed;
}
/* Slot 'slot' was found to be in importing or migrating state in one or
* more nodes. This function fixes this condition by migrating keys where
* it seems more sensible. */
static int clusterManagerFixOpenSlot(int slot) {
clusterManagerLogInfo(">>> Fixing open slot %d\n", slot);
/* Try to obtain the current slot owner, according to the current
* nodes configuration. */
int success = 1;
list *owners = listCreate();
list *migrating = listCreate();
list *importing = listCreate();
sds migrating_str = sdsempty();
sds importing_str = sdsempty();
clusterManagerNode *owner = NULL;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (n->slots[slot]) {
if (owner == NULL) owner = n;
listAddNodeTail(owners, n);
}
}
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (n->migrating) {
for (int i = 0; i < n->migrating_count; i += 2) {
sds migrating_slot = n->migrating[i];
if (atoi(migrating_slot) == slot) {
char *sep = (listLength(migrating) == 0 ? "" : ",");
migrating_str = sdscatfmt(migrating_str, "%s%S:%u",
sep, n->ip, n->port);
listAddNodeTail(migrating, n);
break;
}
}
}
if (n->importing) {
for (int i = 0; i < n->importing_count; i += 2) {
sds importing_slot = n->importing[i];
if (atoi(importing_slot) == slot) {
char *sep = (listLength(importing) == 0 ? "" : ",");
importing_str = sdscatfmt(importing_str, "%s%S:%u",
sep, n->ip, n->port);
listAddNodeTail(importing, n);
break;
}
}
}
}
printf("Set as migrating in: %s\n", migrating_str);
printf("Set as importing in: %s\n", importing_str);
/* If there is no slot owner, set as owner the slot with the biggest
* number of keys, among the set of migrating / importing nodes. */
if (owner == NULL) {
clusterManagerLogInfo(">>> Nobody claims ownership, "
"selecting an owner...\n");
owner = clusterManagerGetNodeWithMostKeysInSlot(cluster_manager.nodes,
slot, NULL);
// If we still don't have an owner, we can't fix it.
if (owner == NULL) {
clusterManagerLogErr("[ERR] Can't select a slot owner. "
"Impossible to fix.\n");
success = 0;
goto cleanup;
}
// Use ADDSLOTS to assign the slot.
printf("*** Configuring %s:%d as the slot owner\n", owner->ip,
owner->port);
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
"SETSLOT %d %s",
slot, "STABLE");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
/* Make sure this information will propagate. Not strictly needed
* since there is no past owner, so all the other nodes will accept
* whatever epoch this node will claim the slot with. */
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
/* Remove the owner from the list of migrating/importing
* nodes. */
clusterManagerRemoveNodeFromList(migrating, owner);
clusterManagerRemoveNodeFromList(importing, owner);
}
/* If there are multiple owners of the slot, we need to fix it
* so that a single node is the owner and all the other nodes
* are in importing state. Later the fix can be handled by one
* of the base cases above.
*
* Note that this case also covers multiple nodes having the slot
* in migrating state, since migrating is a valid state only for
* slot owners. */
if (listLength(owners) > 1) {
owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL);
listRewind(owners, &li);
redisReply *reply = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot);
success = clusterManagerCheckRedisReply(n, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
if (!success) goto cleanup;
clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates
listAddNodeTail(importing, n);
}
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
}
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
/* Case 1: The slot is in migrating state in one slot, and in
* importing state in 1 slot. That's trivial to address. */
if (listLength(migrating) == 1 && listLength(importing) == 1) {
clusterManagerNode *src = listFirst(migrating)->value;
clusterManagerNode *dst = listFirst(importing)->value;
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
}
/* Case 2: There are multiple nodes that claim the slot as importing,
* they probably got keys about the slot after a restart so opened
* the slot. In this case we just move all the keys to the owner
* according to the configuration. */
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
clusterManagerLogInfo(">>> Moving all the %d slot keys to its "
"owner %s:%d\n", slot, owner->ip, owner->port);
move_opts |= CLUSTER_MANAGER_OPT_COLD;
listRewind(importing, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
success = clusterManagerMoveSlot(n, owner, slot, move_opts, NULL);
if (!success) goto cleanup;
clusterManagerLogInfo(">>> Setting %d as STABLE in "
"%s:%d\n", slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
slot, "STABLE");
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
if (!success) goto cleanup;
}
} else {
int try_to_close_slot = (listLength(importing) == 0 &&
listLength(migrating) == 1);
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) {
if (success) try_to_close_slot = (r->elements == 0);
freeReplyObject(r);
}
if (!success) goto cleanup;
}
/* Case 3: There are no slots claiming to be in importing state, but
* there is a migrating node that actually don't have any key. We
* can just close the slot, probably a reshard interrupted in the middle. */
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
slot, "STABLE");
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
if (!success) goto cleanup;
} else {
success = 0;
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
"yet (work in progress). Slot is set as "
"migrating in %s, as importing in %s, "
"owner is %s:%d\n", migrating_str,
importing_str, owner->ip, owner->port);
}
}
cleanup:
listRelease(owners);
listRelease(migrating);
listRelease(importing);
sdsfree(migrating_str);
sdsfree(importing_str);
return success;
}
static int clusterManagerCheckCluster(int quiet) {
listNode *ln = listFirst(cluster_manager.nodes);
if (!ln) return;
if (!ln) return 0;
int result = 1;
int do_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX;
clusterManagerNode *node = ln->value;
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
node->ip, node->port);
@ -3124,6 +3672,7 @@ static void clusterManagerCheckCluster(int quiet) {
if (!clusterManagerIsConfigConsistent()) {
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
clusterManagerOnError(err);
result = 0;
} else {
clusterManagerLogOk("[OK] All nodes agree about slots "
"configuration.\n");
@ -3174,6 +3723,7 @@ static void clusterManagerCheckCluster(int quiet) {
}
}
if (open_slots != NULL) {
result = 0;
dictIterator *iter = dictGetIterator(open_slots);
dictEntry *entry;
sds errstr = sdsnew("[WARNING] The following slots are open: ");
@ -3185,6 +3735,17 @@ static void clusterManagerCheckCluster(int quiet) {
}
clusterManagerLogErr("%s.\n", (char *) errstr);
sdsfree(errstr);
if (do_fix) {
// Fix open slots.
dictReleaseIterator(iter);
iter = dictGetIterator(open_slots);
while ((entry = dictNext(iter)) != NULL) {
sds slot = (sds) dictGetKey(entry);
result = clusterManagerFixOpenSlot(atoi(slot));
if (!result) break;
}
}
dictReleaseIterator(iter);
dictRelease(open_slots);
}
clusterManagerLogInfo(">>> Check slots coverage...\n");
@ -3200,7 +3761,16 @@ static void clusterManagerCheckCluster(int quiet) {
"covered by nodes.\n",
CLUSTER_MANAGER_SLOTS);
clusterManagerOnError(err);
result = 0;
if (do_fix/* && result*/) {
dictType dtype = clusterManagerDictType;
dtype.valDestructor = dictListDestructor;
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
int fixed = clusterManagerFixSlotsCoverage(slots);
if (fixed > 0) result = 1;
}
}
return result;
}
static clusterManagerNode *clusterNodeForResharding(char *id,
@ -3546,12 +4116,7 @@ assign_replicas:
}
clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
clusterManagerShowNodes();
printf("Can I set the above configuration? %s", "(type 'yes' to accept): ");
fflush(stdout);
char buf[4];
int nread = read(fileno(stdin),buf,4);
buf[3] = '\0';
if (nread != 0 && !strcmp("yes", buf)) {
if (confirmWithYes("Can I set the above configuration?")) {
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *node = ln->value;
@ -3674,13 +4239,17 @@ static int clusterManagerCommandCheck(int argc, char **argv) {
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerShowInfo();
clusterManagerCheckCluster(0);
return 1;
return clusterManagerCheckCluster(0);
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
static int clusterManagerCommandFix(int argc, char **argv) {
config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_FIX;
return clusterManagerCommandCheck(argc, argv);
}
static int clusterManagerCommandReshard(int argc, char **argv) {
int port = 0;
char *ip = NULL;