From 2f499304aafd865e40f121389834a871d4f441cc Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 23 May 2018 18:00:42 +0200 Subject: [PATCH 1/8] Cluster Manager: check for unreachable nodes during cluster join. --- src/redis-cli.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index 97d90837..1b635563 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1974,6 +1974,15 @@ typedef struct clusterManagerReshardTableItem { int slot; } clusterManagerReshardTableItem; +/* Info about a cluster internal link. */ + +typedef struct clusterManagerLink { + sds node_name; + sds node_addr; + int connected; + int handshaking; +} clusterManagerLink; + static dictType clusterManagerDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ @@ -2012,6 +2021,7 @@ static void clusterManagerWaitForClusterJoin(void); static int clusterManagerCheckCluster(int quiet); static void clusterManagerLog(int level, const char* fmt, ...); static int clusterManagerIsConfigConsistent(void); +static dict *clusterManagerGetLinkStatus(void); static void clusterManagerOnError(sds err); static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, int len); @@ -3381,10 +3391,43 @@ cleanup: /* Wait until the cluster configuration is consistent. */ static void clusterManagerWaitForClusterJoin(void) { printf("Waiting for the cluster to join\n"); + int counter = 0, check_after = listLength(cluster_manager.nodes) * 2; while(!clusterManagerIsConfigConsistent()) { printf("."); fflush(stdout); sleep(1); + if (++counter > check_after) { + dict *status = clusterManagerGetLinkStatus(); + if (status != NULL && dictSize(status) > 0) { + printf("\n"); + clusterManagerLogErr("Warning: %d nodes may " + "be unreachable\n", dictSize(status)); + dictIterator *iter = dictGetIterator(status); + dictEntry *entry; + while ((entry = dictNext(iter)) != NULL) { + sds nodename = (sds) dictGetKey(entry); + list *from = (list *) dictGetVal(entry); + clusterManagerLogErr(" - Node %s may be unreachable " + "from:\n", nodename); + listIter li; + listNode *ln; + listRewind(from, &li); + while ((ln = listNext(&li)) != NULL) { + sds from_addr = ln->value; + clusterManagerLogErr(" %s\n", from_addr); + sdsfree(from_addr); + } + clusterManagerLogErr("Cluster bus ports must be reachable " + "by every node.\nRemember that " + "cluster bus ports are different " + "from standard instance port.\n"); + listEmpty(from); + } + dictReleaseIterator(iter); + dictRelease(status); + } + counter = 0; + } } printf("\n"); } @@ -3788,6 +3831,91 @@ static int clusterManagerIsConfigConsistent(void) { return consistent; } +static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { + list *links = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES"); + if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup; + links = listCreate(); + char *lines = reply->str, *p, *line; + while ((p = strstr(lines, "\n")) != NULL) { + int i = 0; + *p = '\0'; + line = lines; + lines = p + 1; + char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL; + while ((p = strchr(line, ' ')) != NULL) { + *p = '\0'; + char *token = line; + line = p + 1; + if (i == 0) nodename = token; + else if (i == 1) addr = token; + else if (i == 2) flags = token; + else if (i == 7) link_status = token; + else if (i == 8) break; + i++; + } + if (i == 7) link_status = line; + + if (nodename == NULL || addr == NULL || flags == NULL || + link_status == NULL) + continue; + if (strstr(flags, "myself") != NULL) continue; + int disconnected = ((strstr(flags, "disconnected") != NULL) || + (strstr(link_status, "disconnected"))); + if (disconnected) { + clusterManagerLink *link = malloc(sizeof(*link)); + link->node_name = sdsnew(nodename); + link->node_addr = sdsnew(addr); + link->connected = 0; + link->handshaking = (strstr(flags, "handshaking") != NULL); + listAddNodeTail(links, link); + } + } +cleanup: + if (reply != NULL) freeReplyObject(reply); + return links; +} + +/* Check for disconnected cluster links. It returns a dict whose keys + * are the unreachable node addresses and the values are lists of + * node addresses that cannot reach the unreachable node. */ +static dict *clusterManagerGetLinkStatus(void) { + if (cluster_manager.nodes == NULL) return NULL; + dictType dtype = clusterManagerDictType; + dtype.valDestructor = dictListDestructor; + dict *status = dictCreate(&dtype, NULL); + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *node = ln->value; + list *links = clusterManagerGetDisconnectedLinks(node); + if (links) { + listIter lli; + listNode *lln; + listRewind(links, &lli); + while ((lln = listNext(&lli)) != NULL) { + clusterManagerLink *link = lln->value; + list *from = NULL; + dictEntry *entry = dictFind(status, link->node_addr); + if (entry) from = dictGetVal(entry); + else { + from = listCreate(); + dictAdd(status, sdsdup(link->node_addr), from); + } + sds myaddr = sdsempty(); + myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port); + listAddNodeTail(from, myaddr); + sdsfree(link->node_name); + sdsfree(link->node_addr); + zfree(link); + } + listRelease(links); + } + } + return status; +} + /* Add the error string to cluster_manager.errors and print it. */ static void clusterManagerOnError(sds err) { if (cluster_manager.errors == NULL) From 3578aabc0528e087b80e8b69378abd50b1e1633a Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 6 Jun 2018 17:28:29 +0200 Subject: [PATCH 2/8] Cluster Manager: improve join issue checking --- src/redis-cli.c | 55 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 1b635563..9ff4dfb9 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -130,6 +130,8 @@ #define CLUSTER_MANAGER_LOG_LVL_ERR 3 #define CLUSTER_MANAGER_LOG_LVL_SUCCESS 4 +#define CLUSTER_JOIN_CHECK_AFTER 20 + #define LOG_COLOR_BOLD "29;1m" #define LOG_COLOR_RED "31;1m" #define LOG_COLOR_GREEN "32;1m" @@ -2112,6 +2114,24 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) { return proc; } +static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr, + int *bus_port_ptr) +{ + char *c = strrchr(addr, '@'); + if (c != NULL) { + *c = '\0'; + if (bus_port_ptr != NULL) + *bus_port_ptr = atoi(c + 1); + } + c = strrchr(addr, ':'); + if (c != NULL) { + *c = '\0'; + *ip_ptr = addr; + *port_ptr = atoi(++c); + } else return 0; + return 1; +} + /* Get host ip and port from command arguments. If only one argument has * been provided it must be in the form of 'ip:port', elsewhere * the first argument must be the ip and the second one the port. @@ -2124,14 +2144,7 @@ static int getClusterHostFromCmdArgs(int argc, char **argv, char *ip = NULL; if (argc == 1) { char *addr = argv[0]; - char *c = strrchr(addr, '@'); - if (c != NULL) *c = '\0'; - c = strrchr(addr, ':'); - if (c != NULL) { - *c = '\0'; - ip = addr; - port = atoi(++c); - } else return 0; + if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) return 0; } else { ip = argv[0]; port = atoi(argv[1]); @@ -3391,7 +3404,9 @@ cleanup: /* Wait until the cluster configuration is consistent. */ static void clusterManagerWaitForClusterJoin(void) { printf("Waiting for the cluster to join\n"); - int counter = 0, check_after = listLength(cluster_manager.nodes) * 2; + int counter = 0, + check_after = CLUSTER_JOIN_CHECK_AFTER + + (int)(listLength(cluster_manager.nodes) * 0.15f); while(!clusterManagerIsConfigConsistent()) { printf("."); fflush(stdout); @@ -3400,15 +3415,24 @@ static void clusterManagerWaitForClusterJoin(void) { dict *status = clusterManagerGetLinkStatus(); if (status != NULL && dictSize(status) > 0) { printf("\n"); - clusterManagerLogErr("Warning: %d nodes may " + clusterManagerLogErr("Warning: %d node(s) may " "be unreachable\n", dictSize(status)); dictIterator *iter = dictGetIterator(status); dictEntry *entry; while ((entry = dictNext(iter)) != NULL) { - sds nodename = (sds) dictGetKey(entry); + sds nodeaddr = (sds) dictGetKey(entry); + char *node_ip = NULL; + int node_port = 0, node_bus_port = 0; list *from = (list *) dictGetVal(entry); - clusterManagerLogErr(" - Node %s may be unreachable " - "from:\n", nodename); + if (parseClusterNodeAddress(nodeaddr, &node_ip, + &node_port, &node_bus_port) && node_bus_port) { + clusterManagerLogErr(" - The port %d of node %s may " + "be unreachable from:\n", + node_bus_port, node_ip); + } else { + clusterManagerLogErr(" - Node %s may be unreachable " + "from:\n", nodeaddr); + } listIter li; listNode *ln; listRewind(from, &li); @@ -3862,12 +3886,13 @@ static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { if (strstr(flags, "myself") != NULL) continue; int disconnected = ((strstr(flags, "disconnected") != NULL) || (strstr(link_status, "disconnected"))); - if (disconnected) { + int handshaking = (strstr(flags, "handshake") != NULL); + if (disconnected || handshaking) { clusterManagerLink *link = malloc(sizeof(*link)); link->node_name = sdsnew(nodename); link->node_addr = sdsnew(addr); link->connected = 0; - link->handshaking = (strstr(flags, "handshaking") != NULL); + link->handshaking = handshaking; listAddNodeTail(links, link); } } From b013d2c4dbea97544239deebe7a8f5a19f2b5604 Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 6 Jun 2018 18:45:31 +0200 Subject: [PATCH 3/8] Cluster Manager: fix memory leak in clusterManagerWaitForClusterJoin --- src/redis-cli.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 9ff4dfb9..9e246dce 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3413,11 +3413,12 @@ static void clusterManagerWaitForClusterJoin(void) { sleep(1); if (++counter > check_after) { dict *status = clusterManagerGetLinkStatus(); + dictIterator *iter = NULL; if (status != NULL && dictSize(status) > 0) { printf("\n"); clusterManagerLogErr("Warning: %d node(s) may " "be unreachable\n", dictSize(status)); - dictIterator *iter = dictGetIterator(status); + iter = dictGetIterator(status); dictEntry *entry; while ((entry = dictNext(iter)) != NULL) { sds nodeaddr = (sds) dictGetKey(entry); @@ -3447,11 +3448,11 @@ static void clusterManagerWaitForClusterJoin(void) { "from standard instance port.\n"); listEmpty(from); } - dictReleaseIterator(iter); - dictRelease(status); } + if (iter != NULL) dictReleaseIterator(iter); + if (status != NULL) dictRelease(status); counter = 0; - } + } } printf("\n"); } From 121adc604b4cee16fd5af15456bbb7665f37ce71 Mon Sep 17 00:00:00 2001 From: Artix Date: Wed, 6 Jun 2018 20:14:58 +0200 Subject: [PATCH 4/8] Cluster Manager: fix memory leaks in clusterManagerGetDisconnectedLinks --- src/redis-cli.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 9e246dce..71c8c6b3 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3889,7 +3889,7 @@ static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { (strstr(link_status, "disconnected"))); int handshaking = (strstr(flags, "handshake") != NULL); if (disconnected || handshaking) { - clusterManagerLink *link = malloc(sizeof(*link)); + clusterManagerLink *link = zmalloc(sizeof(*link)); link->node_name = sdsnew(nodename); link->node_addr = sdsnew(addr); link->connected = 0; @@ -3908,6 +3908,7 @@ cleanup: static dict *clusterManagerGetLinkStatus(void) { if (cluster_manager.nodes == NULL) return NULL; dictType dtype = clusterManagerDictType; + dtype.keyDestructor = dictSdsDestructor; dtype.valDestructor = dictListDestructor; dict *status = dictCreate(&dtype, NULL); listIter li; From 6e9864fe0de08f6297b1b89fd143605e6ff99ec2 Mon Sep 17 00:00:00 2001 From: artix Date: Wed, 9 Jan 2019 18:49:38 +0100 Subject: [PATCH 5/8] Cluster Manager: fix bus error in clusterManagerGetLinkStatus --- src/redis-cli.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 71c8c6b3..b17bd9ea 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1994,6 +1994,15 @@ static dictType clusterManagerDictType = { dictSdsDestructor /* val destructor */ }; +static dictType clusterManagerLinkDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictListDestructor /* val destructor */ +}; + typedef int clusterManagerCommandProc(int argc, char **argv); typedef int (*clusterManagerOnReplyError)(redisReply *reply, clusterManagerNode *n, int bulk_idx); @@ -3889,7 +3898,7 @@ static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { (strstr(link_status, "disconnected"))); int handshaking = (strstr(flags, "handshake") != NULL); if (disconnected || handshaking) { - clusterManagerLink *link = zmalloc(sizeof(*link)); + clusterManagerLink *link = zmalloc(sizeof(*link)); link->node_name = sdsnew(nodename); link->node_addr = sdsnew(addr); link->connected = 0; @@ -3902,15 +3911,12 @@ cleanup: return links; } -/* Check for disconnected cluster links. It returns a dict whose keys - * are the unreachable node addresses and the values are lists of +/* Check for disconnected cluster links. It returns a dict whose keys + * are the unreachable node addresses and the values are lists of * node addresses that cannot reach the unreachable node. */ static dict *clusterManagerGetLinkStatus(void) { if (cluster_manager.nodes == NULL) return NULL; - dictType dtype = clusterManagerDictType; - dtype.keyDestructor = dictSdsDestructor; - dtype.valDestructor = dictListDestructor; - dict *status = dictCreate(&dtype, NULL); + dict *status = dictCreate(&clusterManagerLinkDictType, NULL); listIter li; listNode *ln; listRewind(cluster_manager.nodes, &li); From 2593fb9c0cd55cff13713ec5366a9cdc91d8a052 Mon Sep 17 00:00:00 2001 From: artix Date: Thu, 17 Jan 2019 18:22:30 +0100 Subject: [PATCH 6/8] Cluster Manager: code cleanup --- src/redis-cli.c | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index b17bd9ea..3753b34f 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3413,8 +3413,8 @@ cleanup: /* Wait until the cluster configuration is consistent. */ static void clusterManagerWaitForClusterJoin(void) { printf("Waiting for the cluster to join\n"); - int counter = 0, - check_after = CLUSTER_JOIN_CHECK_AFTER + + int counter = 0, + check_after = CLUSTER_JOIN_CHECK_AFTER + (int)(listLength(cluster_manager.nodes) * 0.15f); while(!clusterManagerIsConfigConsistent()) { printf("."); @@ -3434,11 +3434,11 @@ static void clusterManagerWaitForClusterJoin(void) { char *node_ip = NULL; int node_port = 0, node_bus_port = 0; list *from = (list *) dictGetVal(entry); - if (parseClusterNodeAddress(nodeaddr, &node_ip, + if (parseClusterNodeAddress(nodeaddr, &node_ip, &node_port, &node_bus_port) && node_bus_port) { clusterManagerLogErr(" - The port %d of node %s may " - "be unreachable from:\n", - node_bus_port, node_ip); + "be unreachable from:\n", + node_bus_port, node_ip); } else { clusterManagerLogErr(" - Node %s may be unreachable " "from:\n", nodeaddr); @@ -3873,12 +3873,12 @@ static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { char *lines = reply->str, *p, *line; while ((p = strstr(lines, "\n")) != NULL) { int i = 0; - *p = '\0'; + *p = '\0'; line = lines; lines = p + 1; char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL; while ((p = strchr(line, ' ')) != NULL) { - *p = '\0'; + *p = '\0'; char *token = line; line = p + 1; if (i == 0) nodename = token; @@ -3889,12 +3889,10 @@ static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { i++; } if (i == 7) link_status = line; - - if (nodename == NULL || addr == NULL || flags == NULL || - link_status == NULL) - continue; + if (nodename == NULL || addr == NULL || flags == NULL || + link_status == NULL) continue; if (strstr(flags, "myself") != NULL) continue; - int disconnected = ((strstr(flags, "disconnected") != NULL) || + int disconnected = ((strstr(flags, "disconnected") != NULL) || (strstr(link_status, "disconnected"))); int handshaking = (strstr(flags, "handshake") != NULL); if (disconnected || handshaking) { From 23ad3faa50e0f3f91770735a8e0552bfc185af1e Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 26 Feb 2019 17:26:31 +0100 Subject: [PATCH 7/8] Cluster Manager: change text alert clusterManagerWaitForClusterJoin --- src/redis-cli.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 3753b34f..264abe5d 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3437,11 +3437,11 @@ static void clusterManagerWaitForClusterJoin(void) { if (parseClusterNodeAddress(nodeaddr, &node_ip, &node_port, &node_bus_port) && node_bus_port) { clusterManagerLogErr(" - The port %d of node %s may " - "be unreachable from:\n", + "be unreachable by:\n", node_bus_port, node_ip); } else { clusterManagerLogErr(" - Node %s may be unreachable " - "from:\n", nodeaddr); + "by:\n", nodeaddr); } listIter li; listNode *ln; @@ -3454,7 +3454,7 @@ static void clusterManagerWaitForClusterJoin(void) { clusterManagerLogErr("Cluster bus ports must be reachable " "by every node.\nRemember that " "cluster bus ports are different " - "from standard instance port.\n"); + "from standard instance ports.\n"); listEmpty(from); } } From 6cd64c6a986cb77c8273f1eee95212ade036af41 Mon Sep 17 00:00:00 2001 From: artix Date: Thu, 28 Feb 2019 16:57:14 +0100 Subject: [PATCH 8/8] Cluster Manager: change join issue message --- src/redis-cli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 264abe5d..d9797284 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -3437,11 +3437,11 @@ static void clusterManagerWaitForClusterJoin(void) { if (parseClusterNodeAddress(nodeaddr, &node_ip, &node_port, &node_bus_port) && node_bus_port) { clusterManagerLogErr(" - The port %d of node %s may " - "be unreachable by:\n", + "be unreachable from:\n", node_bus_port, node_ip); } else { clusterManagerLogErr(" - Node %s may be unreachable " - "by:\n", nodeaddr); + "from:\n", nodeaddr); } listIter li; listNode *ln;