1
0
mirror of https://github.com/fluencelabs/redis synced 2025-03-21 01:50:50 +00:00

Cluster Manager: code improvements and more comments added.

This commit is contained in:
artix 2018-04-19 18:52:01 +02:00
parent 5f358dae33
commit 5bc2c98789

@ -172,6 +172,7 @@ typedef struct clusterManagerCommand {
int pipeline; int pipeline;
float threshold; float threshold;
} clusterManagerCommand; } clusterManagerCommand;
static void createClusterManagerCommand(char *cmdname, int argc, char **argv); static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
@ -1788,7 +1789,7 @@ static int evalMode(int argc, char **argv) {
/* The Cluster Manager global structure */ /* The Cluster Manager global structure */
static struct clusterManager { static struct clusterManager {
list *nodes; /* List of nodes int he configuration. */ list *nodes; /* List of nodes in the configuration. */
list *errors; list *errors;
} cluster_manager; } cluster_manager;
@ -1821,7 +1822,7 @@ typedef struct clusterManagerNode {
int balance; /* Used by rebalance */ int balance; /* Used by rebalance */
} clusterManagerNode; } clusterManagerNode;
/* Data structure used to represent a sequence of nodes. */ /* Data structure used to represent a sequence of cluster nodes. */
typedef struct clusterManagerNodeArray { typedef struct clusterManagerNodeArray {
clusterManagerNode **nodes; /* Actual nodes array */ clusterManagerNode **nodes; /* Actual nodes array */
clusterManagerNode **alloc; /* Pointer to the allocated memory */ clusterManagerNode **alloc; /* Pointer to the allocated memory */
@ -1829,7 +1830,7 @@ typedef struct clusterManagerNodeArray {
int count; /* Non-NULL nodes count */ int count; /* Non-NULL nodes count */
} clusterManagerNodeArray; } clusterManagerNodeArray;
/* Used for reshard table. */ /* Used for the reshard table. */
typedef struct clusterManagerReshardTableItem { typedef struct clusterManagerReshardTableItem {
clusterManagerNode *source; clusterManagerNode *source;
int slot; int slot;
@ -1865,7 +1866,7 @@ static void clusterManagerOptimizeAntiAffinity(clusterManagerNodeArray *ipnodes,
int ip_count); int ip_count);
static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent); static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent);
static void clusterManagerShowNodes(void); static void clusterManagerShowNodes(void);
static void clusterManagerShowInfo(void); static void clusterManagerShowClusterInfo(void);
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err); static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
static void clusterManagerWaitForClusterJoin(void); static void clusterManagerWaitForClusterJoin(void);
static int clusterManagerCheckCluster(int quiet); static int clusterManagerCheckCluster(int quiet);
@ -2067,8 +2068,9 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
clusterManagerNodeResetSlots(node); clusterManagerNodeResetSlots(node);
return node; return node;
} }
/* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the /* Check whether reply is NULL or its type is REDIS_REPLY_ERROR. In the
* latest case, if 'err' arg is not NULL, it gets allocated with a copy * latest case, if the 'err' arg is not NULL, it gets allocated with a copy
* of reply error (it's up to the caller function to free it), elsewhere * of reply error (it's up to the caller function to free it), elsewhere
* the error is directly printed. */ * the error is directly printed. */
static int clusterManagerCheckRedisReply(clusterManagerNode *n, static int clusterManagerCheckRedisReply(clusterManagerNode *n,
@ -2100,7 +2102,7 @@ static void clusterManagerRemoveNodeFromList(list *nodelist,
} }
} }
/* Return the node with the specified ID or NULL. */ /* Return the node with the specified name (ID) or NULL. */
static clusterManagerNode *clusterManagerNodeByName(const char *name) { static clusterManagerNode *clusterManagerNodeByName(const char *name) {
if (cluster_manager.nodes == NULL) return NULL; if (cluster_manager.nodes == NULL) return NULL;
clusterManagerNode *found = NULL; clusterManagerNode *found = NULL;
@ -2121,7 +2123,7 @@ static clusterManagerNode *clusterManagerNodeByName(const char *name) {
return found; return found;
} }
/* Like get_node_by_name but the specified name can be just the first /* Like clusterManagerNodeByName but the specified name can be just the first
* part of the node ID as long as the prefix in unique across the * part of the node ID as long as the prefix in unique across the
* cluster. * cluster.
*/ */
@ -2152,6 +2154,7 @@ static void clusterManagerNodeResetSlots(clusterManagerNode *node) {
node->slots_count = 0; node->slots_count = 0;
} }
/* Call "INFO" redis command on the specified node and return the reply. */
static redisReply *clusterManagerGetNodeRedisInfo(clusterManagerNode *node, static redisReply *clusterManagerGetNodeRedisInfo(clusterManagerNode *node,
char **err) char **err)
{ {
@ -2181,7 +2184,7 @@ static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) {
* some key or if it already knows other nodes */ * some key or if it already knows other nodes */
static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err) { static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err) {
redisReply *info = clusterManagerGetNodeRedisInfo(node, err); redisReply *info = clusterManagerGetNodeRedisInfo(node, err);
int is_err = 0, is_empty = 1; int is_empty = 1;
if (info == NULL) return 0; if (info == NULL) return 0;
if (strstr(info->str, "db0:") != NULL) { if (strstr(info->str, "db0:") != NULL) {
is_empty = 0; is_empty = 0;
@ -2190,11 +2193,7 @@ static int clusterManagerNodeIsEmpty(clusterManagerNode *node, char **err) {
freeReplyObject(info); freeReplyObject(info);
info = CLUSTER_MANAGER_COMMAND(node, "CLUSTER INFO"); info = CLUSTER_MANAGER_COMMAND(node, "CLUSTER INFO");
if (err != NULL) *err = NULL; if (err != NULL) *err = NULL;
if (info == NULL || (is_err = (info->type == REDIS_REPLY_ERROR))) { if (!clusterManagerCheckRedisReply(node, info, err)) {
if (is_err && err != NULL) {
*err = zmalloc((info->len + 1) * sizeof(char));
strcpy(*err, info->str);
}
is_empty = 0; is_empty = 0;
goto result; goto result;
} }
@ -2422,7 +2421,7 @@ static sds clusterManagerNodeSlotsString(clusterManagerNode *node) {
* However if the key contains the {...} pattern, only the part between * However if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain * { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */ * keys to be in the same node (assuming no resharding is in progress). */
static unsigned int keyHashSlot(char *key, int keylen) { static unsigned int clusterManagerKeyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */ int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++) for (s = 0; s < keylen; s++)
@ -2443,6 +2442,7 @@ static unsigned int keyHashSlot(char *key, int keylen) {
return crc16(key+s+1,e-s-1) & 0x3FFF; return crc16(key+s+1,e-s-1) & 0x3FFF;
} }
/* Return a string representation of the cluster node. */
static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) { static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) {
sds info = sdsempty(); sds info = sdsempty();
sds spaces = sdsempty(); sds spaces = sdsempty();
@ -2484,7 +2484,7 @@ static void clusterManagerShowNodes(void) {
} }
} }
static void clusterManagerShowInfo(void) { static void clusterManagerShowClusterInfo(void) {
int masters = 0; int masters = 0;
int keys = 0; int keys = 0;
listIter li; listIter li;
@ -2533,11 +2533,12 @@ static void clusterManagerShowInfo(void) {
printf("%.2f keys per slot on average.\n", keys_per_slot); printf("%.2f keys per slot on average.\n", keys_per_slot);
} }
/* Flush dirty slots configuration of the node by calling CLUSTER ADDSLOTS */
static int clusterManagerAddSlots(clusterManagerNode *node, char**err) static int clusterManagerAddSlots(clusterManagerNode *node, char**err)
{ {
redisReply *reply = NULL; redisReply *reply = NULL;
void *_reply = NULL; void *_reply = NULL;
int is_err = 0, success = 1; int success = 1;
/* First two args are used for the command itself. */ /* First two args are used for the command itself. */
int argc = node->slots_count + 2; int argc = node->slots_count + 2;
sds *argv = zmalloc(argc * sizeof(*argv)); sds *argv = zmalloc(argc * sizeof(*argv));
@ -2566,14 +2567,7 @@ static int clusterManagerAddSlots(clusterManagerNode *node, char**err)
goto cleanup; goto cleanup;
} }
reply = (redisReply*) _reply; reply = (redisReply*) _reply;
if (reply == NULL || (is_err = (reply->type == REDIS_REPLY_ERROR))) { success = clusterManagerCheckRedisReply(node, reply, err);
if (is_err && err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
}
success = 0;
goto cleanup;
}
cleanup: cleanup:
zfree(argvlen); zfree(argvlen);
if (argv != NULL) { if (argv != NULL) {
@ -2821,7 +2815,7 @@ static int clusterManagerMoveSlot(clusterManagerNode *source,
} }
/* Flush the dirty node configuration by calling replicate for slaves or /* Flush the dirty node configuration by calling replicate for slaves or
* adding the slots for masters. */ * adding the slots defined in the masters. */
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) { static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err) {
if (!node->dirty) return 0; if (!node->dirty) return 0;
redisReply *reply = NULL; redisReply *reply = NULL;
@ -2852,6 +2846,7 @@ cleanup:
return success; return success;
} }
/* Wait until the cluster configuration is consistent. */
static void clusterManagerWaitForClusterJoin(void) { static void clusterManagerWaitForClusterJoin(void) {
printf("Waiting for the cluster to join\n"); printf("Waiting for the cluster to join\n");
while(!clusterManagerIsConfigConsistent()) { while(!clusterManagerIsConfigConsistent()) {
@ -2871,13 +2866,9 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char **err) char **err)
{ {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES"); redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
int is_err = 0, success = 1; int success = 1;
*err = NULL; *err = NULL;
if (reply == NULL || (is_err = (reply->type == REDIS_REPLY_ERROR))) { if (!clusterManagerCheckRedisReply(node, reply, err)) {
if (is_err && err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
}
success = 0; success = 0;
goto cleanup; goto cleanup;
} }
@ -3114,6 +3105,7 @@ invalid_friend:
return 1; return 1;
} }
/* Compare functions used by various sorting operations. */
int clusterManagerSlotCompare(const void *slot1, const void *slot2) { int clusterManagerSlotCompare(const void *slot1, const void *slot2) {
const char **i1 = (const char **)slot1; const char **i1 = (const char **)slot1;
const char **i2 = (const char **)slot2; const char **i2 = (const char **)slot2;
@ -3252,6 +3244,7 @@ static int clusterManagerIsConfigConsistent(void) {
return consistent; return consistent;
} }
/* Add the error string to cluster_manager.errors and print it. */
static void clusterManagerOnError(sds err) { static void clusterManagerOnError(sds err) {
if (cluster_manager.errors == NULL) if (cluster_manager.errors == NULL)
cluster_manager.errors = listCreate(); cluster_manager.errors = listCreate();
@ -3259,6 +3252,9 @@ static void clusterManagerOnError(sds err) {
clusterManagerLogErr("%s\n", (char *) err); clusterManagerLogErr("%s\n", (char *) err);
} }
/* Check the slots coverage of the cluster. The 'all_slots' argument must be
* and array of 16384 bytes. Every covered slot will be set to 1 in the
* 'all_slots' array. The function returns the total number if covered slots.*/
static int clusterManagerGetCoveredSlots(char *all_slots) { static int clusterManagerGetCoveredSlots(char *all_slots) {
if (cluster_manager.nodes == NULL) return 0; if (cluster_manager.nodes == NULL) return 0;
listIter li; listIter li;
@ -4482,7 +4478,7 @@ static int clusterManagerCommandInfo(int argc, char **argv) {
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port); clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerShowInfo(); clusterManagerShowClusterInfo();
return 1; return 1;
invalid_args: invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
@ -4495,7 +4491,7 @@ static int clusterManagerCommandCheck(int argc, char **argv) {
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port); clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerShowInfo(); clusterManagerShowClusterInfo();
return clusterManagerCheckCluster(0); return clusterManagerCheckCluster(0);
invalid_args: invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
@ -5047,7 +5043,7 @@ static int clusterManagerCommandImport(int argc, char **argv) {
redisReply *kr = src_reply->element[1]->element[i]; redisReply *kr = src_reply->element[1]->element[i];
assert(kr->type == REDIS_REPLY_STRING); assert(kr->type == REDIS_REPLY_STRING);
char *key = kr->str; char *key = kr->str;
uint16_t slot = keyHashSlot(key, kr->len); uint16_t slot = clusterManagerKeyHashSlot(key, kr->len);
clusterManagerNode *target = slots_map[slot]; clusterManagerNode *target = slots_map[slot];
printf("Migrating %s to %s:%d: ", key, target->ip, target->port); printf("Migrating %s to %s:%d: ", key, target->ip, target->port);
redisReply *r = reconnectingRedisCommand(src_ctx, cmdfmt, redisReply *r = reconnectingRedisCommand(src_ctx, cmdfmt,