mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 01:20:50 +00:00
Cluster Manager: check for unreachable nodes during cluster join.
This commit is contained in:
parent
9b0b0b3942
commit
2f499304aa
128
src/redis-cli.c
128
src/redis-cli.c
@ -1974,6 +1974,15 @@ typedef struct clusterManagerReshardTableItem {
|
||||
int slot;
|
||||
} clusterManagerReshardTableItem;
|
||||
|
||||
/* Info about a cluster internal link. */
|
||||
|
||||
typedef struct clusterManagerLink {
|
||||
sds node_name;
|
||||
sds node_addr;
|
||||
int connected;
|
||||
int handshaking;
|
||||
} clusterManagerLink;
|
||||
|
||||
static dictType clusterManagerDictType = {
|
||||
dictSdsHash, /* hash function */
|
||||
NULL, /* key dup */
|
||||
@ -2012,6 +2021,7 @@ static void clusterManagerWaitForClusterJoin(void);
|
||||
static int clusterManagerCheckCluster(int quiet);
|
||||
static void clusterManagerLog(int level, const char* fmt, ...);
|
||||
static int clusterManagerIsConfigConsistent(void);
|
||||
static dict *clusterManagerGetLinkStatus(void);
|
||||
static void clusterManagerOnError(sds err);
|
||||
static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
|
||||
int len);
|
||||
@ -3381,10 +3391,43 @@ cleanup:
|
||||
/* Wait until the cluster configuration is consistent. */
|
||||
static void clusterManagerWaitForClusterJoin(void) {
|
||||
printf("Waiting for the cluster to join\n");
|
||||
int counter = 0, check_after = listLength(cluster_manager.nodes) * 2;
|
||||
while(!clusterManagerIsConfigConsistent()) {
|
||||
printf(".");
|
||||
fflush(stdout);
|
||||
sleep(1);
|
||||
if (++counter > check_after) {
|
||||
dict *status = clusterManagerGetLinkStatus();
|
||||
if (status != NULL && dictSize(status) > 0) {
|
||||
printf("\n");
|
||||
clusterManagerLogErr("Warning: %d nodes may "
|
||||
"be unreachable\n", dictSize(status));
|
||||
dictIterator *iter = dictGetIterator(status);
|
||||
dictEntry *entry;
|
||||
while ((entry = dictNext(iter)) != NULL) {
|
||||
sds nodename = (sds) dictGetKey(entry);
|
||||
list *from = (list *) dictGetVal(entry);
|
||||
clusterManagerLogErr(" - Node %s may be unreachable "
|
||||
"from:\n", nodename);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(from, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
sds from_addr = ln->value;
|
||||
clusterManagerLogErr(" %s\n", from_addr);
|
||||
sdsfree(from_addr);
|
||||
}
|
||||
clusterManagerLogErr("Cluster bus ports must be reachable "
|
||||
"by every node.\nRemember that "
|
||||
"cluster bus ports are different "
|
||||
"from standard instance port.\n");
|
||||
listEmpty(from);
|
||||
}
|
||||
dictReleaseIterator(iter);
|
||||
dictRelease(status);
|
||||
}
|
||||
counter = 0;
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
@ -3788,6 +3831,91 @@ static int clusterManagerIsConfigConsistent(void) {
|
||||
return consistent;
|
||||
}
|
||||
|
||||
/* Send CLUSTER NODES to `node` and collect the peers it reports as
 * disconnected.
 *
 * Every reply line other than the node's own ("myself") one is inspected:
 * a peer is recorded when either its flags field or its link-state field
 * contains "disconnected".
 *
 * Returns a newly created list of clusterManagerLink pointers (possibly
 * empty), or NULL when the command fails. The caller owns the list and its
 * items: for each item the two sds fields must be released with sdsfree()
 * and the item itself with zfree(). */
static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) {
    list *links = NULL;
    redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
    if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup;
    links = listCreate();
    char *lines = reply->str, *p, *line;
    /* The reply buffer is tokenized in place: newlines and spaces are
     * overwritten with NUL terminators as lines and fields are split. */
    while ((p = strstr(lines, "\n")) != NULL) {
        int i = 0;
        *p = '\0';
        line = lines;
        lines = p + 1;
        char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL;
        /* Fields of interest by index: 0 = name, 1 = address, 2 = flags,
         * 7 = link state. Anything from index 8 onwards is ignored. */
        while ((p = strchr(line, ' ')) != NULL) {
            *p = '\0';
            char *token = line;
            line = p + 1;
            if (i == 0) nodename = token;
            else if (i == 1) addr = token;
            else if (i == 2) flags = token;
            else if (i == 7) link_status = token;
            else if (i == 8) break;
            i++;
        }
        /* The link state is the last field on the line when nothing follows
         * it, so no trailing space terminated it inside the loop. */
        if (i == 7) link_status = line;

        if (nodename == NULL || addr == NULL || flags == NULL ||
            link_status == NULL)
            continue;
        if (strstr(flags, "myself") != NULL) continue;
        int disconnected = ((strstr(flags, "disconnected") != NULL) ||
                            (strstr(link_status, "disconnected")));
        if (disconnected) {
            /* Allocate with zmalloc(): the items are later released with
             * zfree(), and mixing malloc() with zfree() is not safe. */
            clusterManagerLink *link = zmalloc(sizeof(*link));
            link->node_name = sdsnew(nodename);
            link->node_addr = sdsnew(addr);
            link->connected = 0;
            link->handshaking = (strstr(flags, "handshaking") != NULL);
            listAddNodeTail(links, link);
        }
    }
cleanup:
    if (reply != NULL) freeReplyObject(reply);
    return links;
}
|
||||
|
||||
/* Check for disconnected cluster links. It returns a dict whose keys
|
||||
* are the unreachable node addresses and the values are lists of
|
||||
* node addresses that cannot reach the unreachable node. */
|
||||
static dict *clusterManagerGetLinkStatus(void) {
|
||||
if (cluster_manager.nodes == NULL) return NULL;
|
||||
dictType dtype = clusterManagerDictType;
|
||||
dtype.valDestructor = dictListDestructor;
|
||||
dict *status = dictCreate(&dtype, NULL);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *node = ln->value;
|
||||
list *links = clusterManagerGetDisconnectedLinks(node);
|
||||
if (links) {
|
||||
listIter lli;
|
||||
listNode *lln;
|
||||
listRewind(links, &lli);
|
||||
while ((lln = listNext(&lli)) != NULL) {
|
||||
clusterManagerLink *link = lln->value;
|
||||
list *from = NULL;
|
||||
dictEntry *entry = dictFind(status, link->node_addr);
|
||||
if (entry) from = dictGetVal(entry);
|
||||
else {
|
||||
from = listCreate();
|
||||
dictAdd(status, sdsdup(link->node_addr), from);
|
||||
}
|
||||
sds myaddr = sdsempty();
|
||||
myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port);
|
||||
listAddNodeTail(from, myaddr);
|
||||
sdsfree(link->node_name);
|
||||
sdsfree(link->node_addr);
|
||||
zfree(link);
|
||||
}
|
||||
listRelease(links);
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/* Add the error string to cluster_manager.errors and print it. */
|
||||
static void clusterManagerOnError(sds err) {
|
||||
if (cluster_manager.errors == NULL)
|
||||
|
Loading…
x
Reference in New Issue
Block a user