diff --git a/src/redis-cli.c b/src/redis-cli.c index 97d90837..d9797284 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -130,6 +130,8 @@ #define CLUSTER_MANAGER_LOG_LVL_ERR 3 #define CLUSTER_MANAGER_LOG_LVL_SUCCESS 4 +#define CLUSTER_JOIN_CHECK_AFTER 20 + #define LOG_COLOR_BOLD "29;1m" #define LOG_COLOR_RED "31;1m" #define LOG_COLOR_GREEN "32;1m" @@ -1974,6 +1976,15 @@ typedef struct clusterManagerReshardTableItem { int slot; } clusterManagerReshardTableItem; +/* Info about a cluster internal link. */ + +typedef struct clusterManagerLink { + sds node_name; + sds node_addr; + int connected; + int handshaking; +} clusterManagerLink; + static dictType clusterManagerDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ @@ -1983,6 +1994,15 @@ static dictType clusterManagerDictType = { dictSdsDestructor /* val destructor */ }; +static dictType clusterManagerLinkDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictListDestructor /* val destructor */ +}; + typedef int clusterManagerCommandProc(int argc, char **argv); typedef int (*clusterManagerOnReplyError)(redisReply *reply, clusterManagerNode *n, int bulk_idx); @@ -2012,6 +2032,7 @@ static void clusterManagerWaitForClusterJoin(void); static int clusterManagerCheckCluster(int quiet); static void clusterManagerLog(int level, const char* fmt, ...); static int clusterManagerIsConfigConsistent(void); +static dict *clusterManagerGetLinkStatus(void); static void clusterManagerOnError(sds err); static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, int len); @@ -2102,6 +2123,24 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) { return proc; } +static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr, + int *bus_port_ptr) +{ + char *c = strrchr(addr, '@'); + if (c != NULL) { + *c = '\0'; + if (bus_port_ptr != NULL) + *bus_port_ptr = atoi(c + 1); + } + c = strrchr(addr, ':'); + if (c != NULL) { + *c = '\0'; + *ip_ptr = addr; + *port_ptr = atoi(++c); + } else return 0; + return 1; +} + /* Get host ip and port from command arguments. If only one argument has * been provided it must be in the form of 'ip:port', elsewhere * the first argument must be the ip and the second one the port. @@ -2114,14 +2153,7 @@ static int getClusterHostFromCmdArgs(int argc, char **argv, char *ip = NULL; if (argc == 1) { char *addr = argv[0]; - char *c = strrchr(addr, '@'); - if (c != NULL) *c = '\0'; - c = strrchr(addr, ':'); - if (c != NULL) { - *c = '\0'; - ip = addr; - port = atoi(++c); - } else return 0; + if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) return 0; } else { ip = argv[0]; port = atoi(argv[1]); @@ -3381,10 +3413,55 @@ cleanup: /* Wait until the cluster configuration is consistent. */ static void clusterManagerWaitForClusterJoin(void) { printf("Waiting for the cluster to join\n"); + int counter = 0, + check_after = CLUSTER_JOIN_CHECK_AFTER + + (int)(listLength(cluster_manager.nodes) * 0.15f); while(!clusterManagerIsConfigConsistent()) { printf("."); fflush(stdout); sleep(1); + if (++counter > check_after) { + dict *status = clusterManagerGetLinkStatus(); + dictIterator *iter = NULL; + if (status != NULL && dictSize(status) > 0) { + printf("\n"); + clusterManagerLogErr("Warning: %d node(s) may " + "be unreachable\n", dictSize(status)); + iter = dictGetIterator(status); + dictEntry *entry; + while ((entry = dictNext(iter)) != NULL) { + sds nodeaddr = (sds) dictGetKey(entry); + char *node_ip = NULL; + int node_port = 0, node_bus_port = 0; + list *from = (list *) dictGetVal(entry); + if (parseClusterNodeAddress(nodeaddr, &node_ip, + &node_port, &node_bus_port) && node_bus_port) { + clusterManagerLogErr(" - The port %d of node %s may " + "be unreachable from:\n", + node_bus_port, node_ip); + } else { + clusterManagerLogErr(" - Node %s may be unreachable " + "from:\n", nodeaddr); + } + listIter li; + listNode *ln; + listRewind(from, &li); + while ((ln = listNext(&li)) != NULL) { + sds from_addr = ln->value; + clusterManagerLogErr(" %s\n", from_addr); + sdsfree(from_addr); + } + clusterManagerLogErr("Cluster bus ports must be reachable " + "by every node.\nRemember that " + "cluster bus ports are different " + "from standard instance ports.\n"); + listEmpty(from); + } + } + if (iter != NULL) dictReleaseIterator(iter); + if (status != NULL) dictRelease(status); + counter = 0; + } } printf("\n"); } @@ -3788,6 +3865,88 @@ static int clusterManagerIsConfigConsistent(void) { return consistent; } +static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { + list *links = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES"); + if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup; + links = listCreate(); + char *lines = reply->str, *p, *line; + while ((p = strstr(lines, "\n")) != NULL) { + int i = 0; + *p = '\0'; + line = lines; + lines = p + 1; + char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL; + while ((p = strchr(line, ' ')) != NULL) { + *p = '\0'; + char *token = line; + line = p + 1; + if (i == 0) nodename = token; + else if (i == 1) addr = token; + else if (i == 2) flags = token; + else if (i == 7) link_status = token; + else if (i == 8) break; + i++; + } + if (i == 7) link_status = line; + if (nodename == NULL || addr == NULL || flags == NULL || + link_status == NULL) continue; + if (strstr(flags, "myself") != NULL) continue; + int disconnected = ((strstr(flags, "disconnected") != NULL) || + (strstr(link_status, "disconnected"))); + int handshaking = (strstr(flags, "handshake") != NULL); + if (disconnected || handshaking) { + clusterManagerLink *link = zmalloc(sizeof(*link)); + link->node_name = sdsnew(nodename); + link->node_addr = sdsnew(addr); + link->connected = 0; + link->handshaking = handshaking; + listAddNodeTail(links, link); + } + } +cleanup: + if (reply != NULL) freeReplyObject(reply); + return links; +} + +/* Check for disconnected cluster links. It returns a dict whose keys + * are the unreachable node addresses and the values are lists of + * node addresses that cannot reach the unreachable node. */ +static dict *clusterManagerGetLinkStatus(void) { + if (cluster_manager.nodes == NULL) return NULL; + dict *status = dictCreate(&clusterManagerLinkDictType, NULL); + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *node = ln->value; + list *links = clusterManagerGetDisconnectedLinks(node); + if (links) { + listIter lli; + listNode *lln; + listRewind(links, &lli); + while ((lln = listNext(&lli)) != NULL) { + clusterManagerLink *link = lln->value; + list *from = NULL; + dictEntry *entry = dictFind(status, link->node_addr); + if (entry) from = dictGetVal(entry); + else { + from = listCreate(); + dictAdd(status, sdsdup(link->node_addr), from); + } + sds myaddr = sdsempty(); + myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port); + listAddNodeTail(from, myaddr); + sdsfree(link->node_name); + sdsfree(link->node_addr); + zfree(link); + } + listRelease(links); + } + } + return status; +} + /* Add the error string to cluster_manager.errors and print it. */ static void clusterManagerOnError(sds err) { if (cluster_manager.errors == NULL)