mirror of
https://github.com/fluencelabs/redis
synced 2025-03-30 22:31:03 +00:00
Merge pull request #5881 from artix75/cluster_manager_join_issues
Cluster Manager: create command checks for issues during "CLUSTER MEET"
This commit is contained in:
commit
5f2a256ca9
175
src/redis-cli.c
175
src/redis-cli.c
@ -130,6 +130,8 @@
|
|||||||
#define CLUSTER_MANAGER_LOG_LVL_ERR 3
|
#define CLUSTER_MANAGER_LOG_LVL_ERR 3
|
||||||
#define CLUSTER_MANAGER_LOG_LVL_SUCCESS 4
|
#define CLUSTER_MANAGER_LOG_LVL_SUCCESS 4
|
||||||
|
|
||||||
|
#define CLUSTER_JOIN_CHECK_AFTER 20
|
||||||
|
|
||||||
#define LOG_COLOR_BOLD "29;1m"
|
#define LOG_COLOR_BOLD "29;1m"
|
||||||
#define LOG_COLOR_RED "31;1m"
|
#define LOG_COLOR_RED "31;1m"
|
||||||
#define LOG_COLOR_GREEN "32;1m"
|
#define LOG_COLOR_GREEN "32;1m"
|
||||||
@ -1974,6 +1976,15 @@ typedef struct clusterManagerReshardTableItem {
|
|||||||
int slot;
|
int slot;
|
||||||
} clusterManagerReshardTableItem;
|
} clusterManagerReshardTableItem;
|
||||||
|
|
||||||
|
/* Info about a cluster internal link. */
|
||||||
|
|
||||||
|
typedef struct clusterManagerLink {
|
||||||
|
sds node_name;
|
||||||
|
sds node_addr;
|
||||||
|
int connected;
|
||||||
|
int handshaking;
|
||||||
|
} clusterManagerLink;
|
||||||
|
|
||||||
static dictType clusterManagerDictType = {
|
static dictType clusterManagerDictType = {
|
||||||
dictSdsHash, /* hash function */
|
dictSdsHash, /* hash function */
|
||||||
NULL, /* key dup */
|
NULL, /* key dup */
|
||||||
@ -1983,6 +1994,15 @@ static dictType clusterManagerDictType = {
|
|||||||
dictSdsDestructor /* val destructor */
|
dictSdsDestructor /* val destructor */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static dictType clusterManagerLinkDictType = {
|
||||||
|
dictSdsHash, /* hash function */
|
||||||
|
NULL, /* key dup */
|
||||||
|
NULL, /* val dup */
|
||||||
|
dictSdsKeyCompare, /* key compare */
|
||||||
|
dictSdsDestructor, /* key destructor */
|
||||||
|
dictListDestructor /* val destructor */
|
||||||
|
};
|
||||||
|
|
||||||
typedef int clusterManagerCommandProc(int argc, char **argv);
|
typedef int clusterManagerCommandProc(int argc, char **argv);
|
||||||
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
|
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
|
||||||
clusterManagerNode *n, int bulk_idx);
|
clusterManagerNode *n, int bulk_idx);
|
||||||
@ -2012,6 +2032,7 @@ static void clusterManagerWaitForClusterJoin(void);
|
|||||||
static int clusterManagerCheckCluster(int quiet);
|
static int clusterManagerCheckCluster(int quiet);
|
||||||
static void clusterManagerLog(int level, const char* fmt, ...);
|
static void clusterManagerLog(int level, const char* fmt, ...);
|
||||||
static int clusterManagerIsConfigConsistent(void);
|
static int clusterManagerIsConfigConsistent(void);
|
||||||
|
static dict *clusterManagerGetLinkStatus(void);
|
||||||
static void clusterManagerOnError(sds err);
|
static void clusterManagerOnError(sds err);
|
||||||
static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
|
static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
|
||||||
int len);
|
int len);
|
||||||
@ -2102,6 +2123,24 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) {
|
|||||||
return proc;
|
return proc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
|
||||||
|
int *bus_port_ptr)
|
||||||
|
{
|
||||||
|
char *c = strrchr(addr, '@');
|
||||||
|
if (c != NULL) {
|
||||||
|
*c = '\0';
|
||||||
|
if (bus_port_ptr != NULL)
|
||||||
|
*bus_port_ptr = atoi(c + 1);
|
||||||
|
}
|
||||||
|
c = strrchr(addr, ':');
|
||||||
|
if (c != NULL) {
|
||||||
|
*c = '\0';
|
||||||
|
*ip_ptr = addr;
|
||||||
|
*port_ptr = atoi(++c);
|
||||||
|
} else return 0;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/* Get host ip and port from command arguments. If only one argument has
|
/* Get host ip and port from command arguments. If only one argument has
|
||||||
* been provided it must be in the form of 'ip:port', elsewhere
|
* been provided it must be in the form of 'ip:port', elsewhere
|
||||||
* the first argument must be the ip and the second one the port.
|
* the first argument must be the ip and the second one the port.
|
||||||
@ -2114,14 +2153,7 @@ static int getClusterHostFromCmdArgs(int argc, char **argv,
|
|||||||
char *ip = NULL;
|
char *ip = NULL;
|
||||||
if (argc == 1) {
|
if (argc == 1) {
|
||||||
char *addr = argv[0];
|
char *addr = argv[0];
|
||||||
char *c = strrchr(addr, '@');
|
if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) return 0;
|
||||||
if (c != NULL) *c = '\0';
|
|
||||||
c = strrchr(addr, ':');
|
|
||||||
if (c != NULL) {
|
|
||||||
*c = '\0';
|
|
||||||
ip = addr;
|
|
||||||
port = atoi(++c);
|
|
||||||
} else return 0;
|
|
||||||
} else {
|
} else {
|
||||||
ip = argv[0];
|
ip = argv[0];
|
||||||
port = atoi(argv[1]);
|
port = atoi(argv[1]);
|
||||||
@ -3381,10 +3413,55 @@ cleanup:
|
|||||||
/* Wait until the cluster configuration is consistent. */
|
/* Wait until the cluster configuration is consistent. */
|
||||||
static void clusterManagerWaitForClusterJoin(void) {
|
static void clusterManagerWaitForClusterJoin(void) {
|
||||||
printf("Waiting for the cluster to join\n");
|
printf("Waiting for the cluster to join\n");
|
||||||
|
int counter = 0,
|
||||||
|
check_after = CLUSTER_JOIN_CHECK_AFTER +
|
||||||
|
(int)(listLength(cluster_manager.nodes) * 0.15f);
|
||||||
while(!clusterManagerIsConfigConsistent()) {
|
while(!clusterManagerIsConfigConsistent()) {
|
||||||
printf(".");
|
printf(".");
|
||||||
fflush(stdout);
|
fflush(stdout);
|
||||||
sleep(1);
|
sleep(1);
|
||||||
|
if (++counter > check_after) {
|
||||||
|
dict *status = clusterManagerGetLinkStatus();
|
||||||
|
dictIterator *iter = NULL;
|
||||||
|
if (status != NULL && dictSize(status) > 0) {
|
||||||
|
printf("\n");
|
||||||
|
clusterManagerLogErr("Warning: %d node(s) may "
|
||||||
|
"be unreachable\n", dictSize(status));
|
||||||
|
iter = dictGetIterator(status);
|
||||||
|
dictEntry *entry;
|
||||||
|
while ((entry = dictNext(iter)) != NULL) {
|
||||||
|
sds nodeaddr = (sds) dictGetKey(entry);
|
||||||
|
char *node_ip = NULL;
|
||||||
|
int node_port = 0, node_bus_port = 0;
|
||||||
|
list *from = (list *) dictGetVal(entry);
|
||||||
|
if (parseClusterNodeAddress(nodeaddr, &node_ip,
|
||||||
|
&node_port, &node_bus_port) && node_bus_port) {
|
||||||
|
clusterManagerLogErr(" - The port %d of node %s may "
|
||||||
|
"be unreachable from:\n",
|
||||||
|
node_bus_port, node_ip);
|
||||||
|
} else {
|
||||||
|
clusterManagerLogErr(" - Node %s may be unreachable "
|
||||||
|
"from:\n", nodeaddr);
|
||||||
|
}
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(from, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
sds from_addr = ln->value;
|
||||||
|
clusterManagerLogErr(" %s\n", from_addr);
|
||||||
|
sdsfree(from_addr);
|
||||||
|
}
|
||||||
|
clusterManagerLogErr("Cluster bus ports must be reachable "
|
||||||
|
"by every node.\nRemember that "
|
||||||
|
"cluster bus ports are different "
|
||||||
|
"from standard instance ports.\n");
|
||||||
|
listEmpty(from);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (iter != NULL) dictReleaseIterator(iter);
|
||||||
|
if (status != NULL) dictRelease(status);
|
||||||
|
counter = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
printf("\n");
|
printf("\n");
|
||||||
}
|
}
|
||||||
@ -3788,6 +3865,88 @@ static int clusterManagerIsConfigConsistent(void) {
|
|||||||
return consistent;
|
return consistent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) {
|
||||||
|
list *links = NULL;
|
||||||
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
|
||||||
|
if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup;
|
||||||
|
links = listCreate();
|
||||||
|
char *lines = reply->str, *p, *line;
|
||||||
|
while ((p = strstr(lines, "\n")) != NULL) {
|
||||||
|
int i = 0;
|
||||||
|
*p = '\0';
|
||||||
|
line = lines;
|
||||||
|
lines = p + 1;
|
||||||
|
char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL;
|
||||||
|
while ((p = strchr(line, ' ')) != NULL) {
|
||||||
|
*p = '\0';
|
||||||
|
char *token = line;
|
||||||
|
line = p + 1;
|
||||||
|
if (i == 0) nodename = token;
|
||||||
|
else if (i == 1) addr = token;
|
||||||
|
else if (i == 2) flags = token;
|
||||||
|
else if (i == 7) link_status = token;
|
||||||
|
else if (i == 8) break;
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
if (i == 7) link_status = line;
|
||||||
|
if (nodename == NULL || addr == NULL || flags == NULL ||
|
||||||
|
link_status == NULL) continue;
|
||||||
|
if (strstr(flags, "myself") != NULL) continue;
|
||||||
|
int disconnected = ((strstr(flags, "disconnected") != NULL) ||
|
||||||
|
(strstr(link_status, "disconnected")));
|
||||||
|
int handshaking = (strstr(flags, "handshake") != NULL);
|
||||||
|
if (disconnected || handshaking) {
|
||||||
|
clusterManagerLink *link = zmalloc(sizeof(*link));
|
||||||
|
link->node_name = sdsnew(nodename);
|
||||||
|
link->node_addr = sdsnew(addr);
|
||||||
|
link->connected = 0;
|
||||||
|
link->handshaking = handshaking;
|
||||||
|
listAddNodeTail(links, link);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
if (reply != NULL) freeReplyObject(reply);
|
||||||
|
return links;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check for disconnected cluster links. It returns a dict whose keys
|
||||||
|
* are the unreachable node addresses and the values are lists of
|
||||||
|
* node addresses that cannot reach the unreachable node. */
|
||||||
|
static dict *clusterManagerGetLinkStatus(void) {
|
||||||
|
if (cluster_manager.nodes == NULL) return NULL;
|
||||||
|
dict *status = dictCreate(&clusterManagerLinkDictType, NULL);
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(cluster_manager.nodes, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterManagerNode *node = ln->value;
|
||||||
|
list *links = clusterManagerGetDisconnectedLinks(node);
|
||||||
|
if (links) {
|
||||||
|
listIter lli;
|
||||||
|
listNode *lln;
|
||||||
|
listRewind(links, &lli);
|
||||||
|
while ((lln = listNext(&lli)) != NULL) {
|
||||||
|
clusterManagerLink *link = lln->value;
|
||||||
|
list *from = NULL;
|
||||||
|
dictEntry *entry = dictFind(status, link->node_addr);
|
||||||
|
if (entry) from = dictGetVal(entry);
|
||||||
|
else {
|
||||||
|
from = listCreate();
|
||||||
|
dictAdd(status, sdsdup(link->node_addr), from);
|
||||||
|
}
|
||||||
|
sds myaddr = sdsempty();
|
||||||
|
myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port);
|
||||||
|
listAddNodeTail(from, myaddr);
|
||||||
|
sdsfree(link->node_name);
|
||||||
|
sdsfree(link->node_addr);
|
||||||
|
zfree(link);
|
||||||
|
}
|
||||||
|
listRelease(links);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
/* Add the error string to cluster_manager.errors and print it. */
|
/* Add the error string to cluster_manager.errors and print it. */
|
||||||
static void clusterManagerOnError(sds err) {
|
static void clusterManagerOnError(sds err) {
|
||||||
if (cluster_manager.errors == NULL)
|
if (cluster_manager.errors == NULL)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user