Cluster Manager:

- Almost all Cluster Manager related code moved to
  the same section.
- Many macroes converted to functions
- Added various comments
- Little code restyling
This commit is contained in:
artix 2018-02-22 18:32:39 +01:00
parent 4e0c2f9c3c
commit 7d609ff952

View File

@ -75,54 +75,8 @@
(n->context = redisConnect(n->ip, n->port));
#define CLUSTER_MANAGER_COMMAND(n,...) \
(reconnectingRedisCommand(n->context, __VA_ARGS__))
#define CLUSTER_MANAGER_NODE_INFO(n) (CLUSTER_MANAGER_COMMAND(n, "INFO"))
#define CLUSTER_MANAGER_ERROR(err) do { \
if (cluster_manager.errors == NULL) \
cluster_manager.errors = listCreate(); \
listAddNodeTail(cluster_manager.errors, err); \
clusterManagerLogErr("%s\n", (char *) err); \
} while(0)
#define CLUSTER_MANAGER_RESET_SLOTS(n) do { \
memset(n->slots, 0, sizeof(n->slots)); \
n->slots_count = 0; \
} while(0)
#define CLUSTER_MANAGER_NODEARRAY_INIT(array, alloc_len) do { \
array->nodes = zcalloc(alloc_len * sizeof(clusterManagerNode*));\
array->alloc = array->nodes; \
array->len = alloc_len; \
array->count = 0; \
} while(0)
#define CLUSTER_MANAGER_NODEARRAY_RESET(array) do { \
if (array->nodes > array->alloc) { \
array->len = array->nodes - array->alloc; \
array->nodes = array->alloc; \
array->count = 0; \
int i = 0; \
for(; i < array->len; i++) { \
if (array->nodes[i] != NULL) array->count++;\
} \
} \
} while(0)
#define CLUSTER_MANAGER_NODEARRAY_FREE(array) zfree(array->alloc)
#define CLUSTER_MANAGER_NODEARRAY_SHIFT(array, nodeptr) do {\
assert(array->nodes < (array->nodes + array->len)); \
if (*array->nodes != NULL) array->count--; \
nodeptr = *array->nodes; \
array->nodes++; \
array->len--; \
} while(0)
#define CLUSTER_MANAGER_NODEARRAY_ADD(array, nodeptr) do { \
assert(array->nodes < (array->nodes + array->len)); \
assert(nodeptr != NULL); \
array->nodes[array->count++] = nodeptr; \
} while(0)
#define CLUSTER_MANAGER_NODE_ARRAY_FREE(array) zfree(array->alloc)
#define CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err) \
clusterManagerLogErr("Node %s:%d replied with error:\n%s\n", \
@ -190,6 +144,7 @@ typedef struct clusterManagerCommand {
int flags;
int replicas;
} clusterManagerCommand;
static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
static redisContext *context;
@ -237,88 +192,6 @@ static struct config {
clusterManagerCommand cluster_manager_command;
} config;
/* Cluster Manager */
static struct clusterManager {
list *nodes;
list *errors;
} cluster_manager;
typedef struct clusterManagerNode {
redisContext *context;
sds name;
char *ip;
int port;
uint64_t current_epoch;
time_t ping_sent;
time_t ping_recv;
int flags;
sds replicate;
list replicas;
int dirty;
uint8_t slots[CLUSTER_MANAGER_SLOTS];
int slots_count;
int replicas_count;
list *friends;
sds *migrating;
sds *importing;
int migrating_count;
int importing_count;
} clusterManagerNode;
typedef struct clusterManagerNodeArray {
clusterManagerNode **nodes;
clusterManagerNode **alloc;
int len;
int count;
} clusterManagerNodeArray;
static dictType clusterManagerDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
dictSdsDestructor /* val destructor */
};
static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char **err);
static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts);
static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err);
static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
int ip_len, clusterManagerNode ***offending, int *offending_len);
static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
int ip_len);
static sds clusterManagerNodeInfo(clusterManagerNode *node);
static void clusterManagerShowNodes(void);
static void clusterManagerShowInfo(void);
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
static void clusterManagerWaitForClusterJoin(void);
static void clusterManagerCheckCluster(int quiet);
static void clusterManagerLog(int level, const char* fmt, ...);
typedef int clusterManagerCommandProc(int argc, char **argv);
typedef struct clusterManagerCommandDef {
char *name;
clusterManagerCommandProc *proc;
int arity;
char *args;
char *options;
} clusterManagerCommandDef;
static int clusterManagerIsConfigConsistent(void);
/* Cluster Manager commands. */
static int clusterManagerCommandCreate(int argc, char **argv);
static int clusterManagerCommandInfo(int argc, char **argv);
static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv);
static int clusterManagerCommandHelp(int argc, char **argv);
/* User preferences. */
static struct pref {
int hints;
@ -1291,14 +1164,6 @@ static redisReply *reconnectingRedisCommand(redisContext *c, const char *fmt, ..
* User interface
*--------------------------------------------------------------------------- */
static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
clusterManagerCommand *cmd = &config.cluster_manager_command;
cmd->name = cmdname;
cmd->argc = argc;
cmd->argv = argc ? argv : NULL;
if (isColorTerm()) cmd->flags |= CLUSTER_MANAGER_CMD_FLAG_COLOR;
}
static int parseOptions(int argc, char **argv) {
int i;
@ -1828,6 +1693,100 @@ static int evalMode(int argc, char **argv) {
* Cluster Manager mode
*--------------------------------------------------------------------------- */
/* The Cluster Manager global structure */
static struct clusterManager {
list *nodes; /* List of nodes int he configuration. */
list *errors;
} cluster_manager;
typedef struct clusterManagerNode {
redisContext *context;
sds name;
char *ip;
int port;
uint64_t current_epoch;
time_t ping_sent;
time_t ping_recv;
int flags;
sds replicate; /* Master ID if node is a slave */
list replicas;
int dirty; /* Node has changes that can be flushed */
uint8_t slots[CLUSTER_MANAGER_SLOTS];
int slots_count;
int replicas_count;
list *friends;
sds *migrating;
sds *importing;
int migrating_count;
int importing_count;
} clusterManagerNode;
/* Data structure used to represent a sequence of nodes. */
typedef struct clusterManagerNodeArray {
clusterManagerNode **nodes; /* Actual nodes array */
clusterManagerNode **alloc; /* Pointer to the allocated memory */
int len; /* Actual length of the array */
int count; /* Non-NULL nodes count */
} clusterManagerNodeArray;
static dictType clusterManagerDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
dictSdsDestructor /* val destructor */
};
typedef int clusterManagerCommandProc(int argc, char **argv);
/* Cluster Manager helper functions */
static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static void clusterManagerNodeResetSlots(clusterManagerNode *node);
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char **err);
static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts);
static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err);
static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
int ip_count, clusterManagerNode ***offending, int *offending_len);
static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
int ip_count);
static sds clusterManagerNodeInfo(clusterManagerNode *node);
static void clusterManagerShowNodes(void);
static void clusterManagerShowInfo(void);
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
static void clusterManagerWaitForClusterJoin(void);
static void clusterManagerCheckCluster(int quiet);
static void clusterManagerLog(int level, const char* fmt, ...);
static int clusterManagerIsConfigConsistent(void);
static void clusterManagerOnError(sds err);
static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
int len);
static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array);
static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
clusterManagerNode **nodeptr);
static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
clusterManagerNode *node);
/* Cluster Manager commands. */
static int clusterManagerCommandCreate(int argc, char **argv);
static int clusterManagerCommandInfo(int argc, char **argv);
static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv);
static int clusterManagerCommandHelp(int argc, char **argv);
typedef struct clusterManagerCommandDef {
char *name;
clusterManagerCommandProc *proc;
int arity;
char *args;
char *options;
} clusterManagerCommandDef;
clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"cluster-replicas"},
@ -1838,6 +1797,16 @@ clusterManagerCommandDef clusterManagerCommands[] = {
{"help", clusterManagerCommandHelp, 0, NULL, NULL}
};
static void createClusterManagerCommand(char *cmdname, int argc, char **argv) {
clusterManagerCommand *cmd = &config.cluster_manager_command;
cmd->name = cmdname;
cmd->argc = argc;
cmd->argv = argc ? argv : NULL;
if (isColorTerm()) cmd->flags |= CLUSTER_MANAGER_CMD_FLAG_COLOR;
}
static clusterManagerCommandProc *validateClusterManagerCommand(void) {
int i, commands_count = sizeof(clusterManagerCommands) /
sizeof(clusterManagerCommandDef);
@ -1930,7 +1899,7 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
node->migrating_count = 0;
node->importing_count = 0;
node->replicas_count = 0;
CLUSTER_MANAGER_RESET_SLOTS(node);
clusterManagerNodeResetSlots(node);
return node;
}
@ -1954,41 +1923,49 @@ static clusterManagerNode *clusterManagerNodeByName(const char *name) {
return found;
}
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) {
redisReply *info = CLUSTER_MANAGER_NODE_INFO(node);
int is_err = 0;
*err = NULL;
if (info == NULL || (is_err = (info->type == REDIS_REPLY_ERROR))) {
if (is_err && err != NULL) {
static void clusterManagerNodeResetSlots(clusterManagerNode *node) {
memset(node->slots, 0, sizeof(node->slots));
node->slots_count = 0;
}
static redisReply *clusterManagerGetNodeRedisInfo(clusterManagerNode *node,
char **err)
{
redisReply *info = CLUSTER_MANAGER_COMMAND(node, "INFO");
if (err != NULL) *err = NULL;
if (info == NULL) return NULL;
if (info->type == REDIS_REPLY_ERROR) {
if (err != NULL) {
*err = zmalloc((info->len + 1) * sizeof(char));
strcpy(*err, info->str);
}
freeReplyObject(info);
return 0;
return NULL;
}
return info;
}
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) {
redisReply *info = clusterManagerGetNodeRedisInfo(node, err);
if (info == NULL) return 0;
int is_cluster = (int) getLongInfoField(info->str, "cluster_enabled");
freeReplyObject(info);
return is_cluster;
}
/* Checks whether the node is empty. Node is considered not-empty if it has
* some key or if it already knows other nodes */
static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err) {
redisReply *info = CLUSTER_MANAGER_NODE_INFO(node);
redisReply *info = clusterManagerGetNodeRedisInfo(node, err);
int is_err = 0, is_empty = 1;
*err = NULL;
if (info == NULL || (is_err = (info->type == REDIS_REPLY_ERROR))) {
if (is_err && err != NULL) {
*err = zmalloc((info->len + 1) * sizeof(char));
strcpy(*err, info->str);
}
is_empty = 0;
goto result;
}
if (info == NULL) return 0;
if (strstr(info->str, "db0:") != NULL) {
is_empty = 0;
goto result;
}
freeReplyObject(info);
info = CLUSTER_MANAGER_COMMAND(node, "CLUSTER INFO");
if (err != NULL) *err = NULL;
if (info == NULL || (is_err = (info->type == REDIS_REPLY_ERROR))) {
if (is_err && err != NULL) {
*err = zmalloc((info->len + 1) * sizeof(char));
@ -2004,8 +1981,37 @@ result:
return is_empty;
}
/* Return the anti-affinity score, which is a measure of the amount of
* violations of anti-affinity in the current cluster layout, that is, how
* badly the masters and slaves are distributed in the different IP
* addresses so that slaves of the same master are not in the master
* host and are also in different hosts.
*
* The score is calculated as follows:
*
* SAME_AS_MASTER = 10000 * each slave in the same IP of its master.
* SAME_AS_SLAVE = 1 * each slave having the same IP as another slave
of the same master.
* FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE
*
* So a greater score means a worse anti-affinity level, while zero
* means perfect anti-affinity.
*
* The anti affinity optimizator will try to get a score as low as
* possible. Since we do not want to sacrifice the fact that slaves should
* not be in the same host as the master, we assign 10000 times the score
* to this violation, so that we'll optimize for the second factor only
* if it does not impact the first one.
*
* The ipnodes argument is an array of clusterManagerNodeArray, one for
* each IP, while ip_count is the total number of IPs in the configuration.
*
* The function returns the above score, and the list of
* offending slaves can be stored into the 'offending' argument,
* so that the optimizer can try changing the configuration of the
* slaves violating the anti-affinity goals. */
static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
int ip_len, clusterManagerNode ***offending, int *offending_len)
int ip_count, clusterManagerNode ***offending, int *offending_len)
{
int score = 0, i, j;
int node_len = cluster_manager.nodes->len;
@ -2014,7 +2020,10 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
*offending = zcalloc(node_len * sizeof(clusterManagerNode*));
offending_p = *offending;
}
for (i = 0; i < ip_len; i++) {
/* For each set of nodes in the same host, split by
* related nodes (masters and slaves which are involved in
* replication of each other) */
for (i = 0; i < ip_count; i++) {
clusterManagerNodeArray *node_array = &(ipnodes[i]);
dict *related = dictCreate(&clusterManagerDictType, NULL);
char *ip = NULL;
@ -2038,6 +2047,8 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
else types = sdscat(otypes, "s");
if (types != otypes) dictReplace(related, key, types);
}
/* Now it's trivial to check, for each related group having the
* same host, what is their local score. */
dictIterator *iter = dictGetIterator(related);
dictEntry *entry;
while ((entry = dictNext(iter)) != NULL) {
@ -2048,6 +2059,7 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
if (types[0] == 'm') score += (10000 * (typeslen - 1));
else score += (1 * typeslen);
if (offending == NULL) continue;
/* Populate the list of offending nodes. */
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
@ -2069,15 +2081,16 @@ static int clusterManagerGetAntiAffinityScore(clusterManagerNodeArray *ipnodes,
}
static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
int ip_len)
int ip_count)
{
clusterManagerNode **offenders = NULL;
int score = clusterManagerGetAntiAffinityScore(ipnodes, ip_len, NULL, NULL);
int score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count,
NULL, NULL);
if (score == 0) goto cleanup;
clusterManagerLogInfo(">>> Trying to optimize slaves allocation "
"for anti-affinity\n");
int node_len = cluster_manager.nodes->len;
int maxiter = 500 * node_len;
int maxiter = 500 * node_len; // Effort is proportional to cluster size...
srand(time(NULL));
while (maxiter > 0) {
int offending_len = 0;
@ -2085,9 +2098,14 @@ static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
zfree(offenders);
offenders = NULL;
}
score = clusterManagerGetAntiAffinityScore(ipnodes, ip_len, &offenders,
score = clusterManagerGetAntiAffinityScore(ipnodes,
ip_count,
&offenders,
&offending_len);
if (score == 0) break;
if (score == 0) break; // Optimal anti affinity reached
/* We'll try to randomly swap a slave's assigned master causing
* an affinity problem with another random slave, to see if we
* can improve the affinity. */
int rand_idx = rand() % offending_len;
clusterManagerNode *first = offenders[rand_idx],
*second = NULL;
@ -2112,8 +2130,12 @@ static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
*second_master = second->replicate;
first->replicate = second_master, first->dirty = 1;
second->replicate = first_master, second->dirty = 1;
int new_score = clusterManagerGetAntiAffinityScore(ipnodes, ip_len,
int new_score = clusterManagerGetAntiAffinityScore(ipnodes,
ip_count,
NULL, NULL);
/* If the change actually makes thing worse, revert. Otherwise
* leave as it is becuase the best solution may need a few
* combined swaps. */
if (new_score > score) {
first->replicate = first_master;
second->replicate = second_master;
@ -2121,7 +2143,7 @@ static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
zfree(other_replicas);
maxiter--;
}
score = clusterManagerGetAntiAffinityScore(ipnodes, ip_len, NULL, NULL);
score = clusterManagerGetAntiAffinityScore(ipnodes, ip_count, NULL, NULL);
char *msg;
int perfect = (score == 0);
int log_level = (perfect ? CLUSTER_MANAGER_LOG_LVL_SUCCESS :
@ -2136,6 +2158,7 @@ cleanup:
zfree(offenders);
}
/* Return a representable string of the node's slots */
static sds clusterManagerNodeSlotsString(clusterManagerNode *node) {
sds slots = sdsempty();
int first_range_idx = -1, last_slot_idx = -1, i;
@ -2303,11 +2326,13 @@ cleanup:
return success;
}
/* Flush the dirty node configuration by calling replicate for slaves or
* adding the slots for masters. */
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
if (!node->dirty) return 0;
redisReply *reply = NULL;
int is_err = 0, success = 1;
*err = NULL;
if (err != NULL) *err = NULL;
if (node->replicate != NULL) {
reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER REPLICATE %s",
node->replicate);
@ -2317,14 +2342,15 @@ static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
strcpy(*err, reply->str);
}
success = 0;
/* If the cluster did not already joined it is possible that
* the slave does not know the master node yet. So on errors
* we return ASAP leaving the dirty flag set, to flush the
* config later. */
goto cleanup;
}
} else {
int added = clusterManagerAddSlots(node, err);
if (!added || *err != NULL) {
success = 0;
goto cleanup;
}
if (!added || *err != NULL) success = 0;
}
node->dirty = 0;
cleanup:
@ -2342,6 +2368,11 @@ static void clusterManagerWaitForClusterJoin(void) {
printf("\n");
}
/* Load node's cluster configuration by calling "CLUSTER NODES" command.
* Node's configuration (name, replicate, slots, ...) is then updated.
* If CLUSTER_MANAGER_OPT_GETFRIENDS flag is set into 'opts' argument,
* and node already knows other nodes, the node's friends list is populated
* with the other nodes info. */
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char **err)
{
@ -2391,7 +2422,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
if (myself) {
node->flags |= CLUSTER_MANAGER_FLAG_MYSELF;
currentNode = node;
CLUSTER_MANAGER_RESET_SLOTS(node);
clusterManagerNodeResetSlots(node);
if (i == 8) {
int remaining = strlen(line);
//TODO: just while(remaining) && assign p inside the block
@ -2501,7 +2532,6 @@ cleanup:
* point. All nodes will be loaded inside the cluster_manager.nodes list.
* Warning: if something goes wrong, it will free the starting node before
* returning 0. */
static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) {
if (node->context == NULL)
CLUSTER_MANAGER_NODE_CONNECT(node);
@ -2681,7 +2711,6 @@ static int clusterManagerIsConfigConsistent(void) {
if (cluster_manager.nodes == NULL) return 0;
int consistent = (listLength(cluster_manager.nodes) <= 1);
// If the Cluster has only one node, it's always consistent
// Does it make sense?
if (consistent) return 1;
sds first_cfg = NULL;
listIter li;
@ -2705,6 +2734,13 @@ static int clusterManagerIsConfigConsistent(void) {
return consistent;
}
static void clusterManagerOnError(sds err) {
if (cluster_manager.errors == NULL)
cluster_manager.errors = listCreate();
listAddNodeTail(cluster_manager.errors, err);
clusterManagerLogErr("%s\n", (char *) err);
}
static int clusterManagerGetCoveredSlots(char *all_slots) {
if (cluster_manager.nodes == NULL) return 0;
listIter li;
@ -2732,7 +2768,7 @@ static void clusterManagerCheckCluster(int quiet) {
if (!quiet) clusterManagerShowNodes();
if (!clusterManagerIsConfigConsistent()) {
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
CLUSTER_MANAGER_ERROR(err);
clusterManagerOnError(err);
} else {
clusterManagerLogOk("[OK] All nodes agree about slots "
"configuration.\n");
@ -2761,7 +2797,7 @@ static void clusterManagerCheckCluster(int quiet) {
errstr = sdscatfmt(errstr, fmt, slot);
}
errstr = sdscat(errstr, ".");
CLUSTER_MANAGER_ERROR(errstr);
clusterManagerOnError(errstr);
}
if (n->importing != NULL) {
if (open_slots == NULL)
@ -2779,7 +2815,7 @@ static void clusterManagerCheckCluster(int quiet) {
errstr = sdscatfmt(errstr, fmt, slot);
}
errstr = sdscat(errstr, ".");
CLUSTER_MANAGER_ERROR(errstr);
clusterManagerOnError(errstr);
}
}
if (open_slots != NULL) {
@ -2808,7 +2844,7 @@ static void clusterManagerCheckCluster(int quiet) {
err = sdscatprintf(err, "[ERR] Not all %d slots are "
"covered by nodes.\n",
CLUSTER_MANAGER_SLOTS);
CLUSTER_MANAGER_ERROR(err);
clusterManagerOnError(err);
}
}
@ -2832,6 +2868,53 @@ static void clusterManagerLog(int level, const char* fmt, ...) {
if (use_colors) printf("\033[" LOG_COLOR_RESET);
}
static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
int alloc_len)
{
array->nodes = zcalloc(alloc_len * sizeof(clusterManagerNode*));
array->alloc = array->nodes;
array->len = alloc_len;
array->count = 0;
}
/* Reset array->nodes to the original array allocation and re-count non-NULL
* nodes. */
static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array) {
if (array->nodes > array->alloc) {
array->len = array->nodes - array->alloc;
array->nodes = array->alloc;
array->count = 0;
int i = 0;
for(; i < array->len; i++) {
if (array->nodes[i] != NULL) array->count++;
}
}
}
/* Shift array->nodes and store the shifted node into 'nodeptr'. */
static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
clusterManagerNode **nodeptr)
{
assert(array->nodes < (array->nodes + array->len));
/* If the first node to be shifted is not NULL, decrement count. */
if (*array->nodes != NULL) array->count--;
/* Store the first node to be shifted into 'nodeptr'. */
*nodeptr = *array->nodes;
/* Shift the nodes array and decrement length. */
array->nodes++;
array->len--;
}
static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
clusterManagerNode *node)
{
assert(array->nodes < (array->nodes + array->len));
assert(node != NULL);
assert(array->count < array->len);
array->nodes[array->count++] = node;
}
/* Execute redis-cli in Cluster Manager mode */
static void clusterManagerMode(clusterManagerCommandProc *proc) {
int argc = config.cluster_manager_command.argc;
char **argv = config.cluster_manager_command.argv;
@ -2919,7 +3002,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
}
clusterManagerLogInfo(">>> Performing hash slots allocation "
"on %d nodes...\n", node_len);
int interleaved_len = 0, ips_len = 0;
int interleaved_len = 0, ip_count = 0;
clusterManagerNode **interleaved = zcalloc(node_len*sizeof(**interleaved));
char **ips = zcalloc(node_len * sizeof(char*));
clusterManagerNodeArray *ip_nodes = zcalloc(node_len * sizeof(*ip_nodes));
@ -2929,7 +3012,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
int found = 0;
for (i = 0; i < ips_len; i++) {
for (i = 0; i < ip_count; i++) {
char *ip = ips[i];
if (!strcmp(ip, n->ip)) {
found = 1;
@ -2937,19 +3020,19 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
}
}
if (!found) {
ips[ips_len++] = n->ip;
ips[ip_count++] = n->ip;
}
clusterManagerNodeArray *node_array = &(ip_nodes[i]);
if (node_array->nodes == NULL)
CLUSTER_MANAGER_NODEARRAY_INIT(node_array, node_len);
CLUSTER_MANAGER_NODEARRAY_ADD(node_array, n);
clusterManagerNodeArrayInit(node_array, node_len);
clusterManagerNodeArrayAdd(node_array, n);
}
while (interleaved_len < node_len) {
for (i = 0; i < ips_len; i++) {
for (i = 0; i < ip_count; i++) {
clusterManagerNodeArray *node_array = &(ip_nodes[i]);
if (node_array->count > 0) {
clusterManagerNode *n;
CLUSTER_MANAGER_NODEARRAY_SHIFT(node_array, n);
clusterManagerNode *n = NULL;
clusterManagerNodeArrayShift(node_array, &n);
interleaved[interleaved_len++] = n;
}
}
@ -3019,11 +3102,11 @@ assign_replicas:
printf("Adding extra replicas...\n");
goto assign_replicas;
}
for (i = 0; i < ips_len; i++) {
for (i = 0; i < ip_count; i++) {
clusterManagerNodeArray *node_array = ip_nodes + i;
CLUSTER_MANAGER_NODEARRAY_RESET(node_array);
clusterManagerNodeArrayReset(node_array);
}
clusterManagerOptimizeAntiAffinity(ip_nodes, ips_len);
clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
clusterManagerShowNodes();
printf("Can I set the above configuration? %s", "(type 'yes' to accept): ");
fflush(stdout);
@ -3031,7 +3114,6 @@ assign_replicas:
int nread = read(fileno(stdin),buf,4);
buf[3] = '\0';
if (nread != 0 && !strcmp("yes", buf)) {
printf("\nFlushing configuration!\n");
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *node = ln->value;
@ -3128,7 +3210,7 @@ cleanup:
zfree(ips);
for (i = 0; i < node_len; i++) {
clusterManagerNodeArray *node_array = ip_nodes + i;
CLUSTER_MANAGER_NODEARRAY_FREE(node_array);
CLUSTER_MANAGER_NODE_ARRAY_FREE(node_array);
}
zfree(ip_nodes);
return success;