diff --git a/src/redis-cli.c b/src/redis-cli.c index 66fc4d18..fcf48a47 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -69,6 +69,13 @@ #define REDIS_CLI_RCFILE_DEFAULT ".redisclirc" #define CLUSTER_MANAGER_SLOTS 16384 +#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000 +#define CLUSTER_MANAGER_MIGRATE_PIPELINE 10 + +#define CLUSTER_MANAGER_INVALID_HOST_ARG \ + "Invalid arguments: you need to pass either a valid " \ + "address (ie. 120.0.0.1:7000) or space separated IP " \ + "and port (ie. 120.0.0.1 7000)\n" #define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL) #define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1)) #define CLUSTER_MANAGER_NODE_CONNECT(n) \ @@ -103,9 +110,14 @@ #define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0 #define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1 +#define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2 #define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 +#define CLUSTER_MANAGER_OPT_COLD 1 << 1 +#define CLUSTER_MANAGER_OPT_UPDATE 1 << 2 +#define CLUSTER_MANAGER_OPT_QUIET 1 << 6 +#define CLUSTER_MANAGER_OPT_VERBOSE 1 << 7 #define CLUSTER_MANAGER_LOG_LVL_INFO 1 #define CLUSTER_MANAGER_LOG_LVL_WARN 2 @@ -143,6 +155,11 @@ typedef struct clusterManagerCommand { char **argv; int flags; int replicas; + char *from; + char *to; + int slots; + int timeout; + int pipeline; } clusterManagerCommand; static void createClusterManagerCommand(char *cmdname, int argc, char **argv); @@ -1261,6 +1278,19 @@ static int parseOptions(int argc, char **argv) { usage(); } else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) { config.cluster_manager_command.replicas = atoi(argv[++i]); + } else if (!strcmp(argv[i],"--cluster-from") && !lastarg) { + config.cluster_manager_command.from = argv[++i]; + } else if (!strcmp(argv[i],"--cluster-to") && !lastarg) { + config.cluster_manager_command.to = argv[++i]; + } else if (!strcmp(argv[i],"--cluster-slots") && !lastarg) { + config.cluster_manager_command.slots = atoi(argv[++i]); + } else if (!strcmp(argv[i],"--cluster-timeout") && !lastarg) { + config.cluster_manager_command.timeout = atoi(argv[++i]); + } else if (!strcmp(argv[i],"--cluster-pipeline") && !lastarg) { + config.cluster_manager_command.pipeline = atoi(argv[++i]); + } else if (!strcmp(argv[i],"--cluster-yes")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_YES; } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -1358,7 +1388,7 @@ static void usage(void) { " --ldb-sync-mode Like --ldb but uses the synchronous Lua debugger, in\n" " this mode the server is blocked and script changes are\n" " are not rolled back from the server memory.\n" -" --cluster [args...]\n" +" --cluster [args...] [opts...]\n" " Cluster Manager command and arguments (see below).\n" " --help Output this help and exit.\n" " --version Output version and exit.\n" @@ -1729,6 +1759,12 @@ typedef struct clusterManagerNodeArray { int count; /* Non-NULL nodes count */ } clusterManagerNodeArray; +/* Used for reshard table. */ +typedef struct clusterManagerReshardTableItem { + clusterManagerNode *source; + int slot; +} clusterManagerReshardTableItem; + static dictType clusterManagerDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ @@ -1754,7 +1790,7 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes, int ip_count, clusterManagerNode ***offending, int *offending_len); static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes, int ip_count); -static sds clusterManagerNodeInfo(clusterManagerNode *node); +static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent); static void clusterManagerShowNodes(void); static void clusterManagerShowInfo(void); static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err); @@ -1776,6 +1812,7 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array, static int clusterManagerCommandCreate(int argc, char **argv); static int clusterManagerCommandInfo(int argc, char **argv); static int clusterManagerCommandCheck(int argc, char **argv); +static int clusterManagerCommandReshard(int argc, char **argv); static int clusterManagerCommandCall(int argc, char **argv); static int clusterManagerCommandHelp(int argc, char **argv); @@ -1789,9 +1826,11 @@ typedef struct clusterManagerCommandDef { clusterManagerCommandDef clusterManagerCommands[] = { {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN", - "cluster-replicas"}, - {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, + "replicas "}, {"check", clusterManagerCommandCheck, -1, "host:port", NULL}, + {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, + {"reshard", clusterManagerCommandReshard, -1, "host:port", + "from ,to ,slots ,yes,timeout ,pipeline "}, {"call", clusterManagerCommandCall, -2, "host:port command arg arg .. arg", NULL}, {"help", clusterManagerCommandHelp, 0, NULL, NULL} @@ -1829,6 +1868,38 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) { return proc; } +/* Get host ip and port from command arguments. If only one argument has + * been provided it must be in the form of 'ip:port', elsewhere + * the first argument must be the ip and the second one the port. + * If host and port can be detected, it returns 1 and it stores host and + * port into variables referenced by'ip_ptr' and 'port_ptr' pointers, + * elsewhere it returns 0. */ +static int getClusterHostFromCmdArgs(int argc, char **argv, + char **ip_ptr, int *port_ptr) { + int port = 0; + char *ip = NULL; + if (argc == 1) { + char *addr = argv[0]; + char *c = strrchr(addr, '@'); + if (c != NULL) *c = '\0'; + c = strrchr(addr, ':'); + if (c != NULL) { + *c = '\0'; + ip = addr; + port = atoi(++c); + } else return 0; + } else { + ip = argv[0]; + port = atoi(argv[1]); + } + if (!ip || !port) return 0; + else { + *ip_ptr = ip; + *port_ptr = port; + } + return 1; +} + static void freeClusterManagerNode(clusterManagerNode *node) { if (node->context != NULL) redisFree(node->context); if (node->friends != NULL) { @@ -2188,8 +2259,12 @@ static sds clusterManagerNodeSlotsString(clusterManagerNode *node) { return slots; } -static sds clusterManagerNodeInfo(clusterManagerNode *node) { +static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) { sds info = sdsempty(); + sds spaces = sdsempty(); + int i; + for (i = 0; i < indent; i++) spaces = sdscat(spaces, " "); + if (indent) info = sdscat(info, spaces); int is_master = !(node->flags & CLUSTER_MANAGER_FLAG_SLAVE); char *role = (is_master ? "M" : "S"); sds slots = NULL; @@ -2198,17 +2273,18 @@ static sds clusterManagerNodeInfo(clusterManagerNode *node) { else { slots = clusterManagerNodeSlotsString(node); info = sdscatfmt(info, "%s: %S %s:%u\n" - " slots:%S (%u slots) " + "%s slots:%S (%u slots) " "", //TODO: flags string - role, node->name, node->ip, node->port, + role, node->name, node->ip, node->port, spaces, slots, node->slots_count); sdsfree(slots); } if (node->replicate != NULL) - info = sdscatfmt(info, "\n replicates %S", node->replicate); + info = sdscatfmt(info, "\n%s replicates %S", spaces, node->replicate); else if (node->replicas_count) - info = sdscatfmt(info, "\n %U additional replica(s)", - node->replicas_count); + info = sdscatfmt(info, "\n%s %U additional replica(s)", + spaces, node->replicas_count); + sdsfree(spaces); return info; } @@ -2218,7 +2294,7 @@ static void clusterManagerShowNodes(void) { listRewind(cluster_manager.nodes, &li); while ((ln = listNext(&li)) != NULL) { clusterManagerNode *node = ln->value; - sds info = clusterManagerNodeInfo(node); + sds info = clusterManagerNodeInfo(node, 0); printf("%s\n", info); sdsfree(info); } @@ -2306,7 +2382,7 @@ static int clusterManagerAddSlots(clusterManagerNode *node, char**err) argvlen[i] = sdslen(argv[i]); redisAppendCommandArgv(node->context,argc,(const char**)argv,argvlen); if (redisGetReply(node->context, &_reply) != REDIS_OK) { - success = 1; + success = 0; goto cleanup; } reply = (redisReply*) _reply; @@ -2326,6 +2402,193 @@ cleanup: return success; } +/* Set slot status to "importing" or "migrating" */ +static int clusterManagerSetSlot(clusterManagerNode *node1, + clusterManagerNode *node2, + int slot, const char *mode, char **err) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER " + "SETSLOT %d %s %s", + slot, mode, + (char *) node2->name); + if (err != NULL) *err = NULL; + if (!reply) return 0; + if (reply->type == REDIS_REPLY_ERROR) { + if (err != NULL) { + *err = zmalloc((reply->len + 1) * sizeof(char)); + strcpy(*err, reply->str); + } + return 0; + } + return 1; +} + +static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, + clusterManagerNode *target, + int slot, int timeout, + int pipeline, int verbose, + char **err) +{ + int success = 1; + while (1) { + redisReply *reply = NULL, *migrate_reply = NULL; + char **argv = NULL; + size_t *argv_len = NULL; + reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER " + "GETKEYSINSLOT %d %d", slot, + pipeline); + success = (reply != NULL); + if (!success) return 0; + if (reply->type == REDIS_REPLY_ERROR) { + success = 0; + if (err != NULL) { + *err = zmalloc((reply->len + 1) * sizeof(char)); + strcpy(*err, reply->str); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err); + } + goto next; + } + assert(reply->type == REDIS_REPLY_ARRAY); + size_t count = reply->elements; + if (count == 0) { + freeReplyObject(reply); + break; + } + char *dots = (verbose ? zmalloc((count+1) * sizeof(char)) : NULL); + /* Calling MIGRATE command. */ + size_t argc = count + 8; + argv = zcalloc(argc * sizeof(char *)); + argv_len = zcalloc(argc * sizeof(size_t)); + char portstr[255]; + char timeoutstr[255]; + snprintf(portstr, 10, "%d", target->port); + snprintf(timeoutstr, 10, "%d", timeout); + argv[0] = "MIGRATE"; + argv_len[0] = 7; + argv[1] = target->ip; + argv_len[1] = strlen(target->ip); + argv[2] = portstr; + argv_len[2] = strlen(portstr); + argv[3] = ""; + argv_len[3] = 0; + argv[4] = "0"; + argv_len[4] = 1; + argv[5] = timeoutstr; + argv_len[5] = strlen(timeoutstr); + argv[6] = "REPLACE"; + argv_len[6] = 7; + argv[7] = "KEYS"; + argv_len[7] = 4; + for (size_t i = 0; i < count; i++) { + redisReply *entry = reply->element[i]; + size_t idx = i + 8; + assert(entry->type == REDIS_REPLY_STRING); + argv[idx] = (char *) sdsnew(entry->str); + argv_len[idx] = entry->len; + if (verbose) dots[i] = '.'; + } + if (verbose) dots[count] = '\0'; + void *_reply = NULL; + redisAppendCommandArgv(source->context,argc, + (const char**)argv,argv_len); + success = (redisGetReply(source->context, &_reply) == REDIS_OK); + for (size_t i = 0; i < count; i++) sdsfree(argv[i + 8]); + if (!success) goto next; + migrate_reply = (redisReply *) _reply; + if (migrate_reply->type == REDIS_REPLY_ERROR) { + // TODO: Implement fix. + success = 0; + if (err != NULL) { + *err = zmalloc((migrate_reply->len + 1) * sizeof(char)); + strcpy(*err, migrate_reply->str); + printf("\n"); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err); + } + goto next; + } + if (verbose) { + printf("%s", dots); + fflush(stdout); + } +next: + if (reply != NULL) freeReplyObject(reply); + if (migrate_reply != NULL) freeReplyObject(migrate_reply); + zfree(argv); + zfree(argv_len); + if (!success) break; + } + return success; +} + +/* Move slots between source and target nodes using MIGRATE. + * + * Options: + * CLUSTER_MANAGER_OPT_VERBOSE -- Print a dot for every moved key. + * CLUSTER_MANAGER_OPT_COLD -- Move keys without opening slots / + * reconfiguring the nodes. + * CLUSTER_MANAGER_OPT_UPDATE -- Update node->slots for source/target nodes. + * CLUSTER_MANAGER_OPT_QUIET -- Don't print info messages. +*/ +static int clusterManagerMoveSlot(clusterManagerNode *source, + clusterManagerNode *target, + int slot, int opts, char**err) +{ + if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) { + printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip, + source->port, target->ip, target->port); + fflush(stdout); + } + if (err != NULL) *err = NULL; + int pipeline = config.cluster_manager_command.pipeline, + timeout = config.cluster_manager_command.timeout, + print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE), + option_cold = (opts & CLUSTER_MANAGER_OPT_COLD), + success = 1; + if (!option_cold) { + success = clusterManagerSetSlot(target, source, slot, + "importing", err); + if (!success) return 0; + success = clusterManagerSetSlot(source, target, slot, + "migrating", err); + if (!success) return 0; + } + success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout, + pipeline, print_dots, err); + if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n"); + if (!success) return 0; + /* Set the new node as the owner of the slot in all the known nodes. */ + if (!option_cold) { + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER " + "SETSLOT %d %s %s", + slot, "node", + target->name); + success = (r != NULL); + if (!success) return 0; + if (r->type == REDIS_REPLY_ERROR) { + success = 0; + if (err != NULL) { + *err = zmalloc((r->len + 1) * sizeof(char)); + strcpy(*err, r->str); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err); + } + } + freeReplyObject(r); + if (!success) return 0; + } + } + /* Update the node logical config */ + if (opts & CLUSTER_MANAGER_OPT_UPDATE) { + source->slots[slot] = 0; + target->slots[slot] = 1; + } + return 1; +} + /* Flush the dirty node configuration by calling replicate for slaves or * adding the slots for masters. */ static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) { @@ -2425,20 +2688,24 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, clusterManagerNodeResetSlots(node); if (i == 8) { int remaining = strlen(line); - //TODO: just while(remaining) && assign p inside the block - while ((p = strchr(line, ' ')) != NULL || remaining) { + while (remaining > 0) { + p = strchr(line, ' '); if (p == NULL) p = line + remaining; remaining -= (p - line); char *slotsdef = line; *p = '\0'; - if (remaining) line = p + 1; - else line = p; + if (remaining) { + line = p + 1; + remaining--; + } else line = p; if (slotsdef[0] == '[') { slotsdef++; if ((p = strstr(slotsdef, "->-"))) { // Migrating *p = '\0'; p += 3; + char *closing_bracket = strchr(p, ']'); + if (closing_bracket) *closing_bracket = '\0'; sds slot = sdsnew(slotsdef); sds dst = sdsnew(p); node->migrating_count += 2; @@ -2451,6 +2718,8 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, } else if ((p = strstr(slotsdef, "-<-"))) {//Importing *p = '\0'; p += 3; + char *closing_bracket = strchr(p, ']'); + if (closing_bracket) *closing_bracket = '\0'; sds slot = sdsnew(slotsdef); sds src = sdsnew(p); node->importing_count += 2; @@ -2605,8 +2874,9 @@ invalid_friend: if (n->replicate != NULL) { clusterManagerNode *master = clusterManagerNodeByName(n->replicate); if (master == NULL) { - printf("*** WARNING: %s:%d claims to be slave of unknown " - "node ID %s.\n", n->ip, n->port, n->replicate); + clusterManagerLogWarn("*** WARNING: %s:%d claims to be " + "slave of unknown node ID %s.\n", + n->ip, n->port, n->replicate); } else master->replicas_count++; } } @@ -2619,6 +2889,12 @@ int clusterManagerSlotCompare(const void *slot1, const void *slot2) { return strcmp(*i1, *i2); } +int clusterManagerSlotCountCompareDesc(const void *n1, const void *n2) { + clusterManagerNode *node1 = *((clusterManagerNode **) n1); + clusterManagerNode *node2 = *((clusterManagerNode **) n2); + return node2->slots_count - node1->slots_count; +} + static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { sds signature = NULL; int node_count = 0, i = 0, name_len = 0; @@ -2651,16 +2927,18 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { if (remaining == 0) continue; char **slots = NULL; int c = 0; - //TODO: just while(remaining) && assign p inside the block - while ((p = strchr(line, ' ')) != NULL || remaining) { + while (remaining > 0) { + p = strchr(line, ' '); if (p == NULL) p = line + remaining; int size = (p - line); remaining -= size; tot_size += size; char *slotsdef = line; *p = '\0'; - if (remaining) line = p + 1; - else line = p; + if (remaining) { + line = p + 1; + remaining--; + } else line = p; if (slotsdef[0] != '[') { c++; slots = zrealloc(slots, (c * sizeof(char *))); @@ -2792,7 +3070,7 @@ static void clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->migrating_count; i += 2) { sds slot = n->migrating[i]; - dictAdd(open_slots, slot, n->migrating[i + 1]); + dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -2810,7 +3088,7 @@ static void clusterManagerCheckCluster(int quiet) { n->port); for (i = 0; i < n->importing_count; i += 2) { sds slot = n->importing[i]; - dictAdd(open_slots, slot, n->importing[i + 1]); + dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); char *fmt = (i > 0 ? ",%S" : "%S"); errstr = sdscatfmt(errstr, fmt, slot); } @@ -2848,6 +3126,76 @@ static void clusterManagerCheckCluster(int quiet) { } } +static clusterManagerNode *clusterNodeForResharding(char *id, + clusterManagerNode *target, + int *raise_err) +{ + clusterManagerNode *node = NULL; + const char *invalid_node_msg = "*** The specified node is not known or " + "not a master, please retry.\n"; + node = clusterManagerNodeByName(id); + *raise_err = 0; + if (!node || node->flags & CLUSTER_MANAGER_FLAG_SLAVE) { + clusterManagerLogErr(invalid_node_msg); + *raise_err = 1; + return NULL; + } else if (node != NULL && target != NULL) { + if (!strcmp(node->name, target->name)) { + clusterManagerLogErr( "*** It is not possible to use " + "the target node as " + "source node.\n"); + return NULL; + } + } + return node; +} + +static list *clusterManagerComputeReshardTable(list *sources, int numslots) { + list *moved = listCreate(); + int src_count = listLength(sources), i = 0, tot_slots = 0, j; + clusterManagerNode **sorted = zmalloc(src_count * sizeof(**sorted)); + listIter li; + listNode *ln; + listRewind(sources, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *node = ln->value; + tot_slots += node->slots_count; + sorted[i++] = node; + } + qsort(sorted, src_count, sizeof(clusterManagerNode *), + clusterManagerSlotCountCompareDesc); + for (i = 0; i < src_count; i++) { + clusterManagerNode *node = sorted[i]; + float n = ((float) numslots / tot_slots * node->slots_count); + if (i == 0) n = ceil(n); + else n = floor(n); + int max = (int) n, count = 0; + for (j = 0; j < CLUSTER_MANAGER_SLOTS; j++) { + int slot = node->slots[j]; + if (!slot) continue; + if (count >= max || (int)listLength(moved) >= numslots) break; + clusterManagerReshardTableItem *item = zmalloc(sizeof(item)); + item->source = node; + item->slot = j; + listAddNodeTail(moved, item); + count++; + } + } + zfree(sorted); + return moved; +} + +static void clusterManagerShowReshardTable(list *table) { + listIter li; + listNode *ln; + listRewind(table, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerReshardTableItem *item = ln->value; + clusterManagerNode *n = item->source; + printf(" Moving slot %d from %s\n", item->slot, (char *) n->name); + } +} + static void clusterManagerLog(int level, const char* fmt, ...) { int use_colors = (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COLOR); @@ -3219,59 +3567,218 @@ cleanup: static int clusterManagerCommandInfo(int argc, char **argv) { int port = 0; char *ip = NULL; - if (argc == 1) { - char *addr = argv[0]; - char *c = strrchr(addr, '@'); - if (c != NULL) *c = '\0'; - c = strrchr(addr, ':'); - if (c != NULL) { - *c = '\0'; - ip = addr; - port = atoi(++c); - } else goto invalid_args; - } else { - ip = argv[0]; - port = atoi(argv[1]); - } - if (!ip || !port) goto invalid_args; + if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; clusterManagerNode *node = clusterManagerNewNode(ip, port); if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; clusterManagerShowInfo(); return 1; invalid_args: - fprintf(stderr, "Invalid arguments: you need to pass either a valid " - "address (ie. 120.0.0.1:7000) or space separated IP " - "and port (ie. 120.0.0.1 7000)\n"); + fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); return 0; } static int clusterManagerCommandCheck(int argc, char **argv) { int port = 0; char *ip = NULL; - if (argc == 1) { - char *addr = argv[0]; - char *c = strrchr(addr, '@'); - if (c != NULL) *c = '\0'; - c = strrchr(addr, ':'); - if (c != NULL) { - *c = '\0'; - ip = addr; - port = atoi(++c); - } else goto invalid_args; - } else { - ip = argv[0]; - port = atoi(argv[1]); - } - if (!ip || !port) goto invalid_args; + if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; clusterManagerNode *node = clusterManagerNewNode(ip, port); if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; clusterManagerShowInfo(); clusterManagerCheckCluster(0); return 1; invalid_args: - fprintf(stderr, "Invalid arguments: you need to pass either a valid " - "address (ie. 120.0.0.1:7000) or space separated IP " - "and port (ie. 120.0.0.1 7000)\n"); + fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); + return 0; +} + +static int clusterManagerCommandReshard(int argc, char **argv) { + int port = 0; + char *ip = NULL; + if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; + clusterManagerNode *node = clusterManagerNewNode(ip, port); + if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; + clusterManagerCheckCluster(0); + if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { + fflush(stdout); + fprintf(stderr, + "*** Please fix your cluster problems before resharding\n"); + return 0; + } + int slots = config.cluster_manager_command.slots; + if (!slots) { + while (slots <= 0 || slots > CLUSTER_MANAGER_SLOTS) { + printf("How many slots do you want to move (from 1 to %d)? ", + CLUSTER_MANAGER_SLOTS); + fflush(stdout); + char buf[6]; + int nread = read(fileno(stdin),buf,6); + if (!nread) continue; //TODO: nread < 0 + int last_idx = nread - 1; + if (buf[last_idx] != '\n') { + int ch; + while ((ch = getchar()) != '\n' && ch != EOF) {} + } + buf[last_idx] = '\0'; + slots = atoi(buf); + } + } + char buf[255]; + char *to = config.cluster_manager_command.to, + *from = config.cluster_manager_command.from; + while (to == NULL) { + printf("What is the receiving node ID? "); + fflush(stdout); + int nread = read(fileno(stdin),buf,255); + if (!nread) continue; //TODO: nread < 0 + int last_idx = nread - 1; + if (buf[last_idx] != '\n') { + int ch; + while ((ch = getchar()) != '\n' && ch != EOF) {} + } + buf[last_idx] = '\0'; + if (strlen(buf) > 0) to = buf; + } + int raise_err = 0; + clusterManagerNode *target = clusterNodeForResharding(to, NULL, &raise_err); + if (target == NULL) return 0; + list *sources = listCreate(); + list *table = NULL; + int all = 0, result = 1; + if (from == NULL) { + printf("Please enter all the source node IDs.\n"); + printf(" Type 'all' to use all the nodes as source nodes for " + "the hash slots.\n"); + printf(" Type 'done' once you entered all the source nodes IDs.\n"); + while (1) { + printf("Source node #%lu: ", listLength(sources) + 1); + fflush(stdout); + int nread = read(fileno(stdin),buf,255); + if (!nread) continue; //TODO: nread < 0 + int last_idx = nread - 1; + if (buf[last_idx] != '\n') { + int ch; + while ((ch = getchar()) != '\n' && ch != EOF) {} + } + buf[last_idx] = '\0'; + if (!strcmp(buf, "done")) break; + else if (!strcmp(buf, "all")) { + all = 1; + break; + } else { + clusterManagerNode *src = + clusterNodeForResharding(buf, target, &raise_err); + if (src != NULL) listAddNodeTail(sources, src); + else if (raise_err) { + result = 0; + goto cleanup; + } + } + } + } else { + char *p; + while((p = strchr(from, ',')) != NULL) { + *p = '\0'; + if (!strcmp(from, "all")) { + all = 1; + break; + } else { + clusterManagerNode *src = + clusterNodeForResharding(from, target, &raise_err); + if (src != NULL) listAddNodeTail(sources, src); + else if (raise_err) { + result = 0; + goto cleanup; + } + } + from = p + 1; + } + /* Check if there's still another source to process. */ + if (!all && strlen(from) > 0) { + clusterManagerNode *src = + clusterNodeForResharding(from, target, &raise_err); + if (src != NULL) listAddNodeTail(sources, src); + else if (raise_err) { + result = 0; + goto cleanup; + } + } + } + listIter li; + listNode *ln; + if (all) { + listEmpty(sources); + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate) + continue; + if (!sdscmp(n->name, target->name)) continue; + listAddNodeTail(sources, n); + } + } + if (listLength(sources) == 0) { + fprintf(stderr, "*** No source nodes given, operation aborted.\n"); + result = 0; + goto cleanup; + } + printf("\nReady to move %d slots.\n", slots); + printf(" Source nodes:\n"); + listRewind(sources, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *src = ln->value; + sds info = clusterManagerNodeInfo(src, 4); + printf("%s\n", info); + sdsfree(info); + } + printf(" Destination node:\n"); + sds info = clusterManagerNodeInfo(target, 4); + printf("%s\n", info); + sdsfree(info); + table = clusterManagerComputeReshardTable(sources, slots); + printf(" Resharding plan:\n"); + clusterManagerShowReshardTable(table); + if (!(config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_YES)) + { + printf("Do you want to proceed with the proposed " + "reshard plan (yes/no)? "); + fflush(stdout); + char buf[4]; + int nread = read(fileno(stdin),buf,4); + buf[3] = '\0'; + if (nread <= 0 || strcmp("yes", buf) != 0) { + result = 0; + goto cleanup; + } + } + int opts = CLUSTER_MANAGER_OPT_VERBOSE; + listRewind(table, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerReshardTableItem *item = ln->value; + char *err = NULL; + result = clusterManagerMoveSlot(item->source, target, item->slot, + opts, &err); + if (!result) { + if (err != NULL) { + clusterManagerLogErr("\n%s\n", err); + zfree(err); + } + goto cleanup; + } + } +cleanup: + listRelease(sources); + if (table) { + listRewind(table, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerReshardTableItem *item = ln->value; + zfree(item); + } + listRelease(table); + } + return result; +invalid_args: + fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); return 0; } @@ -3332,13 +3839,32 @@ static int clusterManagerCommandHelp(int argc, char **argv) { sizeof(clusterManagerCommandDef); int i = 0, j; fprintf(stderr, "Cluster Manager Commands:\n"); + int padding = 15; for (; i < commands_count; i++) { clusterManagerCommandDef *def = &(clusterManagerCommands[i]); - int namelen = strlen(def->name), padlen = 15 - namelen; + int namelen = strlen(def->name), padlen = padding - namelen; fprintf(stderr, " %s", def->name); for (j = 0; j < padlen; j++) fprintf(stderr, " "); fprintf(stderr, "%s\n", (def->args ? def->args : "")); - //TODO: if (def->options) + if (def->options != NULL) { + int optslen = strlen(def->options); + char *p = def->options, *eos = p + optslen; + char *comma = NULL; + while ((comma = strchr(p, ',')) != NULL) { + int deflen = (int)(comma - p); + char buf[255]; + memcpy(buf, p, deflen); + buf[deflen] = '\0'; + for (j = 0; j < padding; j++) fprintf(stderr, " "); + fprintf(stderr, " --cluster-%s\n", buf); + p = comma + 1; + if (p >= eos) break; + } + if (p < eos) { + for (j = 0; j < padding; j++) fprintf(stderr, " "); + fprintf(stderr, " --cluster-%s\n", p); + } + } } return 0; } @@ -4641,6 +5167,11 @@ int main(int argc, char **argv) { config.cluster_manager_command.argv = NULL; config.cluster_manager_command.flags = 0; config.cluster_manager_command.replicas = 0; + config.cluster_manager_command.from = NULL; + config.cluster_manager_command.to = NULL; + config.cluster_manager_command.slots = 0; + config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT; + config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE; pref.hints = 1; spectrum_palette = spectrum_palette_color;