mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 00:50:50 +00:00
Cluster Manager: add-node command.
This commit is contained in:
parent
efa51f1617
commit
aeaf6ee1c3
168
src/redis-cli.c
168
src/redis-cli.c
@ -165,6 +165,7 @@ typedef struct clusterManagerCommand {
|
||||
char *from;
|
||||
char *to;
|
||||
char **weight;
|
||||
char *master_id;
|
||||
int weight_argc;
|
||||
int slots;
|
||||
int timeout;
|
||||
@ -1299,6 +1300,8 @@ static int parseOptions(int argc, char **argv) {
|
||||
usage();
|
||||
} else if (!strcmp(argv[i],"--cluster-replicas") && !lastarg) {
|
||||
config.cluster_manager_command.replicas = atoi(argv[++i]);
|
||||
} else if (!strcmp(argv[i],"--cluster-master-id") && !lastarg) {
|
||||
config.cluster_manager_command.master_id = argv[++i];
|
||||
} else if (!strcmp(argv[i],"--cluster-from") && !lastarg) {
|
||||
config.cluster_manager_command.from = argv[++i];
|
||||
} else if (!strcmp(argv[i],"--cluster-to") && !lastarg) {
|
||||
@ -1335,6 +1338,9 @@ static int parseOptions(int argc, char **argv) {
|
||||
} else if (!strcmp(argv[i],"--cluster-copy")) {
|
||||
config.cluster_manager_command.flags |=
|
||||
CLUSTER_MANAGER_CMD_FLAG_COPY;
|
||||
} else if (!strcmp(argv[i],"--cluster-slave")) {
|
||||
config.cluster_manager_command.flags |=
|
||||
CLUSTER_MANAGER_CMD_FLAG_SLAVE;
|
||||
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
|
||||
config.cluster_manager_command.flags |=
|
||||
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
|
||||
@ -1847,6 +1853,8 @@ static clusterManagerNode *clusterManagerNodeByName(const char *name);
|
||||
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
|
||||
static void clusterManagerNodeResetSlots(clusterManagerNode *node);
|
||||
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
|
||||
static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node,
|
||||
char *err);
|
||||
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
|
||||
char **err);
|
||||
static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts);
|
||||
@ -1875,6 +1883,7 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
|
||||
/* Cluster Manager commands. */
|
||||
|
||||
static int clusterManagerCommandCreate(int argc, char **argv);
|
||||
static int clusterManagerCommandAddNode(int argc, char **argv);
|
||||
static int clusterManagerCommandInfo(int argc, char **argv);
|
||||
static int clusterManagerCommandCheck(int argc, char **argv);
|
||||
static int clusterManagerCommandFix(int argc, char **argv);
|
||||
@ -1895,6 +1904,8 @@ typedef struct clusterManagerCommandDef {
|
||||
clusterManagerCommandDef clusterManagerCommands[] = {
|
||||
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
|
||||
"replicas <arg>"},
|
||||
{"add-node", clusterManagerCommandAddNode, 2,
|
||||
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
|
||||
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
|
||||
{"fix", clusterManagerCommandFix, -1, "host:port", NULL},
|
||||
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
|
||||
@ -3030,8 +3041,7 @@ static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) {
|
||||
opts |= CLUSTER_MANAGER_OPT_GETFRIENDS;
|
||||
char *e = NULL;
|
||||
if (!clusterManagerNodeIsCluster(node, &e)) {
|
||||
char *msg = (e ? e : "is not configured as a cluster node.");
|
||||
clusterManagerLogErr("[ERR] Node %s:%d %s\n",node->ip,node->port,msg);
|
||||
clusterManagerPrintNotClusterNodeError(node, e);
|
||||
if (e) zfree(e);
|
||||
freeClusterManagerNode(node);
|
||||
return 0;
|
||||
@ -3313,6 +3323,27 @@ static clusterManagerNode * clusterManagerGetNodeWithMostKeysInSlot(list *nodes,
|
||||
return node;
|
||||
}
|
||||
|
||||
/* This function returns the master that has the least number of replicas
|
||||
* in the cluster. If there are multiple masters with the same smaller
|
||||
* number of replicas, one at random is returned. */
|
||||
|
||||
static clusterManagerNode *clusterManagerNodeWithLeastReplicas() {
|
||||
clusterManagerNode *node = NULL;
|
||||
int lowest_count = 0;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (node->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
if (node == NULL || n->replicas_count < lowest_count) {
|
||||
node = n;
|
||||
lowest_count = n->replicas_count;
|
||||
}
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
int i, fixed = 0;
|
||||
list *none = NULL, *single = NULL, *multi = NULL;
|
||||
@ -3966,6 +3997,26 @@ static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
|
||||
array->nodes[array->count++] = node;
|
||||
}
|
||||
|
||||
static void clusterManagerPrintNotEmptyNodeError(clusterManagerNode *node,
|
||||
char *err)
|
||||
{
|
||||
char *msg;
|
||||
if (err) msg = err;
|
||||
else {
|
||||
msg = "is not empty. Either the node already knows other "
|
||||
"nodes (check with CLUSTER NODES) or contains some "
|
||||
"key in database 0.";
|
||||
}
|
||||
clusterManagerLogErr("[ERR] Node %s:%d %s\n", node->ip, node->port, msg);
|
||||
}
|
||||
|
||||
static void clusterManagerPrintNotClusterNodeError(clusterManagerNode *node,
|
||||
char *err)
|
||||
{
|
||||
char *msg = (err ? err : "is not configured as a cluster node.");
|
||||
clusterManagerLogErr("[ERR] Node %s:%d %s\n", node->ip, node->port, msg);
|
||||
}
|
||||
|
||||
/* Execute redis-cli in Cluster Manager mode */
|
||||
static void clusterManagerMode(clusterManagerCommandProc *proc) {
|
||||
int argc = config.cluster_manager_command.argc;
|
||||
@ -4008,8 +4059,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
|
||||
}
|
||||
char *err = NULL;
|
||||
if (!clusterManagerNodeIsCluster(node, &err)) {
|
||||
char *msg = (err ? err : "is not configured as a cluster node.");
|
||||
clusterManagerLogErr("[ERR] Node %s:%d %s\n", ip, port, msg);
|
||||
clusterManagerPrintNotClusterNodeError(node, err);
|
||||
if (err) zfree(err);
|
||||
freeClusterManagerNode(node);
|
||||
return 0;
|
||||
@ -4025,14 +4075,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
|
||||
}
|
||||
err = NULL;
|
||||
if (!clusterManagerNodeIsEmpty(node, &err)) {
|
||||
char *msg;
|
||||
if (err) msg = err;
|
||||
else {
|
||||
msg = "is not empty. Either the node already knows other "
|
||||
"nodes (check with CLUSTER NODES) or contains some "
|
||||
"key in database 0.";
|
||||
}
|
||||
clusterManagerLogErr("[ERR] Node %s:%d %s\n", ip, port, msg);
|
||||
clusterManagerPrintNotEmptyNodeError(node, err);
|
||||
if (err) zfree(err);
|
||||
freeClusterManagerNode(node);
|
||||
return 0;
|
||||
@ -4263,6 +4306,104 @@ cleanup:
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerCommandAddNode(int argc, char **argv) {
|
||||
int success = 1;
|
||||
redisReply *reply = NULL;
|
||||
char *ref_ip = NULL, *ip = NULL;
|
||||
int ref_port = 0, port = 0;
|
||||
if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port))
|
||||
goto invalid_args;
|
||||
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port))
|
||||
goto invalid_args;
|
||||
clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port,
|
||||
ref_ip, ref_port);
|
||||
// Check the existing cluster
|
||||
clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port);
|
||||
if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
|
||||
if (!clusterManagerCheckCluster(0)) return 0;
|
||||
|
||||
/* If --cluster-master-id was specified, try to resolve it now so that we
|
||||
* abort before starting with the node configuration. */
|
||||
clusterManagerNode *master_node = NULL;
|
||||
if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_SLAVE) {
|
||||
char *master_id = config.cluster_manager_command.master_id;
|
||||
if (master_id != NULL) {
|
||||
master_node = clusterManagerNodeByName(master_id);
|
||||
if (master_node == NULL) {
|
||||
clusterManagerLogErr("[ERR] No such master ID %s\n", master_id);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
master_node = clusterManagerNodeWithLeastReplicas();
|
||||
assert(master_node != NULL);
|
||||
printf("Automatically selected master %s:%d\n", master_node->ip,
|
||||
master_node->port);
|
||||
}
|
||||
}
|
||||
|
||||
// Add the new node
|
||||
clusterManagerNode *new_node = clusterManagerNewNode(ip, port);
|
||||
int added = 0;
|
||||
CLUSTER_MANAGER_NODE_CONNECT(new_node);
|
||||
if (new_node->context->err) {
|
||||
clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n",
|
||||
ip, port);
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
char *err = NULL;
|
||||
if (!(success = clusterManagerNodeIsCluster(new_node, &err))) {
|
||||
clusterManagerPrintNotClusterNodeError(new_node, err);
|
||||
if (err) zfree(err);
|
||||
goto cleanup;
|
||||
}
|
||||
if (!clusterManagerNodeLoadInfo(new_node, 0, &err)) {
|
||||
if (err) {
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(new_node, err);
|
||||
zfree(err);
|
||||
}
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
if (!(success = clusterManagerNodeIsEmpty(new_node, &err))) {
|
||||
clusterManagerPrintNotEmptyNodeError(new_node, err);
|
||||
if (err) zfree(err);
|
||||
goto cleanup;
|
||||
}
|
||||
clusterManagerNode *first = listFirst(cluster_manager.nodes)->value;
|
||||
listAddNodeTail(cluster_manager.nodes, new_node);
|
||||
added = 1;
|
||||
|
||||
// Send CLUSTER MEET command to the new node
|
||||
clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it "
|
||||
"join the cluster.\n", ip, port);
|
||||
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
|
||||
first->ip, first->port);
|
||||
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
|
||||
goto cleanup;
|
||||
|
||||
/* Additional configuration is needed if the node is added as a slave. */
|
||||
if (master_node) {
|
||||
sleep(1);
|
||||
clusterManagerWaitForClusterJoin();
|
||||
clusterManagerLogInfo(">>> Configure node as replica of %s:%d.\n",
|
||||
master_node->ip, master_node->port);
|
||||
freeReplyObject(reply);
|
||||
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER REPLICATE %s",
|
||||
master_node->name);
|
||||
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
|
||||
goto cleanup;
|
||||
}
|
||||
clusterManagerLogOk("[OK] New node added correctly.\n");
|
||||
cleanup:
|
||||
if (!added && new_node) freeClusterManagerNode(new_node);
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
invalid_args:
|
||||
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int clusterManagerCommandInfo(int argc, char **argv) {
|
||||
int port = 0;
|
||||
char *ip = NULL;
|
||||
@ -4531,8 +4672,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
|
||||
nodes_involved++;
|
||||
listAddNodeTail(involved, n);
|
||||
}
|
||||
weightedNodes = zmalloc(nodes_involved *
|
||||
sizeof(clusterManagerNode *));
|
||||
weightedNodes = zmalloc(nodes_involved * sizeof(clusterManagerNode *));
|
||||
if (weightedNodes == NULL) goto cleanup;
|
||||
/* Check cluster, only proceed if it looks sane. */
|
||||
clusterManagerCheckCluster(1);
|
||||
|
Loading…
x
Reference in New Issue
Block a user