Cluster Manager: add-node command.

This commit is contained in:
artix 2018-04-11 17:08:53 +02:00
parent 81ab5a3b28
commit 615aefe6ba

View File

@ -165,6 +165,7 @@ typedef struct clusterManagerCommand {
char *from; char *from;
char *to; char *to;
char **weight; char **weight;
char *master_id;
int weight_argc; int weight_argc;
int slots; int slots;
int timeout; int timeout;
@ -1299,6 +1300,8 @@ static int parseOptions(int argc, char **argv) {
usage(); usage();
} else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) { } else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) {
config.cluster_manager_command.replicas = atoi(argv[++i]); config.cluster_manager_command.replicas = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-master-id") && !lastarg) {
config.cluster_manager_command.master_id = argv[++i];
} else if (!strcmp(argv[i],"--cluster-from") && !lastarg) { } else if (!strcmp(argv[i],"--cluster-from") && !lastarg) {
config.cluster_manager_command.from = argv[++i]; config.cluster_manager_command.from = argv[++i];
} else if (!strcmp(argv[i],"--cluster-to") && !lastarg) { } else if (!strcmp(argv[i],"--cluster-to") && !lastarg) {
@ -1335,6 +1338,9 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--cluster-copy")) { } else if (!strcmp(argv[i],"--cluster-copy")) {
config.cluster_manager_command.flags |= config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_COPY; CLUSTER_MANAGER_CMD_FLAG_COPY;
} else if (!strcmp(argv[i],"--cluster-slave")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_SLAVE;
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
config.cluster_manager_command.flags |= config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
@ -1847,6 +1853,8 @@ static clusterManagerNode *clusterManagerNodeByName(const char *name);
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n); static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
static void clusterManagerNodeResetSlots(clusterManagerNode *node); static void clusterManagerNodeResetSlots(clusterManagerNode *node);
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err); static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node,
char *err);
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char **err); char **err);
static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts); static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts);
@ -1875,6 +1883,7 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
/* Cluster Manager commands. */ /* Cluster Manager commands. */
static int clusterManagerCommandCreate(int argc, char **argv); static int clusterManagerCommandCreate(int argc, char **argv);
static int clusterManagerCommandAddNode(int argc, char **argv);
static int clusterManagerCommandInfo(int argc, char **argv); static int clusterManagerCommandInfo(int argc, char **argv);
static int clusterManagerCommandCheck(int argc, char **argv); static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandFix(int argc, char **argv); static int clusterManagerCommandFix(int argc, char **argv);
@ -1895,6 +1904,8 @@ typedef struct clusterManagerCommandDef {
clusterManagerCommandDef clusterManagerCommands[] = { clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN", {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"replicas <arg>"}, "replicas <arg>"},
{"add-node", clusterManagerCommandAddNode, 2,
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
{"check", clusterManagerCommandCheck, -1, "host:port", NULL}, {"check", clusterManagerCommandCheck, -1, "host:port", NULL},
{"fix", clusterManagerCommandFix, -1, "host:port", NULL}, {"fix", clusterManagerCommandFix, -1, "host:port", NULL},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL}, {"info", clusterManagerCommandInfo, -1, "host:port", NULL},
@ -3030,8 +3041,7 @@ static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) {
opts |= CLUSTER_MANAGER_OPT_GETFRIENDS; opts |= CLUSTER_MANAGER_OPT_GETFRIENDS;
char *e = NULL; char *e = NULL;
if (!clusterManagerNodeIsCluster(node, &e)) { if (!clusterManagerNodeIsCluster(node, &e)) {
char *msg = (e ? e : "is not configured as a cluster node."); clusterManagerPrintNotClusterNodeError(node, e);
clusterManagerLogErr("[ERR] Node %s:%d %s\n",node->ip,node->port,msg);
if (e) zfree(e); if (e) zfree(e);
freeClusterManagerNode(node); freeClusterManagerNode(node);
return 0; return 0;
@ -3313,6 +3323,27 @@ static clusterManagerNode * clusterManagerGetNodeWithMostKeysInSlot(list *nodes,
return node; return node;
} }
/* This function returns the master that has the least number of replicas
* in the cluster. If there are multiple masters with the same smaller
* number of replicas, one at random is returned. */
static clusterManagerNode *clusterManagerNodeWithLeastReplicas() {
clusterManagerNode *node = NULL;
int lowest_count = 0;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (node->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (node == NULL || n->replicas_count < lowest_count) {
node = n;
lowest_count = n->replicas_count;
}
}
return node;
}
static int clusterManagerFixSlotsCoverage(char *all_slots) { static int clusterManagerFixSlotsCoverage(char *all_slots) {
int i, fixed = 0; int i, fixed = 0;
list *none = NULL, *single = NULL, *multi = NULL; list *none = NULL, *single = NULL, *multi = NULL;
@ -3966,6 +3997,26 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
array->nodes[array->count++] = node; array->nodes[array->count++] = node;
} }
static void clusterManagerPrintNotEmptyNodeError(clusterManagerNode *node,
char *err)
{
char *msg;
if (err) msg = err;
else {
msg = "is not empty. Either the node already knows other "
"nodes (check with CLUSTER NODES) or contains some "
"key in database 0.";
}
clusterManagerLogErr("[ERR] Node %s:%d %s\n", node->ip, node->port, msg);
}
static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node,
char *err)
{
char *msg = (err ? err : "is not configured as a cluster node.");
clusterManagerLogErr("[ERR] Node %s:%d %s\n", node->ip, node->port, msg);
}
/* Execute redis-cli in Cluster Manager mode */ /* Execute redis-cli in Cluster Manager mode */
static void clusterManagerMode(clusterManagerCommandProc *proc) { static void clusterManagerMode(clusterManagerCommandProc *proc) {
int argc = config.cluster_manager_command.argc; int argc = config.cluster_manager_command.argc;
@ -4008,8 +4059,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
} }
char *err = NULL; char *err = NULL;
if (!clusterManagerNodeIsCluster(node, &err)) { if (!clusterManagerNodeIsCluster(node, &err)) {
char *msg = (err ? err : "is not configured as a cluster node."); clusterManagerPrintNotClusterNodeError(node, err);
clusterManagerLogErr("[ERR] Node %s:%d %s\n", ip, port, msg);
if (err) zfree(err); if (err) zfree(err);
freeClusterManagerNode(node); freeClusterManagerNode(node);
return 0; return 0;
@ -4025,14 +4075,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
} }
err = NULL; err = NULL;
if (!clusterManagerNodeIsEmpty(node, &err)) { if (!clusterManagerNodeIsEmpty(node, &err)) {
char *msg; clusterManagerPrintNotEmptyNodeError(node, err);
if (err) msg = err;
else {
msg = "is not empty. Either the node already knows other "
"nodes (check with CLUSTER NODES) or contains some "
"key in database 0.";
}
clusterManagerLogErr("[ERR] Node %s:%d %s\n", ip, port, msg);
if (err) zfree(err); if (err) zfree(err);
freeClusterManagerNode(node); freeClusterManagerNode(node);
return 0; return 0;
@ -4263,6 +4306,104 @@ cleanup:
return success; return success;
} }
static int clusterManagerCommandAddNode(int argc, char **argv) {
int success = 1;
redisReply *reply = NULL;
char *ref_ip = NULL, *ip = NULL;
int ref_port = 0, port = 0;
if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port))
goto invalid_args;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port))
goto invalid_args;
clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port,
ref_ip, ref_port);
// Check the existing cluster
clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port);
if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;
/* If --cluster-master-id was specified, try to resolve it now so that we
* abort before starting with the node configuration. */
clusterManagerNode *master_node = NULL;
if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_SLAVE) {
char *master_id = config.cluster_manager_command.master_id;
if (master_id != NULL) {
master_node = clusterManagerNodeByName(master_id);
if (master_node == NULL) {
clusterManagerLogErr("[ERR] No such master ID %s\n", master_id);
return 0;
}
} else {
master_node = clusterManagerNodeWithLeastReplicas();
assert(master_node != NULL);
printf("Automatically selected master %s:%d\n", master_node->ip,
master_node->port);
}
}
// Add the new node
clusterManagerNode *new_node = clusterManagerNewNode(ip, port);
int added = 0;
CLUSTER_MANAGER_NODE_CONNECT(new_node);
if (new_node->context->err) {
clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n",
ip, port);
success = 0;
goto cleanup;
}
char *err = NULL;
if (!(success = clusterManagerNodeIsCluster(new_node, &err))) {
clusterManagerPrintNotClusterNodeError(new_node, err);
if (err) zfree(err);
goto cleanup;
}
if (!clusterManagerNodeLoadInfo(new_node, 0, &err)) {
if (err) {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(new_node, err);
zfree(err);
}
success = 0;
goto cleanup;
}
if (!(success = clusterManagerNodeIsEmpty(new_node, &err))) {
clusterManagerPrintNotEmptyNodeError(new_node, err);
if (err) zfree(err);
goto cleanup;
}
clusterManagerNode *first = listFirst(cluster_manager.nodes)->value;
listAddNodeTail(cluster_manager.nodes, new_node);
added = 1;
// Send CLUSTER MEET command to the new node
clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it "
"join the cluster.\n", ip, port);
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
first->ip, first->port);
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
goto cleanup;
/* Additional configuration is needed if the node is added as a slave. */
if (master_node) {
sleep(1);
clusterManagerWaitForClusterJoin();
clusterManagerLogInfo(">>> Configure node as replica of %s:%d.\n",
master_node->ip, master_node->port);
freeReplyObject(reply);
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER REPLICATE %s",
master_node->name);
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
goto cleanup;
}
clusterManagerLogOk("[OK] New node added correctly.\n");
cleanup:
if (!added && new_node) freeClusterManagerNode(new_node);
if (reply) freeReplyObject(reply);
return success;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
static int clusterManagerCommandInfo(int argc, char **argv) { static int clusterManagerCommandInfo(int argc, char **argv) {
int port = 0; int port = 0;
char *ip = NULL; char *ip = NULL;
@ -4531,8 +4672,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
nodes_involved++; nodes_involved++;
listAddNodeTail(involved, n); listAddNodeTail(involved, n);
} }
weightedNodes = zmalloc(nodes_involved * weightedNodes = zmalloc(nodes_involved * sizeof(clusterManagerNode *));
sizeof(clusterManagerNode *));
if (weightedNodes == NULL) goto cleanup; if (weightedNodes == NULL) goto cleanup;
/* Check cluster, only proceed if it looks sane. */ /* Check cluster, only proceed if it looks sane. */
clusterManagerCheckCluster(1); clusterManagerCheckCluster(1);