From d7374032c0d9d87d9169139b0182c49891504d7c Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 20 Dec 2013 12:37:18 +0100 Subject: [PATCH] Redis Cluster: handshake code refactoring + Gossip IP switch detection. This commit makes it simple to start an handshake with a specific node address, and uses this in order to detect a node IP change and start a new handshake in order to fix the IP if possible. --- src/cluster.c | 128 +++++++++++++++++++++++++++++++------------------- 1 file changed, 80 insertions(+), 48 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 090dbf98..019678d7 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -799,6 +799,64 @@ int clusterHandshakeInProgress(char *ip, int port) { return de != NULL; } +/* Start an handshake with the specified address if there is not one + * already in progress. Returns non-zero if the handshake was actually + * started. On error zero is returned and errno is set to one of the + * following values: + * + * EAGAIN - There is already an handshake in progress for this address. + * EINVAL - IP or port are not valid. */ +int clusterStartHandshake(char *ip, int port) { + clusterNode *n; + char norm_ip[REDIS_IP_STR_LEN]; + struct sockaddr_storage sa; + + /* IP sanity check */ + if (inet_pton(AF_INET,ip, + &(((struct sockaddr_in *)&sa)->sin_addr))) + { + sa.ss_family = AF_INET; + } else if (inet_pton(AF_INET6,ip, + &(((struct sockaddr_in6 *)&sa)->sin6_addr))) + { + sa.ss_family = AF_INET6; + } else { + errno = EINVAL; + return 0; + } + + /* Port sanity check */ + if (port <= 0 || port > (65535-REDIS_CLUSTER_PORT_INCR)) { + errno = EINVAL; + return 0; + } + + /* Set norm_ip as the normalized string representation of the node + * IP address. */ + if (sa.ss_family == AF_INET) + inet_ntop(AF_INET, + (void*)&(((struct sockaddr_in *)&sa)->sin_addr), + norm_ip,REDIS_CLUSTER_IPLEN); + else + inet_ntop(AF_INET6, + (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr), + norm_ip,REDIS_CLUSTER_IPLEN); + + if (clusterHandshakeInProgress(norm_ip,port)) { + errno = EAGAIN; + return 0; + } + + /* Add the node with a random address (NULL as first argument to + * createClusterNode()). Everything will be fixed during the + * handskake. */ + n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET); + memcpy(n->ip,norm_ip,sizeof(n->ip)); + n->port = port; + clusterAddNode(n); + return 1; +} + /* Process the gossip section of PING or PONG packets. * Note that this function assumes that the packet is already sanity-checked * by the caller, not in the content of the gossip section, but in the @@ -832,7 +890,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { /* Update our state accordingly to the gossip sections */ node = clusterLookupNode(g->nodename); - if (node != NULL) { + if (node) { /* We already know this node. Handle failure reports, only when the sender is a master. */ if (sender && sender->flags & REDIS_NODE_MASTER && @@ -853,6 +911,17 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { } } } + + /* If we already know this node, but it is not reachable, and + * we see a different address in the gossip section, start an + * handshake with the (possibly) new address: this will result + * into a node address update if the handshake will be + * successful. */ + if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) && + (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port))) + { + clusterStartHandshake(g->ip,ntohs(g->port)); + } } else { /* If it's not in NOADDR state and we don't have it, we * start a handshake process against this IP/PORT pairs. @@ -860,17 +929,8 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { * Note that we require that the sender of this gossip message * is a well known node in our cluster, otherwise we risk * joining another cluster. */ - if (sender && !(flags & REDIS_NODE_NOADDR) && - !clusterHandshakeInProgress(g->ip,ntohs(g->port))) - { - clusterNode *newnode; - - redisLog(REDIS_DEBUG,"Adding the new node"); - newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE); - memcpy(newnode->ip,g->ip,sizeof(g->ip)); - newnode->port = ntohs(g->port); - clusterAddNode(newnode); - } + if (sender && !(flags & REDIS_NODE_NOADDR)) + clusterStartHandshake(g->ip,ntohs(g->port)); } /* Next node */ @@ -2492,48 +2552,20 @@ void clusterCommand(redisClient *c) { } if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) { - /* CLUSTER MEET */ - clusterNode *n; - struct sockaddr_storage sa; long port; - /* Perform sanity checks on IP/port */ - if (inet_pton(AF_INET,c->argv[2]->ptr, - &(((struct sockaddr_in *)&sa)->sin_addr))) - { - sa.ss_family = AF_INET; - } else if (inet_pton(AF_INET6,c->argv[2]->ptr, - &(((struct sockaddr_in6 *)&sa)->sin6_addr))) - { - sa.ss_family = AF_INET6; - } else { - addReplyError(c,"Invalid IP address in MEET"); - return; - } - if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK || - port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR)) - { + if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK) { addReplyError(c,"Invalid TCP port specified"); return; } - /* Finally add the node to the cluster with a random name, this - * will get fixed in the first handshake (ping/pong). */ - n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET); - - /* Set node->ip as the normalized string representation of the node - * IP address. */ - if (sa.ss_family == AF_INET) - inet_ntop(AF_INET, - (void*)&(((struct sockaddr_in *)&sa)->sin_addr), - n->ip,REDIS_CLUSTER_IPLEN); - else - inet_ntop(AF_INET6, - (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr), - n->ip,REDIS_CLUSTER_IPLEN); - n->port = port; - clusterAddNode(n); - addReply(c,shared.ok); + if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && + errno == EINVAL) + { + addReplyError(c,"Invalid node address specified"); + } else { + addReply(c,shared.ok); + } } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { /* CLUSTER NODES */ robj *o;