Cluster Manager: reshard command, fixed slots

parsing bug and other minor bugs.
This commit is contained in:
artix 2018-02-28 10:44:11 +01:00
parent 7d609ff952
commit 99da9c9508

View File

@ -69,6 +69,13 @@
#define REDIS_CLI_RCFILE_DEFAULT ".redisclirc"
#define CLUSTER_MANAGER_SLOTS 16384
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
#define CLUSTER_MANAGER_MIGRATE_PIPELINE 10
#define CLUSTER_MANAGER_INVALID_HOST_ARG \
"Invalid arguments: you need to pass either a valid " \
"address (ie. 120.0.0.1:7000) or space separated IP " \
"and port (ie. 120.0.0.1 7000)\n"
#define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL)
#define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1))
#define CLUSTER_MANAGER_NODE_CONNECT(n) \
@ -103,9 +110,14 @@
#define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0
#define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1
#define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
#define CLUSTER_MANAGER_OPT_UPDATE 1 << 2
#define CLUSTER_MANAGER_OPT_QUIET 1 << 6
#define CLUSTER_MANAGER_OPT_VERBOSE 1 << 7
#define CLUSTER_MANAGER_LOG_LVL_INFO 1
#define CLUSTER_MANAGER_LOG_LVL_WARN 2
@ -143,6 +155,11 @@ typedef struct clusterManagerCommand {
char **argv;
int flags;
int replicas;
char *from;
char *to;
int slots;
int timeout;
int pipeline;
} clusterManagerCommand;
static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
@ -1261,6 +1278,19 @@ static int parseOptions(int argc, char **argv) {
usage();
} else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) {
config.cluster_manager_command.replicas = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-from") && !lastarg) {
config.cluster_manager_command.from = argv[++i];
} else if (!strcmp(argv[i],"--cluster-to") && !lastarg) {
config.cluster_manager_command.to = argv[++i];
} else if (!strcmp(argv[i],"--cluster-slots") && !lastarg) {
config.cluster_manager_command.slots = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-timeout") && !lastarg) {
config.cluster_manager_command.timeout = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-pipeline") && !lastarg) {
config.cluster_manager_command.pipeline = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-yes")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_YES;
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
sds version = cliVersion();
printf("redis-cli %s\n", version);
@ -1358,7 +1388,7 @@ static void usage(void) {
" --ldb-sync-mode Like --ldb but uses the synchronous Lua debugger, in\n"
" this mode the server is blocked and script changes are\n"
" are not rolled back from the server memory.\n"
" --cluster <command> [args...]\n"
" --cluster <command> [args...] [opts...]\n"
" Cluster Manager command and arguments (see below).\n"
" --help Output this help and exit.\n"
" --version Output version and exit.\n"
@ -1729,6 +1759,12 @@ typedef struct clusterManagerNodeArray {
int count; /* Non-NULL nodes count */
} clusterManagerNodeArray;
/* Used for reshard table. */
typedef struct clusterManagerReshardTableItem {
clusterManagerNode *source;
int slot;
} clusterManagerReshardTableItem;
static dictType clusterManagerDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
@ -1754,7 +1790,7 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
int ip_count, clusterManagerNode ***offending, int *offending_len);
static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
int ip_count);
static sds clusterManagerNodeInfo(clusterManagerNode *node);
static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent);
static void clusterManagerShowNodes(void);
static void clusterManagerShowInfo(void);
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
@ -1776,6 +1812,7 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
static int clusterManagerCommandCreate(int argc, char **argv);
static int clusterManagerCommandInfo(int argc, char **argv);
static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandReshard(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv);
static int clusterManagerCommandHelp(int argc, char **argv);
@ -1789,9 +1826,11 @@ typedef struct clusterManagerCommandDef {
clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"cluster-replicas"},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
"replicas <arg>"},
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
{"reshard", clusterManagerCommandReshard, -1, "host:port",
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
{"call", clusterManagerCommandCall, -2,
"host:port command arg arg .. arg", NULL},
{"help", clusterManagerCommandHelp, 0, NULL, NULL}
@ -1829,6 +1868,38 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) {
return proc;
}
/* Get host ip and port from command arguments. If only one argument has
* been provided it must be in the form of 'ip:port', elsewhere
* the first argument must be the ip and the second one the port.
* If host and port can be detected, it returns 1 and it stores host and
* port into variables referenced by'ip_ptr' and 'port_ptr' pointers,
* elsewhere it returns 0. */
static int getClusterHostFromCmdArgs(int argc, char **argv,
char **ip_ptr, int *port_ptr) {
int port = 0;
char *ip = NULL;
if (argc == 1) {
char *addr = argv[0];
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c != NULL) {
*c = '\0';
ip = addr;
port = atoi(++c);
} else return 0;
} else {
ip = argv[0];
port = atoi(argv[1]);
}
if (!ip || !port) return 0;
else {
*ip_ptr = ip;
*port_ptr = port;
}
return 1;
}
static void freeClusterManagerNode(clusterManagerNode *node) {
if (node->context != NULL) redisFree(node->context);
if (node->friends != NULL) {
@ -2188,8 +2259,12 @@ static sds clusterManagerNodeSlotsString(clusterManagerNode *node) {
return slots;
}
static sds clusterManagerNodeInfo(clusterManagerNode *node) {
static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) {
sds info = sdsempty();
sds spaces = sdsempty();
int i;
for (i = 0; i < indent; i++) spaces = sdscat(spaces, " ");
if (indent) info = sdscat(info, spaces);
int is_master = !(node->flags & CLUSTER_MANAGER_FLAG_SLAVE);
char *role = (is_master ? "M" : "S");
sds slots = NULL;
@ -2198,17 +2273,18 @@ static sds clusterManagerNodeInfo(clusterManagerNode *node) {
else {
slots = clusterManagerNodeSlotsString(node);
info = sdscatfmt(info, "%s: %S %s:%u\n"
" slots:%S (%u slots) "
"%s slots:%S (%u slots) "
"", //TODO: flags string
role, node->name, node->ip, node->port,
role, node->name, node->ip, node->port, spaces,
slots, node->slots_count);
sdsfree(slots);
}
if (node->replicate != NULL)
info = sdscatfmt(info, "\n replicates %S", node->replicate);
info = sdscatfmt(info, "\n%s replicates %S", spaces, node->replicate);
else if (node->replicas_count)
info = sdscatfmt(info, "\n %U additional replica(s)",
node->replicas_count);
info = sdscatfmt(info, "\n%s %U additional replica(s)",
spaces, node->replicas_count);
sdsfree(spaces);
return info;
}
@ -2218,7 +2294,7 @@ static void clusterManagerShowNodes(void) {
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *node = ln->value;
sds info = clusterManagerNodeInfo(node);
sds info = clusterManagerNodeInfo(node, 0);
printf("%s\n", info);
sdsfree(info);
}
@ -2306,7 +2382,7 @@ static int clusterManagerAddSlots(clusterManagerNode *node, char**err)
argvlen[i] = sdslen(argv[i]);
redisAppendCommandArgv(node->context,argc,(const char**)argv,argvlen);
if (redisGetReply(node->context, &_reply) != REDIS_OK) {
success = 1;
success = 0;
goto cleanup;
}
reply = (redisReply*) _reply;
@ -2326,6 +2402,193 @@ cleanup:
return success;
}
/* Set slot status to "importing" or "migrating" */
static int clusterManagerSetSlot(clusterManagerNode *node1,
clusterManagerNode *node2,
int slot, const char *mode, char **err) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER "
"SETSLOT %d %s %s",
slot, mode,
(char *) node2->name);
if (err != NULL) *err = NULL;
if (!reply) return 0;
if (reply->type == REDIS_REPLY_ERROR) {
if (err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
}
return 0;
}
return 1;
}
static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
clusterManagerNode *target,
int slot, int timeout,
int pipeline, int verbose,
char **err)
{
int success = 1;
while (1) {
redisReply *reply = NULL, *migrate_reply = NULL;
char **argv = NULL;
size_t *argv_len = NULL;
reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
"GETKEYSINSLOT %d %d", slot,
pipeline);
success = (reply != NULL);
if (!success) return 0;
if (reply->type == REDIS_REPLY_ERROR) {
success = 0;
if (err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
}
goto next;
}
assert(reply->type == REDIS_REPLY_ARRAY);
size_t count = reply->elements;
if (count == 0) {
freeReplyObject(reply);
break;
}
char *dots = (verbose ? zmalloc((count+1) * sizeof(char)) : NULL);
/* Calling MIGRATE command. */
size_t argc = count + 8;
argv = zcalloc(argc * sizeof(char *));
argv_len = zcalloc(argc * sizeof(size_t));
char portstr[255];
char timeoutstr[255];
snprintf(portstr, 10, "%d", target->port);
snprintf(timeoutstr, 10, "%d", timeout);
argv[0] = "MIGRATE";
argv_len[0] = 7;
argv[1] = target->ip;
argv_len[1] = strlen(target->ip);
argv[2] = portstr;
argv_len[2] = strlen(portstr);
argv[3] = "";
argv_len[3] = 0;
argv[4] = "0";
argv_len[4] = 1;
argv[5] = timeoutstr;
argv_len[5] = strlen(timeoutstr);
argv[6] = "REPLACE";
argv_len[6] = 7;
argv[7] = "KEYS";
argv_len[7] = 4;
for (size_t i = 0; i < count; i++) {
redisReply *entry = reply->element[i];
size_t idx = i + 8;
assert(entry->type == REDIS_REPLY_STRING);
argv[idx] = (char *) sdsnew(entry->str);
argv_len[idx] = entry->len;
if (verbose) dots[i] = '.';
}
if (verbose) dots[count] = '\0';
void *_reply = NULL;
redisAppendCommandArgv(source->context,argc,
(const char**)argv,argv_len);
success = (redisGetReply(source->context, &_reply) == REDIS_OK);
for (size_t i = 0; i < count; i++) sdsfree(argv[i + 8]);
if (!success) goto next;
migrate_reply = (redisReply *) _reply;
if (migrate_reply->type == REDIS_REPLY_ERROR) {
// TODO: Implement fix.
success = 0;
if (err != NULL) {
*err = zmalloc((migrate_reply->len + 1) * sizeof(char));
strcpy(*err, migrate_reply->str);
printf("\n");
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
}
goto next;
}
if (verbose) {
printf("%s", dots);
fflush(stdout);
}
next:
if (reply != NULL) freeReplyObject(reply);
if (migrate_reply != NULL) freeReplyObject(migrate_reply);
zfree(argv);
zfree(argv_len);
if (!success) break;
}
return success;
}
/* Move slots between source and target nodes using MIGRATE.
*
* Options:
* CLUSTER_MANAGER_OPT_VERBOSE -- Print a dot for every moved key.
* CLUSTER_MANAGER_OPT_COLD -- Move keys without opening slots /
* reconfiguring the nodes.
* CLUSTER_MANAGER_OPT_UPDATE -- Update node->slots for source/target nodes.
* CLUSTER_MANAGER_OPT_QUIET -- Don't print info messages.
*/
static int clusterManagerMoveSlot(clusterManagerNode *source,
clusterManagerNode *target,
int slot, int opts, char**err)
{
if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) {
printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip,
source->port, target->ip, target->port);
fflush(stdout);
}
if (err != NULL) *err = NULL;
int pipeline = config.cluster_manager_command.pipeline,
timeout = config.cluster_manager_command.timeout,
print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE),
option_cold = (opts & CLUSTER_MANAGER_OPT_COLD),
success = 1;
if (!option_cold) {
success = clusterManagerSetSlot(target, source, slot,
"importing", err);
if (!success) return 0;
success = clusterManagerSetSlot(source, target, slot,
"migrating", err);
if (!success) return 0;
}
success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout,
pipeline, print_dots, err);
if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n");
if (!success) return 0;
/* Set the new node as the owner of the slot in all the known nodes. */
if (!option_cold) {
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER "
"SETSLOT %d %s %s",
slot, "node",
target->name);
success = (r != NULL);
if (!success) return 0;
if (r->type == REDIS_REPLY_ERROR) {
success = 0;
if (err != NULL) {
*err = zmalloc((r->len + 1) * sizeof(char));
strcpy(*err, r->str);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err);
}
}
freeReplyObject(r);
if (!success) return 0;
}
}
/* Update the node logical config */
if (opts & CLUSTER_MANAGER_OPT_UPDATE) {
source->slots[slot] = 0;
target->slots[slot] = 1;
}
return 1;
}
/* Flush the dirty node configuration by calling replicate for slaves or
* adding the slots for masters. */
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
@ -2425,20 +2688,24 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
clusterManagerNodeResetSlots(node);
if (i == 8) {
int remaining = strlen(line);
//TODO: just while(remaining) && assign p inside the block
while ((p = strchr(line, ' ')) != NULL || remaining) {
while (remaining > 0) {
p = strchr(line, ' ');
if (p == NULL) p = line + remaining;
remaining -= (p - line);
char *slotsdef = line;
*p = '\0';
if (remaining) line = p + 1;
else line = p;
if (remaining) {
line = p + 1;
remaining--;
} else line = p;
if (slotsdef[0] == '[') {
slotsdef++;
if ((p = strstr(slotsdef, "->-"))) { // Migrating
*p = '\0';
p += 3;
char *closing_bracket = strchr(p, ']');
if (closing_bracket) *closing_bracket = '\0';
sds slot = sdsnew(slotsdef);
sds dst = sdsnew(p);
node->migrating_count += 2;
@ -2451,6 +2718,8 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
} else if ((p = strstr(slotsdef, "-<-"))) {//Importing
*p = '\0';
p += 3;
char *closing_bracket = strchr(p, ']');
if (closing_bracket) *closing_bracket = '\0';
sds slot = sdsnew(slotsdef);
sds src = sdsnew(p);
node->importing_count += 2;
@ -2605,8 +2874,9 @@ invalid_friend:
if (n->replicate != NULL) {
clusterManagerNode *master = clusterManagerNodeByName(n->replicate);
if (master == NULL) {
printf("*** WARNING: %s:%d claims to be slave of unknown "
"node ID %s.\n", n->ip, n->port, n->replicate);
clusterManagerLogWarn("*** WARNING: %s:%d claims to be "
"slave of unknown node ID %s.\n",
n->ip, n->port, n->replicate);
} else master->replicas_count++;
}
}
@ -2619,6 +2889,12 @@ int clusterManagerSlotCompare(const void *slot1, const void *slot2) {
return strcmp(*i1, *i2);
}
int clusterManagerSlotCountCompareDesc(const void *n1, const void *n2) {
clusterManagerNode *node1 = *((clusterManagerNode **) n1);
clusterManagerNode *node2 = *((clusterManagerNode **) n2);
return node2->slots_count - node1->slots_count;
}
static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
sds signature = NULL;
int node_count = 0, i = 0, name_len = 0;
@ -2651,16 +2927,18 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
if (remaining == 0) continue;
char **slots = NULL;
int c = 0;
//TODO: just while(remaining) && assign p inside the block
while ((p = strchr(line, ' ')) != NULL || remaining) {
while (remaining > 0) {
p = strchr(line, ' ');
if (p == NULL) p = line + remaining;
int size = (p - line);
remaining -= size;
tot_size += size;
char *slotsdef = line;
*p = '\0';
if (remaining) line = p + 1;
else line = p;
if (remaining) {
line = p + 1;
remaining--;
} else line = p;
if (slotsdef[0] != '[') {
c++;
slots = zrealloc(slots, (c * sizeof(char *)));
@ -2792,7 +3070,7 @@ static void clusterManagerCheckCluster(int quiet) {
n->port);
for (i = 0; i < n->migrating_count; i += 2) {
sds slot = n->migrating[i];
dictAdd(open_slots, slot, n->migrating[i + 1]);
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
@ -2810,7 +3088,7 @@ static void clusterManagerCheckCluster(int quiet) {
n->port);
for (i = 0; i < n->importing_count; i += 2) {
sds slot = n->importing[i];
dictAdd(open_slots, slot, n->importing[i + 1]);
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
@ -2848,6 +3126,76 @@ static void clusterManagerCheckCluster(int quiet) {
}
}
static clusterManagerNode *clusterNodeForResharding(char *id,
clusterManagerNode *target,
int *raise_err)
{
clusterManagerNode *node = NULL;
const char *invalid_node_msg = "*** The specified node is not known or "
"not a master, please retry.\n";
node = clusterManagerNodeByName(id);
*raise_err = 0;
if (!node || node->flags & CLUSTER_MANAGER_FLAG_SLAVE) {
clusterManagerLogErr(invalid_node_msg);
*raise_err = 1;
return NULL;
} else if (node != NULL && target != NULL) {
if (!strcmp(node->name, target->name)) {
clusterManagerLogErr( "*** It is not possible to use "
"the target node as "
"source node.\n");
return NULL;
}
}
return node;
}
static list *clusterManagerComputeReshardTable(list *sources, int numslots) {
list *moved = listCreate();
int src_count = listLength(sources), i = 0, tot_slots = 0, j;
clusterManagerNode **sorted = zmalloc(src_count * sizeof(**sorted));
listIter li;
listNode *ln;
listRewind(sources, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *node = ln->value;
tot_slots += node->slots_count;
sorted[i++] = node;
}
qsort(sorted, src_count, sizeof(clusterManagerNode *),
clusterManagerSlotCountCompareDesc);
for (i = 0; i < src_count; i++) {
clusterManagerNode *node = sorted[i];
float n = ((float) numslots / tot_slots * node->slots_count);
if (i == 0) n = ceil(n);
else n = floor(n);
int max = (int) n, count = 0;
for (j = 0; j < CLUSTER_MANAGER_SLOTS; j++) {
int slot = node->slots[j];
if (!slot) continue;
if (count >= max || (int)listLength(moved) >= numslots) break;
clusterManagerReshardTableItem *item = zmalloc(sizeof(item));
item->source = node;
item->slot = j;
listAddNodeTail(moved, item);
count++;
}
}
zfree(sorted);
return moved;
}
static void clusterManagerShowReshardTable(list *table) {
listIter li;
listNode *ln;
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
clusterManagerNode *n = item->source;
printf(" Moving slot %d from %s\n", item->slot, (char *) n->name);
}
}
static void clusterManagerLog(int level, const char* fmt, ...) {
int use_colors =
(config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COLOR);
@ -3219,59 +3567,218 @@ cleanup:
static int clusterManagerCommandInfo(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (argc == 1) {
char *addr = argv[0];
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c != NULL) {
*c = '\0';
ip = addr;
port = atoi(++c);
} else goto invalid_args;
} else {
ip = argv[0];
port = atoi(argv[1]);
}
if (!ip || !port) goto invalid_args;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerShowInfo();
return 1;
invalid_args:
fprintf(stderr, "Invalid arguments: you need to pass either a valid "
"address (ie. 120.0.0.1:7000) or space separated IP "
"and port (ie. 120.0.0.1 7000)\n");
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
static int clusterManagerCommandCheck(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (argc == 1) {
char *addr = argv[0];
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c != NULL) {
*c = '\0';
ip = addr;
port = atoi(++c);
} else goto invalid_args;
} else {
ip = argv[0];
port = atoi(argv[1]);
}
if (!ip || !port) goto invalid_args;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerShowInfo();
clusterManagerCheckCluster(0);
return 1;
invalid_args:
fprintf(stderr, "Invalid arguments: you need to pass either a valid "
"address (ie. 120.0.0.1:7000) or space separated IP "
"and port (ie. 120.0.0.1 7000)\n");
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
static int clusterManagerCommandReshard(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerCheckCluster(0);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
fflush(stdout);
fprintf(stderr,
"*** Please fix your cluster problems before resharding\n");
return 0;
}
int slots = config.cluster_manager_command.slots;
if (!slots) {
while (slots <= 0 || slots > CLUSTER_MANAGER_SLOTS) {
printf("How many slots do you want to move (from 1 to %d)? ",
CLUSTER_MANAGER_SLOTS);
fflush(stdout);
char buf[6];
int nread = read(fileno(stdin),buf,6);
if (!nread) continue; //TODO: nread < 0
int last_idx = nread - 1;
if (buf[last_idx] != '\n') {
int ch;
while ((ch = getchar()) != '\n' && ch != EOF) {}
}
buf[last_idx] = '\0';
slots = atoi(buf);
}
}
char buf[255];
char *to = config.cluster_manager_command.to,
*from = config.cluster_manager_command.from;
while (to == NULL) {
printf("What is the receiving node ID? ");
fflush(stdout);
int nread = read(fileno(stdin),buf,255);
if (!nread) continue; //TODO: nread < 0
int last_idx = nread - 1;
if (buf[last_idx] != '\n') {
int ch;
while ((ch = getchar()) != '\n' && ch != EOF) {}
}
buf[last_idx] = '\0';
if (strlen(buf) > 0) to = buf;
}
int raise_err = 0;
clusterManagerNode *target = clusterNodeForResharding(to, NULL, &raise_err);
if (target == NULL) return 0;
list *sources = listCreate();
list *table = NULL;
int all = 0, result = 1;
if (from == NULL) {
printf("Please enter all the source node IDs.\n");
printf(" Type 'all' to use all the nodes as source nodes for "
"the hash slots.\n");
printf(" Type 'done' once you entered all the source nodes IDs.\n");
while (1) {
printf("Source node #%lu: ", listLength(sources) + 1);
fflush(stdout);
int nread = read(fileno(stdin),buf,255);
if (!nread) continue; //TODO: nread < 0
int last_idx = nread - 1;
if (buf[last_idx] != '\n') {
int ch;
while ((ch = getchar()) != '\n' && ch != EOF) {}
}
buf[last_idx] = '\0';
if (!strcmp(buf, "done")) break;
else if (!strcmp(buf, "all")) {
all = 1;
break;
} else {
clusterManagerNode *src =
clusterNodeForResharding(buf, target, &raise_err);
if (src != NULL) listAddNodeTail(sources, src);
else if (raise_err) {
result = 0;
goto cleanup;
}
}
}
} else {
char *p;
while((p = strchr(from, ',')) != NULL) {
*p = '\0';
if (!strcmp(from, "all")) {
all = 1;
break;
} else {
clusterManagerNode *src =
clusterNodeForResharding(from, target, &raise_err);
if (src != NULL) listAddNodeTail(sources, src);
else if (raise_err) {
result = 0;
goto cleanup;
}
}
from = p + 1;
}
/* Check if there's still another source to process. */
if (!all && strlen(from) > 0) {
clusterManagerNode *src =
clusterNodeForResharding(from, target, &raise_err);
if (src != NULL) listAddNodeTail(sources, src);
else if (raise_err) {
result = 0;
goto cleanup;
}
}
}
listIter li;
listNode *ln;
if (all) {
listEmpty(sources);
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
continue;
if (!sdscmp(n->name, target->name)) continue;
listAddNodeTail(sources, n);
}
}
if (listLength(sources) == 0) {
fprintf(stderr, "*** No source nodes given, operation aborted.\n");
result = 0;
goto cleanup;
}
printf("\nReady to move %d slots.\n", slots);
printf(" Source nodes:\n");
listRewind(sources, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *src = ln->value;
sds info = clusterManagerNodeInfo(src, 4);
printf("%s\n", info);
sdsfree(info);
}
printf(" Destination node:\n");
sds info = clusterManagerNodeInfo(target, 4);
printf("%s\n", info);
sdsfree(info);
table = clusterManagerComputeReshardTable(sources, slots);
printf(" Resharding plan:\n");
clusterManagerShowReshardTable(table);
if (!(config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_YES))
{
printf("Do you want to proceed with the proposed "
"reshard plan (yes/no)? ");
fflush(stdout);
char buf[4];
int nread = read(fileno(stdin),buf,4);
buf[3] = '\0';
if (nread <= 0 || strcmp("yes", buf) != 0) {
result = 0;
goto cleanup;
}
}
int opts = CLUSTER_MANAGER_OPT_VERBOSE;
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
char *err = NULL;
result = clusterManagerMoveSlot(item->source, target, item->slot,
opts, &err);
if (!result) {
if (err != NULL) {
clusterManagerLogErr("\n%s\n", err);
zfree(err);
}
goto cleanup;
}
}
cleanup:
listRelease(sources);
if (table) {
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
zfree(item);
}
listRelease(table);
}
return result;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
@ -3332,13 +3839,32 @@ static int clusterManagerCommandHelp(int argc, char **argv) {
sizeof(clusterManagerCommandDef);
int i = 0, j;
fprintf(stderr, "Cluster Manager Commands:\n");
int padding = 15;
for (; i < commands_count; i++) {
clusterManagerCommandDef *def = &(clusterManagerCommands[i]);
int namelen = strlen(def->name), padlen = 15 - namelen;
int namelen = strlen(def->name), padlen = padding - namelen;
fprintf(stderr, " %s", def->name);
for (j = 0; j < padlen; j++) fprintf(stderr, " ");
fprintf(stderr, "%s\n", (def->args ? def->args : ""));
//TODO: if (def->options)
if (def->options != NULL) {
int optslen = strlen(def->options);
char *p = def->options, *eos = p + optslen;
char *comma = NULL;
while ((comma = strchr(p, ',')) != NULL) {
int deflen = (int)(comma - p);
char buf[255];
memcpy(buf, p, deflen);
buf[deflen] = '\0';
for (j = 0; j < padding; j++) fprintf(stderr, " ");
fprintf(stderr, " --cluster-%s\n", buf);
p = comma + 1;
if (p >= eos) break;
}
if (p < eos) {
for (j = 0; j < padding; j++) fprintf(stderr, " ");
fprintf(stderr, " --cluster-%s\n", p);
}
}
}
return 0;
}
@ -4641,6 +5167,11 @@ int main(int argc, char **argv) {
config.cluster_manager_command.argv = NULL;
config.cluster_manager_command.flags = 0;
config.cluster_manager_command.replicas = 0;
config.cluster_manager_command.from = NULL;
config.cluster_manager_command.to = NULL;
config.cluster_manager_command.slots = 0;
config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT;
config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE;
pref.hints = 1;
spectrum_palette = spectrum_palette_color;