From 6d1a7cec230e5a892be3eeffeec83231a4079b9f Mon Sep 17 00:00:00 2001 From: artix Date: Fri, 23 Mar 2018 16:46:43 +0100 Subject: [PATCH] Cluster Manager: rebalance command --- src/redis-cli.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 286 insertions(+), 11 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 4f87f906..49ba4125 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -71,6 +71,7 @@ #define CLUSTER_MANAGER_SLOTS 16384 #define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000 #define CLUSTER_MANAGER_MIGRATE_PIPELINE 10 +#define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2 #define CLUSTER_MANAGER_INVALID_HOST_ARG \ "Invalid arguments: you need to pass either a valid " \ @@ -108,10 +109,13 @@ #define CLUSTER_MANAGER_FLAG_DISCONNECT 1 << 4 #define CLUSTER_MANAGER_FLAG_FAIL 1 << 5 -#define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0 -#define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1 -#define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2 -#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7 +#define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0 +#define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1 +#define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2 +#define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS 1 << 3 +#define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER 1 << 4 +#define CLUSTER_MANAGER_CMD_FLAG_SIMULATE 1 << 5 +#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 #define CLUSTER_MANAGER_OPT_COLD 1 << 1 @@ -157,9 +161,12 @@ typedef struct clusterManagerCommand { int replicas; char *from; char *to; + char **weight; + int weight_argc; int slots; int timeout; int pipeline; + float threshold; } clusterManagerCommand; static void createClusterManagerCommand(char *cmdname, int argc, char **argv); @@ -206,6 +213,7 @@ static struct config { int eval_ldb_end; /* Lua debugging session ended. */ int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */ int last_cmd_type; + int verbose; clusterManagerCommand cluster_manager_command; } config; @@ -1266,6 +1274,8 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"-d") && !lastarg) { sdsfree(config.mb_delim); config.mb_delim = sdsnew(argv[++i]); + } else if (!strcmp(argv[i],"--verbose")) { + config.verbose = 1; } else if (!strcmp(argv[i],"--cluster") && !lastarg) { if (CLUSTER_MANAGER_MODE()) usage(); char *cmd = argv[++i]; @@ -1282,15 +1292,35 @@ static int parseOptions(int argc, char **argv) { config.cluster_manager_command.from = argv[++i]; } else if (!strcmp(argv[i],"--cluster-to") && !lastarg) { config.cluster_manager_command.to = argv[++i]; + } else if (!strcmp(argv[i],"--cluster-weight") && !lastarg) { + int widx = i + 1; + char **weight = argv + widx; + int wargc = 0; + for (; widx < argc; widx++) { + if (strstr(argv[widx], "--") == argv[widx]) break; + wargc++; + } + if (wargc > 0) { + config.cluster_manager_command.weight = weight; + config.cluster_manager_command.weight_argc = wargc; + } } else if (!strcmp(argv[i],"--cluster-slots") && !lastarg) { config.cluster_manager_command.slots = atoi(argv[++i]); } else if (!strcmp(argv[i],"--cluster-timeout") && !lastarg) { config.cluster_manager_command.timeout = atoi(argv[++i]); } else if (!strcmp(argv[i],"--cluster-pipeline") && !lastarg) { config.cluster_manager_command.pipeline = atoi(argv[++i]); + } else if (!strcmp(argv[i],"--cluster-threshold") && !lastarg) { + config.cluster_manager_command.threshold = atof(argv[++i]); } else if (!strcmp(argv[i],"--cluster-yes")) { config.cluster_manager_command.flags |= CLUSTER_MANAGER_CMD_FLAG_YES; + } else if (!strcmp(argv[i],"--cluster-simulate")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_SIMULATE; + } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { + config.cluster_manager_command.flags |= + CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -1390,6 +1420,7 @@ static void usage(void) { " are not rolled back from the server memory.\n" " --cluster [args...] [opts...]\n" " Cluster Manager command and arguments (see below).\n" +" --verbose Verbose mode.\n" " --help Output this help and exit.\n" " --version Output version and exit.\n" "\n" @@ -1749,6 +1780,8 @@ typedef struct clusterManagerNode { sds *importing; int migrating_count; int importing_count; + float weight; /* Weight used by rebalance */ + int balance; /* Used by rebalance */ } clusterManagerNode; /* Data structure used to represent a sequence of nodes. */ @@ -1780,6 +1813,7 @@ typedef int clusterManagerCommandProc(int argc, char **argv); static clusterManagerNode *clusterManagerNewNode(char *ip, int port); static clusterManagerNode *clusterManagerNodeByName(const char *name); +static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n); static void clusterManagerNodeResetSlots(clusterManagerNode *node); static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err); static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, @@ -1813,6 +1847,7 @@ static int clusterManagerCommandCreate(int argc, char **argv); static int clusterManagerCommandInfo(int argc, char **argv); static int clusterManagerCommandCheck(int argc, char **argv); static int clusterManagerCommandReshard(int argc, char **argv); +static int clusterManagerCommandRebalance(int argc, char **argv); static int clusterManagerCommandCall(int argc, char **argv); static int clusterManagerCommandHelp(int argc, char **argv); @@ -1831,6 +1866,9 @@ clusterManagerCommandDef clusterManagerCommands[] = { {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, {"reshard", clusterManagerCommandReshard, -1, "host:port", "from ,to ,slots ,yes,timeout ,pipeline "}, + {"rebalance", clusterManagerCommandRebalance, -1, "host:port", + "weight ,use-empty-masters," + "timeout ,simulate,pipeline ,threshold "}, {"call", clusterManagerCommandCall, -2, "host:port command arg arg .. arg", NULL}, {"help", clusterManagerCommandHelp, 0, NULL, NULL} @@ -1970,10 +2008,13 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) { node->migrating_count = 0; node->importing_count = 0; node->replicas_count = 0; + node->weight = 1.0f; + node->balance = 0; clusterManagerNodeResetSlots(node); return node; } +/* Return the node with the specified ID or NULL. */ static clusterManagerNode *clusterManagerNodeByName(const char *name) { if (cluster_manager.nodes == NULL) return NULL; clusterManagerNode *found = NULL; @@ -1994,6 +2035,32 @@ static clusterManagerNode *clusterManagerNodeByName(const char *name) { return found; } +/* Like get_node_by_name but the specified name can be just the first + * part of the node ID as long as the prefix in unique across the + * cluster. + */ +static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char*name) +{ + if (cluster_manager.nodes == NULL) return NULL; + clusterManagerNode *found = NULL; + sds lcname = sdsempty(); + lcname = sdscpy(lcname, name); + sdstolower(lcname); + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->name && + strstr(n->name, lcname) == n->name) { + found = n; + break; + } + } + sdsfree(lcname); + return found; +} + static void clusterManagerNodeResetSlots(clusterManagerNode *node) { memset(node->slots, 0, sizeof(node->slots)); node->slots_count = 0; @@ -2898,6 +2965,12 @@ int clusterManagerSlotCountCompareDesc(const void *n1, const void *n2) { return node2->slots_count - node1->slots_count; } +int clusterManagerCompareNodeBalance(const void *n1, const void *n2) { + clusterManagerNode *node1 = *((clusterManagerNode **) n1); + clusterManagerNode *node2 = *((clusterManagerNode **) n2); + return node1->balance - node2->balance; +} + static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { sds signature = NULL; int node_count = 0, i = 0, name_len = 0; @@ -3200,6 +3273,19 @@ static void clusterManagerShowReshardTable(list *table) { } } +static void clusterManagerReleaseReshardTable(list *table) { + if (table != NULL) { + listIter li; + listNode *ln; + listRewind(table, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerReshardTableItem *item = ln->value; + zfree(item); + } + listRelease(table); + } +} + static void clusterManagerLog(int level, const char* fmt, ...) { int use_colors = (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COLOR); @@ -3775,14 +3861,199 @@ static int clusterManagerCommandReshard(int argc, char **argv) { } cleanup: listRelease(sources); - if (table) { - listRewind(table, &li); - while ((ln = listNext(&li)) != NULL) { - clusterManagerReshardTableItem *item = ln->value; - zfree(item); - } - listRelease(table); + clusterManagerReleaseReshardTable(table); + return result; +invalid_args: + fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); + return 0; +} + +static int clusterManagerCommandRebalance(int argc, char **argv) { + int port = 0; + char *ip = NULL; + clusterManagerNode **weightedNodes = NULL; + list *involved = NULL; + if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; + clusterManagerNode *node = clusterManagerNewNode(ip, port); + if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; + int result = 1, i; + if (config.cluster_manager_command.weight != NULL) { + for (i = 0; i < config.cluster_manager_command.weight_argc; i++) { + char *name = config.cluster_manager_command.weight[i]; + char *p = strchr(name, '='); + if (p == NULL) { + result = 0; + goto cleanup; + } + *p = '\0'; + float w = atof(++p); + clusterManagerNode *n = clusterManagerNodeByAbbreviatedName(name); + if (n == NULL) { + clusterManagerLogErr("*** No such master node %s\n", name); + result = 0; + goto cleanup; + } + n->weight = w; + } } + float total_weight = 0; + int nodes_involved = 0; + int use_empty = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; + + involved = listCreate(); + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + /* Compute the total cluster weight. */ + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate) + continue; + if (!use_empty && n->slots_count == 0) { + n->weight = 0; + continue; + } + total_weight += n->weight; + nodes_involved++; + listAddNodeTail(involved, n); + } + weightedNodes = zmalloc(nodes_involved * + sizeof(clusterManagerNode *)); + if (weightedNodes == NULL) goto cleanup; + /* Check cluster, only proceed if it looks sane. */ + clusterManagerCheckCluster(1); + if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { + clusterManagerLogErr("*** Please fix your cluster problems " + "before rebalancing" ); + result = 0; + goto cleanup; + } + /* Calculate the slots balance for each node. It's the number of + * slots the node should lose (if positive) or gain (if negative) + * in order to be balanced. */ + int threshold_reached = 0, total_balance = 0; + float threshold = config.cluster_manager_command.threshold; + i = 0; + listRewind(involved, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + weightedNodes[i++] = n; + int expected = (((float)CLUSTER_MANAGER_SLOTS / total_weight) * + (int) n->weight); + n->balance = n->slots_count - expected; + total_balance += n->balance; + /* Compute the percentage of difference between the + * expected number of slots and the real one, to see + * if it's over the threshold specified by the user. */ + int over_threshold = 0; + if (config.cluster_manager_command.threshold > 0) { + if (n->slots_count > 0) { + float err_perc = fabs((100-(100.0*expected/n->slots_count))); + if (err_perc > threshold) over_threshold = 1; + } else if (expected > 1) { + over_threshold = 1; + } + } + if (over_threshold) threshold_reached = 1; + } + if (!threshold_reached) { + clusterManagerLogErr("*** No rebalancing needed! " + "All nodes are within the %.2f%% threshold.\n", + config.cluster_manager_command.threshold); + result = 0; + goto cleanup; + } + /* Because of rounding, it is possible that the balance of all nodes + * summed does not give 0. Make sure that nodes that have to provide + * slots are always matched by nodes receiving slots. */ + while (total_balance > 0) { + listRewind(involved, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->balance < 0 && total_balance > 0) { + n->balance--; + total_balance--; + } + } + } + /* Sort nodes by their slots balance. */ + qsort(weightedNodes, nodes_involved, sizeof(clusterManagerNode *), + clusterManagerCompareNodeBalance); + clusterManagerLogInfo(">>> Rebalancing across %d nodes. " + "Total weight = %.2f\n", + nodes_involved, total_weight); + if (config.verbose) { + for (i = 0; i < nodes_involved; i++) { + clusterManagerNode *n = weightedNodes[i]; + printf("%s:%d balance is %d slots\n", n->ip, n->port, n->balance); + } + } + /* Now we have at the start of the 'sn' array nodes that should get + * slots, at the end nodes that must give slots. + * We take two indexes, one at the start, and one at the end, + * incrementing or decrementing the indexes accordingly til we + * find nodes that need to get/provide slots. */ + int dst_idx = 0; + int src_idx = nodes_involved - 1; + int simulate = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_SIMULATE; + while (dst_idx < src_idx) { + clusterManagerNode *dst = weightedNodes[dst_idx]; + clusterManagerNode *src = weightedNodes[src_idx]; + int db = abs(dst->balance); + int sb = abs(src->balance); + int numslots = (db < sb ? db : sb); + if (numslots > 0) { + printf("Moving %d slots from %s:%d to %s:%d\n", numslots, + src->ip, + src->port, + dst->ip, + dst->port); + /* Actaully move the slots. */ + list *lsrc = listCreate(), *table = NULL; + listAddNodeTail(lsrc, src); + table = clusterManagerComputeReshardTable(lsrc, numslots); + listRelease(lsrc); + int table_len = (int) listLength(table); + if (!table || table_len != numslots) { + clusterManagerLogErr("*** Assertio failed: Reshard table " + "!= number of slots"); + result = 0; + goto end_move; + } + if (simulate) { + for (i = 0; i < table_len; i++) printf("#"); + } else { + int opts = CLUSTER_MANAGER_OPT_QUIET | + CLUSTER_MANAGER_OPT_UPDATE; + listRewind(table, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerReshardTableItem *item = ln->value; + result = clusterManagerMoveSlot(item->source, + dst, + item->slot, + opts, NULL); + if (!result) goto end_move; + printf("#"); + fflush(stdout); + } + + } + printf("\n"); +end_move: + clusterManagerReleaseReshardTable(table); + if (!result) goto cleanup; + } + /* Update nodes balance. */ + dst->balance += numslots; + src->balance -= numslots; + if (dst->balance == 0) dst_idx++; + if (src->balance == 0) src_idx --; + } +cleanup: + if (involved != NULL) listRelease(involved); + if (weightedNodes != NULL) zfree(weightedNodes); return result; invalid_args: fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); @@ -5169,6 +5440,7 @@ int main(int argc, char **argv) { config.eval_ldb_sync = 0; config.enable_ldb_on_eval = 0; config.last_cmd_type = -1; + config.verbose = 0; config.cluster_manager_command.name = NULL; config.cluster_manager_command.argc = 0; config.cluster_manager_command.argv = NULL; @@ -5176,9 +5448,12 @@ int main(int argc, char **argv) { config.cluster_manager_command.replicas = 0; config.cluster_manager_command.from = NULL; config.cluster_manager_command.to = NULL; + config.cluster_manager_command.weight = NULL; config.cluster_manager_command.slots = 0; config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT; config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE; + config.cluster_manager_command.threshold = + CLUSTER_MANAGER_REBALANCE_THRESHOLD; pref.hints = 1; spectrum_palette = spectrum_palette_color;