mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Cluster Manager: fix command.
This commit is contained in:
parent
d8fc307cc6
commit
8969254e66
701
src/redis-cli.c
701
src/redis-cli.c
@ -151,6 +151,7 @@ static uint64_t dictSdsHash(const void *key);
|
|||||||
static int dictSdsKeyCompare(void *privdata, const void *key1,
|
static int dictSdsKeyCompare(void *privdata, const void *key1,
|
||||||
const void *key2);
|
const void *key2);
|
||||||
static void dictSdsDestructor(void *privdata, void *val);
|
static void dictSdsDestructor(void *privdata, void *val);
|
||||||
|
static void dictListDestructor(void *privdata, void *val);
|
||||||
|
|
||||||
/* Cluster Manager Command Info */
|
/* Cluster Manager Command Info */
|
||||||
typedef struct clusterManagerCommand {
|
typedef struct clusterManagerCommand {
|
||||||
@ -406,6 +407,12 @@ static void dictSdsDestructor(void *privdata, void *val)
|
|||||||
sdsfree(val);
|
sdsfree(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dictListDestructor(void *privdata, void *val)
|
||||||
|
{
|
||||||
|
DICT_NOTUSED(privdata);
|
||||||
|
listRelease((list*)val);
|
||||||
|
}
|
||||||
|
|
||||||
/* _serverAssert is needed by dict */
|
/* _serverAssert is needed by dict */
|
||||||
void _serverAssert(const char *estr, const char *file, int line) {
|
void _serverAssert(const char *estr, const char *file, int line) {
|
||||||
fprintf(stderr, "=== ASSERTION FAILED ===");
|
fprintf(stderr, "=== ASSERTION FAILED ===");
|
||||||
@ -1446,6 +1453,15 @@ static void usage(void) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Print 'msg' followed by a confirmation prompt and read the answer from
 * standard input.
 *
 * Returns 1 only if the first three bytes of the answer are "yes".
 * Returns 0 for any other answer, on end of file and on read errors. */
static int confirmWithYes(char *msg) {
    /* Zero filled, so that a short read never leaves indeterminate bytes
     * for strcmp() to inspect. */
    char buf[4] = {0};
    printf("%s (type 'yes' to accept): ", msg);
    fflush(stdout);
    ssize_t nread = read(STDIN_FILENO, buf, sizeof(buf));
    /* 0 means EOF and -1 means error: in both cases there is no answer. */
    if (nread <= 0) return 0;
    /* Only the first three bytes are compared: the fourth one, if any,
     * is overwritten by the terminator. */
    buf[3] = '\0';
    return strcmp("yes", buf) == 0;
}
|
||||||
|
|
||||||
/* Turn the plain C strings into Sds strings */
|
/* Turn the plain C strings into Sds strings */
|
||||||
static char **convertToSds(int count, char** args) {
|
static char **convertToSds(int count, char** args) {
|
||||||
int j;
|
int j;
|
||||||
@ -1751,7 +1767,7 @@ static int evalMode(int argc, char **argv) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*------------------------------------------------------------------------------
|
/*------------------------------------------------------------------------------
|
||||||
* Cluster Manager mode
|
* Cluster Manager
|
||||||
*--------------------------------------------------------------------------- */
|
*--------------------------------------------------------------------------- */
|
||||||
|
|
||||||
/* The Cluster Manager global structure */
|
/* The Cluster Manager global structure */
|
||||||
@ -1760,6 +1776,9 @@ static struct clusterManager {
|
|||||||
list *errors;
|
list *errors;
|
||||||
} cluster_manager;
|
} cluster_manager;
|
||||||
|
|
||||||
|
/* Used by clusterManagerFixSlotsCoverage */
|
||||||
|
dict *clusterManagerUncoveredSlots = NULL;
|
||||||
|
|
||||||
typedef struct clusterManagerNode {
|
typedef struct clusterManagerNode {
|
||||||
redisContext *context;
|
redisContext *context;
|
||||||
sds name;
|
sds name;
|
||||||
@ -1776,10 +1795,12 @@ typedef struct clusterManagerNode {
|
|||||||
int slots_count;
|
int slots_count;
|
||||||
int replicas_count;
|
int replicas_count;
|
||||||
list *friends;
|
list *friends;
|
||||||
sds *migrating;
|
sds *migrating; /* An array of sds where even strings are slots and odd
|
||||||
sds *importing;
|
* strings are the destination node IDs. */
|
||||||
int migrating_count;
|
sds *importing; /* An array of sds where even strings are slots and odd
|
||||||
int importing_count;
|
* strings are the source node IDs. */
|
||||||
|
int migrating_count; /* Length of the migrating array (migrating slots*2) */
|
||||||
|
int importing_count; /* Length of the importing array (importing slots*2) */
|
||||||
float weight; /* Weight used by rebalance */
|
float weight; /* Weight used by rebalance */
|
||||||
int balance; /* Used by rebalance */
|
int balance; /* Used by rebalance */
|
||||||
} clusterManagerNode;
|
} clusterManagerNode;
|
||||||
@ -1829,7 +1850,7 @@ static void clusterManagerShowNodes(void);
|
|||||||
static void clusterManagerShowInfo(void);
|
static void clusterManagerShowInfo(void);
|
||||||
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
|
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
|
||||||
static void clusterManagerWaitForClusterJoin(void);
|
static void clusterManagerWaitForClusterJoin(void);
|
||||||
static void clusterManagerCheckCluster(int quiet);
|
static int clusterManagerCheckCluster(int quiet);
|
||||||
static void clusterManagerLog(int level, const char* fmt, ...);
|
static void clusterManagerLog(int level, const char* fmt, ...);
|
||||||
static int clusterManagerIsConfigConsistent(void);
|
static int clusterManagerIsConfigConsistent(void);
|
||||||
static void clusterManagerOnError(sds err);
|
static void clusterManagerOnError(sds err);
|
||||||
@ -1846,6 +1867,7 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
|
|||||||
static int clusterManagerCommandCreate(int argc, char **argv);
|
static int clusterManagerCommandCreate(int argc, char **argv);
|
||||||
static int clusterManagerCommandInfo(int argc, char **argv);
|
static int clusterManagerCommandInfo(int argc, char **argv);
|
||||||
static int clusterManagerCommandCheck(int argc, char **argv);
|
static int clusterManagerCommandCheck(int argc, char **argv);
|
||||||
|
static int clusterManagerCommandFix(int argc, char **argv);
|
||||||
static int clusterManagerCommandReshard(int argc, char **argv);
|
static int clusterManagerCommandReshard(int argc, char **argv);
|
||||||
static int clusterManagerCommandRebalance(int argc, char **argv);
|
static int clusterManagerCommandRebalance(int argc, char **argv);
|
||||||
static int clusterManagerCommandCall(int argc, char **argv);
|
static int clusterManagerCommandCall(int argc, char **argv);
|
||||||
@ -1863,6 +1885,7 @@ clusterManagerCommandDef clusterManagerCommands[] = {
|
|||||||
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
|
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
|
||||||
"replicas <arg>"},
|
"replicas <arg>"},
|
||||||
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
|
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
|
||||||
|
{"fix", clusterManagerCommandFix, -1, "host:port", NULL},
|
||||||
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
|
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
|
||||||
{"reshard", clusterManagerCommandReshard, -1, "host:port",
|
{"reshard", clusterManagerCommandReshard, -1, "host:port",
|
||||||
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
|
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
|
||||||
@ -1988,6 +2011,8 @@ static void freeClusterManager(void) {
|
|||||||
listRelease(cluster_manager.errors);
|
listRelease(cluster_manager.errors);
|
||||||
cluster_manager.errors = NULL;
|
cluster_manager.errors = NULL;
|
||||||
}
|
}
|
||||||
|
if (clusterManagerUncoveredSlots != NULL)
|
||||||
|
dictRelease(clusterManagerUncoveredSlots);
|
||||||
}
|
}
|
||||||
|
|
||||||
static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
|
static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
|
||||||
@ -2013,6 +2038,38 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
|
|||||||
clusterManagerNodeResetSlots(node);
|
clusterManagerNodeResetSlots(node);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
/* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the
|
||||||
|
* latest case, if 'err' arg is not NULL, it gets allocated with a copy
|
||||||
|
* of reply error (it's up to the caller function to free it), elsewhere
|
||||||
|
* the error is directly printed. */
|
||||||
|
static int clusterManagerCheckRedisReply(clusterManagerNode *n,
|
||||||
|
redisReply *r, char **err)
|
||||||
|
{
|
||||||
|
int is_err = 0;
|
||||||
|
if (!r || (is_err = (r->type == REDIS_REPLY_ERROR))) {
|
||||||
|
if (is_err) {
|
||||||
|
if (err != NULL) {
|
||||||
|
*err = zmalloc((r->len + 1) * sizeof(char));
|
||||||
|
strcpy(*err, r->str);
|
||||||
|
} else CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, r->str);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void clusterManagerRemoveNodeFromList(list *nodelist,
|
||||||
|
clusterManagerNode *node) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(nodelist, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
if (node == ln->value) {
|
||||||
|
listDelNode(nodelist, ln);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Return the node with the specified ID or NULL. */
|
/* Return the node with the specified ID or NULL. */
|
||||||
static clusterManagerNode *clusterManagerNodeByName(const char *name) {
|
static clusterManagerNode *clusterManagerNodeByName(const char *name) {
|
||||||
@ -2470,10 +2527,10 @@ cleanup:
|
|||||||
/* Set slot status to "importing" or "migrating" */
|
/* Set slot status to "importing" or "migrating" */
|
||||||
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
||||||
clusterManagerNode *node2,
|
clusterManagerNode *node2,
|
||||||
int slot, const char *mode, char **err) {
|
int slot, const char *status, char **err) {
|
||||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER "
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER "
|
||||||
"SETSLOT %d %s %s",
|
"SETSLOT %d %s %s",
|
||||||
slot, mode,
|
slot, status,
|
||||||
(char *) node2->name);
|
(char *) node2->name);
|
||||||
if (err != NULL) *err = NULL;
|
if (err != NULL) *err = NULL;
|
||||||
if (!reply) return 0;
|
if (!reply) return 0;
|
||||||
@ -2492,6 +2549,70 @@ cleanup:
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Migrate keys taken from reply->elements. It returns the reply from the
|
||||||
|
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
||||||
|
* is not NULL, a dot will be printed for every migrated key. */
|
||||||
|
static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
|
||||||
|
clusterManagerNode *target,
|
||||||
|
redisReply *reply,
|
||||||
|
int replace, int timeout,
|
||||||
|
char *dots)
|
||||||
|
{
|
||||||
|
redisReply *migrate_reply = NULL;
|
||||||
|
char **argv = NULL;
|
||||||
|
size_t *argv_len = NULL;
|
||||||
|
int c = (replace ? 8 : 7);
|
||||||
|
size_t argc = c + reply->elements;
|
||||||
|
size_t i, offset = 6; // Keys Offset
|
||||||
|
argv = zcalloc(argc * sizeof(char *));
|
||||||
|
argv_len = zcalloc(argc * sizeof(size_t));
|
||||||
|
char portstr[255];
|
||||||
|
char timeoutstr[255];
|
||||||
|
snprintf(portstr, 10, "%d", target->port);
|
||||||
|
snprintf(timeoutstr, 10, "%d", timeout);
|
||||||
|
argv[0] = "MIGRATE";
|
||||||
|
argv_len[0] = 7;
|
||||||
|
argv[1] = target->ip;
|
||||||
|
argv_len[1] = strlen(target->ip);
|
||||||
|
argv[2] = portstr;
|
||||||
|
argv_len[2] = strlen(portstr);
|
||||||
|
argv[3] = "";
|
||||||
|
argv_len[3] = 0;
|
||||||
|
argv[4] = "0";
|
||||||
|
argv_len[4] = 1;
|
||||||
|
argv[5] = timeoutstr;
|
||||||
|
argv_len[5] = strlen(timeoutstr);
|
||||||
|
if (replace) {
|
||||||
|
argv[offset] = "REPLACE";
|
||||||
|
argv_len[offset] = 7;
|
||||||
|
offset++;
|
||||||
|
}
|
||||||
|
argv[offset] = "KEYS";
|
||||||
|
argv_len[offset] = 4;
|
||||||
|
offset++;
|
||||||
|
for (i = 0; i < reply->elements; i++) {
|
||||||
|
redisReply *entry = reply->element[i];
|
||||||
|
size_t idx = i + offset;
|
||||||
|
assert(entry->type == REDIS_REPLY_STRING);
|
||||||
|
argv[idx] = (char *) sdsnew(entry->str);
|
||||||
|
argv_len[idx] = entry->len;
|
||||||
|
if (dots) dots[i] = '.';
|
||||||
|
}
|
||||||
|
if (dots) dots[reply->elements] = '\0';
|
||||||
|
void *_reply = NULL;
|
||||||
|
redisAppendCommandArgv(source->context,argc,
|
||||||
|
(const char**)argv,argv_len);
|
||||||
|
int success = (redisGetReply(source->context, &_reply) == REDIS_OK);
|
||||||
|
for (i = 0; i < reply->elements; i++) sdsfree(argv[i + offset]);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
migrate_reply = (redisReply *) _reply;
|
||||||
|
cleanup:
|
||||||
|
zfree(argv);
|
||||||
|
zfree(argv_len);
|
||||||
|
return migrate_reply;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Migrate all keys in the given slot from source to target.*/
|
||||||
static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
||||||
clusterManagerNode *target,
|
clusterManagerNode *target,
|
||||||
int slot, int timeout,
|
int slot, int timeout,
|
||||||
@ -2499,10 +2620,11 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
char **err)
|
char **err)
|
||||||
{
|
{
|
||||||
int success = 1;
|
int success = 1;
|
||||||
|
int do_fix = (config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_FIX);
|
||||||
while (1) {
|
while (1) {
|
||||||
|
char *dots = NULL;
|
||||||
redisReply *reply = NULL, *migrate_reply = NULL;
|
redisReply *reply = NULL, *migrate_reply = NULL;
|
||||||
char **argv = NULL;
|
|
||||||
size_t *argv_len = NULL;
|
|
||||||
reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
|
reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
|
||||||
"GETKEYSINSLOT %d %d", slot,
|
"GETKEYSINSLOT %d %d", slot,
|
||||||
pipeline);
|
pipeline);
|
||||||
@ -2523,58 +2645,38 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
freeReplyObject(reply);
|
freeReplyObject(reply);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
char *dots = (verbose ? zmalloc((count+1) * sizeof(char)) : NULL);
|
if (verbose) dots = zmalloc((count+1) * sizeof(char));
|
||||||
/* Calling MIGRATE command. */
|
/* Calling MIGRATE command. */
|
||||||
size_t argc = count + 8;
|
migrate_reply = clusterManagerMigrateKeysInReply(source, target,
|
||||||
argv = zcalloc(argc * sizeof(char *));
|
reply, 0, timeout,
|
||||||
argv_len = zcalloc(argc * sizeof(size_t));
|
dots);
|
||||||
char portstr[255];
|
if (migrate_reply == NULL) goto next;
|
||||||
char timeoutstr[255];
|
|
||||||
snprintf(portstr, 10, "%d", target->port);
|
|
||||||
snprintf(timeoutstr, 10, "%d", timeout);
|
|
||||||
argv[0] = "MIGRATE";
|
|
||||||
argv_len[0] = 7;
|
|
||||||
argv[1] = target->ip;
|
|
||||||
argv_len[1] = strlen(target->ip);
|
|
||||||
argv[2] = portstr;
|
|
||||||
argv_len[2] = strlen(portstr);
|
|
||||||
argv[3] = "";
|
|
||||||
argv_len[3] = 0;
|
|
||||||
argv[4] = "0";
|
|
||||||
argv_len[4] = 1;
|
|
||||||
argv[5] = timeoutstr;
|
|
||||||
argv_len[5] = strlen(timeoutstr);
|
|
||||||
argv[6] = "REPLACE";
|
|
||||||
argv_len[6] = 7;
|
|
||||||
argv[7] = "KEYS";
|
|
||||||
argv_len[7] = 4;
|
|
||||||
for (size_t i = 0; i < count; i++) {
|
|
||||||
redisReply *entry = reply->element[i];
|
|
||||||
size_t idx = i + 8;
|
|
||||||
assert(entry->type == REDIS_REPLY_STRING);
|
|
||||||
argv[idx] = (char *) sdsnew(entry->str);
|
|
||||||
argv_len[idx] = entry->len;
|
|
||||||
if (verbose) dots[i] = '.';
|
|
||||||
}
|
|
||||||
if (verbose) dots[count] = '\0';
|
|
||||||
void *_reply = NULL;
|
|
||||||
redisAppendCommandArgv(source->context,argc,
|
|
||||||
(const char**)argv,argv_len);
|
|
||||||
success = (redisGetReply(source->context, &_reply) == REDIS_OK);
|
|
||||||
for (size_t i = 0; i < count; i++) sdsfree(argv[i + 8]);
|
|
||||||
if (!success) goto next;
|
|
||||||
migrate_reply = (redisReply *) _reply;
|
|
||||||
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
||||||
// TODO: Implement fix.
|
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
|
||||||
success = 0;
|
clusterManagerLogWarn("*** Target key exists. "
|
||||||
if (err != NULL) {
|
"Replacing it for FIX.\n");
|
||||||
|
freeReplyObject(migrate_reply);
|
||||||
|
/* Try to migrate keys adding REPLACE option. */
|
||||||
|
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||||
|
target,
|
||||||
|
reply,
|
||||||
|
1, timeout,
|
||||||
|
NULL);
|
||||||
|
success = (migrate_reply != NULL &&
|
||||||
|
migrate_reply->type != REDIS_REPLY_ERROR);
|
||||||
|
} else success = 0;
|
||||||
|
if (!success) {
|
||||||
|
if (migrate_reply != NULL) {
|
||||||
|
if (err) {
|
||||||
*err = zmalloc((migrate_reply->len + 1) * sizeof(char));
|
*err = zmalloc((migrate_reply->len + 1) * sizeof(char));
|
||||||
strcpy(*err, migrate_reply->str);
|
strcpy(*err, migrate_reply->str);
|
||||||
|
}
|
||||||
printf("\n");
|
printf("\n");
|
||||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
|
||||||
}
|
}
|
||||||
goto next;
|
goto next;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
printf("%s", dots);
|
printf("%s", dots);
|
||||||
fflush(stdout);
|
fflush(stdout);
|
||||||
@ -2582,8 +2684,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
next:
|
next:
|
||||||
if (reply != NULL) freeReplyObject(reply);
|
if (reply != NULL) freeReplyObject(reply);
|
||||||
if (migrate_reply != NULL) freeReplyObject(migrate_reply);
|
if (migrate_reply != NULL) freeReplyObject(migrate_reply);
|
||||||
zfree(argv);
|
if (dots) zfree(dots);
|
||||||
zfree(argv_len);
|
|
||||||
if (!success) break;
|
if (!success) break;
|
||||||
}
|
}
|
||||||
return success;
|
return success;
|
||||||
@ -2729,6 +2830,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
|
|||||||
char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL,
|
char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL,
|
||||||
*ping_sent = NULL, *ping_recv = NULL, *config_epoch = NULL,
|
*ping_sent = NULL, *ping_recv = NULL, *config_epoch = NULL,
|
||||||
*link_status = NULL;
|
*link_status = NULL;
|
||||||
|
UNUSED(link_status);
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while ((p = strchr(line, ' ')) != NULL) {
|
while ((p = strchr(line, ' ')) != NULL) {
|
||||||
*p = '\0';
|
*p = '\0';
|
||||||
@ -2974,11 +3076,11 @@ int clusterManagerCompareNodeBalance(const void *n1, const void *n2) {
|
|||||||
static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
||||||
sds signature = NULL;
|
sds signature = NULL;
|
||||||
int node_count = 0, i = 0, name_len = 0;
|
int node_count = 0, i = 0, name_len = 0;
|
||||||
|
char **node_configs = NULL;
|
||||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
|
||||||
if (reply == NULL || reply->type == REDIS_REPLY_ERROR)
|
if (reply == NULL || reply->type == REDIS_REPLY_ERROR)
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
char *lines = reply->str, *p, *line;
|
char *lines = reply->str, *p, *line;
|
||||||
char **node_configs = NULL;
|
|
||||||
while ((p = strstr(lines, "\n")) != NULL) {
|
while ((p = strstr(lines, "\n")) != NULL) {
|
||||||
i = 0;
|
i = 0;
|
||||||
*p = '\0';
|
*p = '\0';
|
||||||
@ -3057,8 +3159,10 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
|||||||
}
|
}
|
||||||
cleanup:
|
cleanup:
|
||||||
if (reply != NULL) freeReplyObject(reply);
|
if (reply != NULL) freeReplyObject(reply);
|
||||||
|
if (node_configs != NULL) {
|
||||||
for (i = 0; i < node_count; i++) zfree(node_configs[i]);
|
for (i = 0; i < node_count; i++) zfree(node_configs[i]);
|
||||||
zfree(node_configs);
|
zfree(node_configs);
|
||||||
|
}
|
||||||
return signature;
|
return signature;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3114,9 +3218,453 @@ static int clusterManagerGetCoveredSlots(char *all_slots) {
|
|||||||
return totslots;
|
return totslots;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clusterManagerCheckCluster(int quiet) {
|
/* Print on a single line the sds strings stored in the 'slots' list,
 * separated by ", " and followed by a newline. */
static void clusterManagerPrintSlotsList(list *slots) {
    listIter iter;
    listNode *cur;
    int printed = 0;
    listRewind(slots, &iter);
    for (cur = listNext(&iter); cur != NULL; cur = listNext(&iter)) {
        sds slot = cur->value;
        /* The separator goes before every item but the first one. */
        if (printed++) printf(", ");
        printf("%s", slot);
    }
    printf("\n");
}
|
||||||
|
|
||||||
|
/* Return the node, among 'nodes' with the greatest number of keys
|
||||||
|
* in the specified slot. */
|
||||||
|
static clusterManagerNode * clusterManagerGetNodeWithMostKeysInSlot(list *nodes,
|
||||||
|
int slot,
|
||||||
|
char **err)
|
||||||
|
{
|
||||||
|
clusterManagerNode *node = NULL;
|
||||||
|
int numkeys = 0;
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(nodes, &li);
|
||||||
|
if (err) *err = NULL;
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
|
||||||
|
continue;
|
||||||
|
redisReply *r =
|
||||||
|
CLUSTER_MANAGER_COMMAND(n, "CLUSTER COUNTKEYSINSLOTi %d", slot);
|
||||||
|
int success = clusterManagerCheckRedisReply(n, r, err);
|
||||||
|
if (success) {
|
||||||
|
if (r->integer > numkeys || node == NULL) {
|
||||||
|
numkeys = r->integer;
|
||||||
|
node = n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (r != NULL) freeReplyObject(r);
|
||||||
|
/* If the reply contains errors */
|
||||||
|
if (!success) {
|
||||||
|
if (err != NULL && *err != NULL)
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err);
|
||||||
|
node = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||||
|
int i, fixed = 0;
|
||||||
|
list *none = NULL, *single = NULL, *multi = NULL;
|
||||||
|
clusterManagerLogInfo(">>> Fixing slots coverage...\n");
|
||||||
|
printf("List of not covered slots: \n");
|
||||||
|
int uncovered_count = 0;
|
||||||
|
sds log = sdsempty();
|
||||||
|
for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
|
||||||
|
int covered = all_slots[i];
|
||||||
|
if (!covered) {
|
||||||
|
sds key = sdsfromlonglong((long long) i);
|
||||||
|
if (uncovered_count++ > 0) printf(",");
|
||||||
|
printf("%s", (char *) key);
|
||||||
|
list *slot_nodes = listCreate();
|
||||||
|
sds slot_nodes_str = sdsempty();
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
|
||||||
|
continue;
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER GETKEYSINSLOT %d %d", i, 1);
|
||||||
|
if (!clusterManagerCheckRedisReply(n, reply, NULL)) {
|
||||||
|
fixed = -1;
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
assert(reply->type == REDIS_REPLY_ARRAY);
|
||||||
|
if (reply->elements > 0) {
|
||||||
|
listAddNodeTail(slot_nodes, n);
|
||||||
|
if (listLength(slot_nodes) > 1)
|
||||||
|
slot_nodes_str = sdscat(slot_nodes_str, ", ");
|
||||||
|
slot_nodes_str = sdscatfmt(slot_nodes_str,
|
||||||
|
"%s:%u", n->ip, n->port);
|
||||||
|
}
|
||||||
|
freeReplyObject(reply);
|
||||||
|
}
|
||||||
|
log = sdscatfmt(log, "\nSlot %S has keys in %u nodes: %S",
|
||||||
|
key, listLength(slot_nodes), slot_nodes_str);
|
||||||
|
sdsfree(slot_nodes_str);
|
||||||
|
dictAdd(clusterManagerUncoveredSlots, key, slot_nodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("\n%s\n", log);
|
||||||
|
/* For every slot, take action depending on the actual condition:
|
||||||
|
* 1) No node has keys for this slot.
|
||||||
|
* 2) A single node has keys for this slot.
|
||||||
|
* 3) Multiple nodes have keys for this slot. */
|
||||||
|
none = listCreate();
|
||||||
|
single = listCreate();
|
||||||
|
multi = listCreate();
|
||||||
|
dictIterator *iter = dictGetIterator(clusterManagerUncoveredSlots);
|
||||||
|
dictEntry *entry;
|
||||||
|
while ((entry = dictNext(iter)) != NULL) {
|
||||||
|
sds slot = (sds) dictGetKey(entry);
|
||||||
|
list *nodes = (list *) dictGetVal(entry);
|
||||||
|
switch (listLength(nodes)){
|
||||||
|
case 0: listAddNodeTail(none, slot); break;
|
||||||
|
case 1: listAddNodeTail(single, slot); break;
|
||||||
|
default: listAddNodeTail(multi, slot); break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dictReleaseIterator(iter);
|
||||||
|
|
||||||
|
/* Handle case "1": keys in no node. */
|
||||||
|
if (listLength(none) > 0) {
|
||||||
|
printf("The following uncovered slots have no keys "
|
||||||
|
"across the cluster:\n");
|
||||||
|
clusterManagerPrintSlotsList(none);
|
||||||
|
if (confirmWithYes("Fix these slots by covering with a random node?")){
|
||||||
|
srand(time(NULL));
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(none, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
sds slot = ln->value;
|
||||||
|
long idx = (long) (rand() % listLength(cluster_manager.nodes));
|
||||||
|
listNode *node_n = listIndex(cluster_manager.nodes, idx);
|
||||||
|
assert(node_n != NULL);
|
||||||
|
clusterManagerNode *n = node_n->value;
|
||||||
|
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||||
|
slot, n->ip, n->port);
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER ADDSLOTS %s", slot);
|
||||||
|
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
|
fixed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Handle case "2": keys only in one node. */
|
||||||
|
if (listLength(single) > 0) {
|
||||||
|
printf("The following uncovered slots have keys in just one node:\n");
|
||||||
|
clusterManagerPrintSlotsList(single);
|
||||||
|
if (confirmWithYes("Fix these slots by covering with those nodes?")){
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(single, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
sds slot = ln->value;
|
||||||
|
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
|
||||||
|
assert(entry != NULL);
|
||||||
|
list *nodes = (list *) dictGetVal(entry);
|
||||||
|
listNode *fn = listFirst(nodes);
|
||||||
|
assert(fn != NULL);
|
||||||
|
clusterManagerNode *n = fn->value;
|
||||||
|
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||||
|
slot, n->ip, n->port);
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER ADDSLOTS %s", slot);
|
||||||
|
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
|
fixed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Handle case "3": keys in multiple nodes. */
|
||||||
|
if (listLength(multi) > 0) {
|
||||||
|
printf("The folowing uncovered slots have keys in multiple nodes:\n");
|
||||||
|
clusterManagerPrintSlotsList(multi);
|
||||||
|
if (confirmWithYes("Fix these slots by moving keys "
|
||||||
|
"into a single node?")) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(multi, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
sds slot = ln->value;
|
||||||
|
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
|
||||||
|
assert(entry != NULL);
|
||||||
|
list *nodes = (list *) dictGetVal(entry);
|
||||||
|
int s = atoi(slot);
|
||||||
|
clusterManagerNode *target =
|
||||||
|
clusterManagerGetNodeWithMostKeysInSlot(nodes, s, NULL);
|
||||||
|
if (target == NULL) {
|
||||||
|
fixed = -1;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
||||||
|
"to %s:%d\n", slot,
|
||||||
|
target->ip, target->port);
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
||||||
|
"CLUSTER ADDSLOTS %s", slot);
|
||||||
|
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
|
r = CLUSTER_MANAGER_COMMAND(target,
|
||||||
|
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
||||||
|
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
|
listIter nli;
|
||||||
|
listNode *nln;
|
||||||
|
listRewind(nodes, &nli);
|
||||||
|
while ((nln = listNext(&nli)) != NULL) {
|
||||||
|
clusterManagerNode *src = nln->value;
|
||||||
|
if (src == target) continue;
|
||||||
|
/* Set the source node in 'importing' state
|
||||||
|
* (even if we will actually migrate keys away)
|
||||||
|
* in order to avoid receiving redirections
|
||||||
|
* for MIGRATE. */
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
||||||
|
"CLUSTER SETSLOT %s %s %s", slot,
|
||||||
|
"IMPORTING", target->name);
|
||||||
|
if (!clusterManagerCheckRedisReply(target, r, NULL))
|
||||||
|
fixed = -1;
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (fixed < 0) goto cleanup;
|
||||||
|
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||||
|
CLUSTER_MANAGER_OPT_COLD;
|
||||||
|
if (!clusterManagerMoveSlot(src, target, s, opts, NULL)) {
|
||||||
|
fixed = -1;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fixed++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
sdsfree(log);
|
||||||
|
if (none) listRelease(none);
|
||||||
|
if (single) listRelease(single);
|
||||||
|
if (multi) listRelease(multi);
|
||||||
|
return fixed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Slot 'slot' was found to be in importing or migrating state in one or
|
||||||
|
* more nodes. This function fixes this condition by migrating keys where
|
||||||
|
* it seems more sensible. */
|
||||||
|
static int clusterManagerFixOpenSlot(int slot) {
|
||||||
|
clusterManagerLogInfo(">>> Fixing open slot %d\n", slot);
|
||||||
|
/* Try to obtain the current slot owner, according to the current
|
||||||
|
* nodes configuration. */
|
||||||
|
int success = 1;
|
||||||
|
list *owners = listCreate();
|
||||||
|
list *migrating = listCreate();
|
||||||
|
list *importing = listCreate();
|
||||||
|
sds migrating_str = sdsempty();
|
||||||
|
sds importing_str = sdsempty();
|
||||||
|
clusterManagerNode *owner = NULL;
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||||
|
if (n->slots[slot]) {
|
||||||
|
if (owner == NULL) owner = n;
|
||||||
|
listAddNodeTail(owners, n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||||
|
if (n->migrating) {
|
||||||
|
for (int i = 0; i < n->migrating_count; i += 2) {
|
||||||
|
sds migrating_slot = n->migrating[i];
|
||||||
|
if (atoi(migrating_slot) == slot) {
|
||||||
|
char *sep = (listLength(migrating) == 0 ? "" : ",");
|
||||||
|
migrating_str = sdscatfmt(migrating_str, "%s%S:%u",
|
||||||
|
sep, n->ip, n->port);
|
||||||
|
listAddNodeTail(migrating, n);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (n->importing) {
|
||||||
|
for (int i = 0; i < n->importing_count; i += 2) {
|
||||||
|
sds importing_slot = n->importing[i];
|
||||||
|
if (atoi(importing_slot) == slot) {
|
||||||
|
char *sep = (listLength(importing) == 0 ? "" : ",");
|
||||||
|
importing_str = sdscatfmt(importing_str, "%s%S:%u",
|
||||||
|
sep, n->ip, n->port);
|
||||||
|
listAddNodeTail(importing, n);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("Set as migrating in: %s\n", migrating_str);
|
||||||
|
printf("Set as importing in: %s\n", importing_str);
|
||||||
|
/* If there is no slot owner, set as owner the slot with the biggest
|
||||||
|
* number of keys, among the set of migrating / importing nodes. */
|
||||||
|
if (owner == NULL) {
|
||||||
|
clusterManagerLogInfo(">>> Nobody claims ownership, "
|
||||||
|
"selecting an owner...\n");
|
||||||
|
owner = clusterManagerGetNodeWithMostKeysInSlot(cluster_manager.nodes,
|
||||||
|
slot, NULL);
|
||||||
|
// If we still don't have an owner, we can't fix it.
|
||||||
|
if (owner == NULL) {
|
||||||
|
clusterManagerLogErr("[ERR] Can't select a slot owner. "
|
||||||
|
"Impossible to fix.\n");
|
||||||
|
success = 0;
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use ADDSLOTS to assign the slot.
|
||||||
|
printf("*** Configuring %s:%d as the slot owner\n", owner->ip,
|
||||||
|
owner->port);
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
|
||||||
|
"SETSLOT %d %s",
|
||||||
|
slot, "STABLE");
|
||||||
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
||||||
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
/* Make sure this information will propagate. Not strictly needed
|
||||||
|
* since there is no past owner, so all the other nodes will accept
|
||||||
|
* whatever epoch this node will claim the slot with. */
|
||||||
|
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
||||||
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
/* Remove the owner from the list of migrating/importing
|
||||||
|
* nodes. */
|
||||||
|
clusterManagerRemoveNodeFromList(migrating, owner);
|
||||||
|
clusterManagerRemoveNodeFromList(importing, owner);
|
||||||
|
}
|
||||||
|
/* If there are multiple owners of the slot, we need to fix it
|
||||||
|
* so that a single node is the owner and all the other nodes
|
||||||
|
* are in importing state. Later the fix can be handled by one
|
||||||
|
* of the base cases above.
|
||||||
|
*
|
||||||
|
* Note that this case also covers multiple nodes having the slot
|
||||||
|
* in migrating state, since migrating is a valid state only for
|
||||||
|
* slot owners. */
|
||||||
|
if (listLength(owners) > 1) {
|
||||||
|
owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL);
|
||||||
|
listRewind(owners, &li);
|
||||||
|
redisReply *reply = NULL;
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n == owner) continue;
|
||||||
|
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot);
|
||||||
|
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates
|
||||||
|
listAddNodeTail(importing, n);
|
||||||
|
}
|
||||||
|
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
||||||
|
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||||
|
if (reply) freeReplyObject(reply);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
}
|
||||||
|
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
|
||||||
|
/* Case 1: The slot is in migrating state in one slot, and in
|
||||||
|
* importing state in 1 slot. That's trivial to address. */
|
||||||
|
if (listLength(migrating) == 1 && listLength(importing) == 1) {
|
||||||
|
clusterManagerNode *src = listFirst(migrating)->value;
|
||||||
|
clusterManagerNode *dst = listFirst(importing)->value;
|
||||||
|
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||||
|
}
|
||||||
|
/* Case 2: There are multiple nodes that claim the slot as importing,
|
||||||
|
* they probably got keys about the slot after a restart so opened
|
||||||
|
* the slot. In this case we just move all the keys to the owner
|
||||||
|
* according to the configuration. */
|
||||||
|
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
|
||||||
|
clusterManagerLogInfo(">>> Moving all the %d slot keys to its "
|
||||||
|
"owner %s:%d\n", slot, owner->ip, owner->port);
|
||||||
|
move_opts |= CLUSTER_MANAGER_OPT_COLD;
|
||||||
|
listRewind(importing, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *n = ln->value;
|
||||||
|
if (n == owner) continue;
|
||||||
|
success = clusterManagerMoveSlot(n, owner, slot, move_opts, NULL);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
clusterManagerLogInfo(">>> Setting %d as STABLE in "
|
||||||
|
"%s:%d\n", slot, n->ip, n->port);
|
||||||
|
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||||
|
slot, "STABLE");
|
||||||
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int try_to_close_slot = (listLength(importing) == 0 &&
|
||||||
|
listLength(migrating) == 1);
|
||||||
|
if (try_to_close_slot) {
|
||||||
|
clusterManagerNode *n = listFirst(migrating)->value;
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||||
|
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
|
||||||
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
|
if (r) {
|
||||||
|
if (success) try_to_close_slot = (r->elements == 0);
|
||||||
|
freeReplyObject(r);
|
||||||
|
}
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
}
|
||||||
|
/* Case 3: There are no slots claiming to be in importing state, but
|
||||||
|
* there is a migrating node that actually don't have any key. We
|
||||||
|
* can just close the slot, probably a reshard interrupted in the middle. */
|
||||||
|
if (try_to_close_slot) {
|
||||||
|
clusterManagerNode *n = listFirst(migrating)->value;
|
||||||
|
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||||
|
slot, "STABLE");
|
||||||
|
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||||
|
if (r) freeReplyObject(r);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
} else {
|
||||||
|
success = 0;
|
||||||
|
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
|
||||||
|
"yet (work in progress). Slot is set as "
|
||||||
|
"migrating in %s, as importing in %s, "
|
||||||
|
"owner is %s:%d\n", migrating_str,
|
||||||
|
importing_str, owner->ip, owner->port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
listRelease(owners);
|
||||||
|
listRelease(migrating);
|
||||||
|
listRelease(importing);
|
||||||
|
sdsfree(migrating_str);
|
||||||
|
sdsfree(importing_str);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int clusterManagerCheckCluster(int quiet) {
|
||||||
listNode *ln = listFirst(cluster_manager.nodes);
|
listNode *ln = listFirst(cluster_manager.nodes);
|
||||||
if (!ln) return;
|
if (!ln) return 0;
|
||||||
|
int result = 1;
|
||||||
|
int do_fix = config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||||
clusterManagerNode *node = ln->value;
|
clusterManagerNode *node = ln->value;
|
||||||
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
|
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
|
||||||
node->ip, node->port);
|
node->ip, node->port);
|
||||||
@ -3124,6 +3672,7 @@ static void clusterManagerCheckCluster(int quiet) {
|
|||||||
if (!clusterManagerIsConfigConsistent()) {
|
if (!clusterManagerIsConfigConsistent()) {
|
||||||
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
|
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
|
||||||
clusterManagerOnError(err);
|
clusterManagerOnError(err);
|
||||||
|
result = 0;
|
||||||
} else {
|
} else {
|
||||||
clusterManagerLogOk("[OK] All nodes agree about slots "
|
clusterManagerLogOk("[OK] All nodes agree about slots "
|
||||||
"configuration.\n");
|
"configuration.\n");
|
||||||
@ -3174,6 +3723,7 @@ static void clusterManagerCheckCluster(int quiet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (open_slots != NULL) {
|
if (open_slots != NULL) {
|
||||||
|
result = 0;
|
||||||
dictIterator *iter = dictGetIterator(open_slots);
|
dictIterator *iter = dictGetIterator(open_slots);
|
||||||
dictEntry *entry;
|
dictEntry *entry;
|
||||||
sds errstr = sdsnew("[WARNING] The following slots are open: ");
|
sds errstr = sdsnew("[WARNING] The following slots are open: ");
|
||||||
@ -3185,6 +3735,17 @@ static void clusterManagerCheckCluster(int quiet) {
|
|||||||
}
|
}
|
||||||
clusterManagerLogErr("%s.\n", (char *) errstr);
|
clusterManagerLogErr("%s.\n", (char *) errstr);
|
||||||
sdsfree(errstr);
|
sdsfree(errstr);
|
||||||
|
if (do_fix) {
|
||||||
|
// Fix open slots.
|
||||||
|
dictReleaseIterator(iter);
|
||||||
|
iter = dictGetIterator(open_slots);
|
||||||
|
while ((entry = dictNext(iter)) != NULL) {
|
||||||
|
sds slot = (sds) dictGetKey(entry);
|
||||||
|
result = clusterManagerFixOpenSlot(atoi(slot));
|
||||||
|
if (!result) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dictReleaseIterator(iter);
|
||||||
dictRelease(open_slots);
|
dictRelease(open_slots);
|
||||||
}
|
}
|
||||||
clusterManagerLogInfo(">>> Check slots coverage...\n");
|
clusterManagerLogInfo(">>> Check slots coverage...\n");
|
||||||
@ -3200,7 +3761,16 @@ static void clusterManagerCheckCluster(int quiet) {
|
|||||||
"covered by nodes.\n",
|
"covered by nodes.\n",
|
||||||
CLUSTER_MANAGER_SLOTS);
|
CLUSTER_MANAGER_SLOTS);
|
||||||
clusterManagerOnError(err);
|
clusterManagerOnError(err);
|
||||||
|
result = 0;
|
||||||
|
if (do_fix/* && result*/) {
|
||||||
|
dictType dtype = clusterManagerDictType;
|
||||||
|
dtype.valDestructor = dictListDestructor;
|
||||||
|
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
|
||||||
|
int fixed = clusterManagerFixSlotsCoverage(slots);
|
||||||
|
if (fixed > 0) result = 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static clusterManagerNode *clusterNodeForResharding(char *id,
|
static clusterManagerNode *clusterNodeForResharding(char *id,
|
||||||
@ -3546,12 +4116,7 @@ assign_replicas:
|
|||||||
}
|
}
|
||||||
clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
|
clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
|
||||||
clusterManagerShowNodes();
|
clusterManagerShowNodes();
|
||||||
printf("Can I set the above configuration? %s", "(type 'yes' to accept): ");
|
if (confirmWithYes("Can I set the above configuration?")) {
|
||||||
fflush(stdout);
|
|
||||||
char buf[4];
|
|
||||||
int nread = read(fileno(stdin),buf,4);
|
|
||||||
buf[3] = '\0';
|
|
||||||
if (nread != 0 && !strcmp("yes", buf)) {
|
|
||||||
listRewind(cluster_manager.nodes, &li);
|
listRewind(cluster_manager.nodes, &li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
clusterManagerNode *node = ln->value;
|
clusterManagerNode *node = ln->value;
|
||||||
@ -3674,13 +4239,17 @@ static int clusterManagerCommandCheck(int argc, char **argv) {
|
|||||||
clusterManagerNode *node = clusterManagerNewNode(ip, port);
|
clusterManagerNode *node = clusterManagerNewNode(ip, port);
|
||||||
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
|
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
|
||||||
clusterManagerShowInfo();
|
clusterManagerShowInfo();
|
||||||
clusterManagerCheckCluster(0);
|
return clusterManagerCheckCluster(0);
|
||||||
return 1;
|
|
||||||
invalid_args:
|
invalid_args:
|
||||||
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
|
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int clusterManagerCommandFix(int argc, char **argv) {
|
||||||
|
config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||||
|
return clusterManagerCommandCheck(argc, argv);
|
||||||
|
}
|
||||||
|
|
||||||
static int clusterManagerCommandReshard(int argc, char **argv) {
|
static int clusterManagerCommandReshard(int argc, char **argv) {
|
||||||
int port = 0;
|
int port = 0;
|
||||||
char *ip = NULL;
|
char *ip = NULL;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user