Cluster: always add PFAIL nodes at end of gossip section.

To rely on the fact that nodes in PFAIL state will be shared around by
randomly adding them in the gossip section is a weak assumption,
especially after changes related to sending less ping/pong packets.

We want to always include gossip entries for all the nodes that are in
PFAIL state, so that the PFAIL -> FAIL state promotion can happen much
faster and reliably.

Related to #3929.
This commit is contained in:
antirez 2017-04-14 13:39:49 +02:00
parent 8c829d9e43
commit 02777bb252
2 changed files with 71 additions and 23 deletions

View File

@ -427,6 +427,7 @@ void clusterInit(void) {
server.cluster->stats_bus_messages_sent[i] = 0; server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0; server.cluster->stats_bus_messages_received[i] = 0;
} }
server.cluster->stats_pfail_nodes = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots)); memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); clusterCloseAllSlots();
@ -2254,6 +2255,33 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
/* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
} }
/* Return non zero if the node is already present in the gossip section of the
* message pointed by 'hdr' and having 'count' gossip entries. Otherwise
* zero is returned. Helper for clusterSendPing(). */
int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
int j;
for (j = 0; j < count; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
CLUSTER_NAMELEN) == 0) break;
}
return j != count;
}
/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
* to the info of the specified node 'n'. */
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
clusterMsgDataGossip *gossip;
gossip = &(hdr->data.ping.gossip[i]);
memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
gossip->ping_sent = htonl(n->ping_sent/1000);
gossip->pong_received = htonl(n->pong_received/1000);
memcpy(gossip->ip,n->ip,sizeof(n->ip));
gossip->port = htons(n->port);
gossip->cport = htons(n->cport);
gossip->flags = htons(n->flags);
gossip->notused1 = 0;
}
/* Send a PING or PONG packet to the specified node, making sure to add enough /* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */ * gossip informations. */
void clusterSendPing(clusterLink *link, int type) { void clusterSendPing(clusterLink *link, int type) {
@ -2298,11 +2326,15 @@ void clusterSendPing(clusterLink *link, int type) {
if (wanted < 3) wanted = 3; if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes; if (wanted > freshnodes) wanted = freshnodes;
/* Include all the nodes in PFAIL state, so that failure reports are
* faster to propagate to go from PFAIL to FAIL state. */
int pfail_wanted = server.cluster->stats_pfail_nodes;
/* Compute the maxium totlen to allocate our buffer. We'll fix the totlen /* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
* later according to the number of gossip sections we really were able * later according to the number of gossip sections we really were able
* to put inside the packet. */ * to put inside the packet. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*wanted); totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */ * sizeof(clusterMsg) or more. */
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg); if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
@ -2319,17 +2351,13 @@ void clusterSendPing(clusterLink *link, int type) {
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes); dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de); clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
/* Don't include this node: the whole packet header is about us /* Don't include this node: the whole packet header is about us
* already, so we just gossip about other nodes. */ * already, so we just gossip about other nodes. */
if (this == myself) continue; if (this == myself) continue;
/* Give a bias to FAIL/PFAIL nodes. */ /* PFAIL nodes will be added later. */
if (maxiterations > wanted*2 && if (this->flags & CLUSTER_NODE_PFAIL) continue;
!(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)))
continue;
/* In the gossip section don't include: /* In the gossip section don't include:
* 1) Nodes in HANDSHAKE state. * 1) Nodes in HANDSHAKE state.
@ -2343,27 +2371,37 @@ void clusterSendPing(clusterLink *link, int type) {
continue; continue;
} }
/* Check if we already added this node */ /* Do not add a node we already have. */
for (j = 0; j < gossipcount; j++) { if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;
/* Add it */ /* Add it */
clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--; freshnodes--;
gossip = &(hdr->data.ping.gossip[gossipcount]);
memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN);
gossip->ping_sent = htonl(this->ping_sent/1000);
gossip->pong_received = htonl(this->pong_received/1000);
memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port);
gossip->cport = htons(this->cport);
gossip->flags = htons(this->flags);
gossip->notused1 = 0;
gossipcount++; gossipcount++;
} }
/* If there are PFAIL nodes, add them at the end. */
if (pfail_wanted) {
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
clusterNode *node = dictGetVal(de);
if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
if (node->flags & CLUSTER_NODE_NOADDR) continue;
if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
clusterSetGossipEntry(hdr,gossipcount,node);
freshnodes--;
gossipcount++;
/* We take the count of the slots we allocated, since the
* PFAIL stats may not match perfectly with the current number
* of PFAIL nodes. */
pfail_wanted--;
}
dictReleaseIterator(di);
}
/* Ready to send... fix the totlen fiend and queue the message in the /* Ready to send... fix the totlen fiend and queue the message in the
* output buffer. */ * output buffer. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
@ -3189,13 +3227,21 @@ void clusterCron(void) {
handshake_timeout = server.cluster_node_timeout; handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000; if (handshake_timeout < 1000) handshake_timeout = 1000;
/* Check if we have disconnected nodes and re-establish the connection. */ /* Check if we have disconnected nodes and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
di = dictGetSafeIterator(server.cluster->nodes); di = dictGetSafeIterator(server.cluster->nodes);
server.cluster->stats_pfail_nodes = 0;
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de); clusterNode *node = dictGetVal(de);
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
if (node->flags & CLUSTER_NODE_PFAIL)
server.cluster->stats_pfail_nodes++;
/* A Node in HANDSHAKE state has a limited lifespan equal to the /* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */ * configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {

View File

@ -165,6 +165,8 @@ typedef struct clusterState {
/* Messages received and sent by type. */ /* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
} clusterState; } clusterState;
/* Redis cluster messages header */ /* Redis cluster messages header */