From da257afe5777e1bcf3b6b01e2355252be956b511 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 09:26:36 +0200 Subject: [PATCH 01/33] htonu64() and ntohu64 added to endianconv.h. --- src/endianconv.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/endianconv.h b/src/endianconv.h index 7afe61c6..7c16e175 100644 --- a/src/endianconv.h +++ b/src/endianconv.h @@ -61,4 +61,14 @@ uint64_t intrev64(uint64_t v); #define intrev64ifbe(v) intrev64(v) #endif +/* The functions htonu64() and ntohu64() convert the specified value to + * network byte ordering and back. In big endian systems they are no-ops. */ +#if (BYTE_ORDER == BIG_ENDIAN) +#define htonu64(v) (v) +#define ntohu64(v) (v) +#else +#define htonu64(v) intrev64(v) +#define ntohu64(v) intrev64(v) +#endif + #endif From 12483b0061e0755d17730f24a976d61301450216 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 11:47:13 +0200 Subject: [PATCH 02/33] Cluster: configEpoch added in cluster nodes description. --- src/cluster.c | 28 ++++++++++++++++++++++++++-- src/redis.h | 5 +++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e562c00c..43a8133f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -59,6 +59,23 @@ int bitmapTestBit(unsigned char *bitmap, int pos); * Initialization * -------------------------------------------------------------------------- */ +/* This function is called at startup in order to set the currentEpoch + * (which is not saved on permanent storage) to the greatest configEpoch found + * in the loaded nodes (configEpoch is stored on permanent storage as soon as + * it changes for some node). 
*/ +void clusterSetStartupEpoch() { + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node->configEpoch > server.cluster->currentEpoch) + server.cluster->currentEpoch = node->configEpoch; + } + dictReleaseIterator(di); +} + int clusterLoadConfig(char *filename) { FILE *fp = fopen(filename,"r"); char *line; @@ -143,8 +160,11 @@ int clusterLoadConfig(char *filename) { if (atoi(argv[4])) n->ping_sent = time(NULL); if (atoi(argv[5])) n->pong_received = time(NULL); + /* Set configEpoch for this node. */ + n->configEpoch = strtoull(argv[6],NULL,10); + /* Populate hash slots served by this instance. */ - for (j = 7; j < argc; j++) { + for (j = 8; j < argc; j++) { int start, stop; if (argv[j][0] == '[') { @@ -189,6 +209,7 @@ int clusterLoadConfig(char *filename) { redisAssert(server.cluster->myself != NULL); redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", server.cluster->myself->name); + clusterSetStartupEpoch(); clusterUpdateState(); return REDIS_OK; @@ -230,6 +251,7 @@ void clusterInit(void) { server.cluster = zmalloc(sizeof(clusterState)); server.cluster->myself = NULL; + server.cluster->currentEpoch = 0; server.cluster->state = REDIS_CLUSTER_FAIL; server.cluster->size = 1; server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); @@ -360,6 +382,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { else getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); node->ctime = time(NULL); + node->configEpoch = 0; node->flags = flags; memset(node->slots,0,sizeof(node->slots)); node->numslots = 0; @@ -2071,9 +2094,10 @@ sds clusterGenNodesDescription(int filter) { ci = sdscatprintf(ci,"- "); /* Latency from the POV of this node, link status */ - ci = sdscatprintf(ci,"%ld %ld %s", + ci = sdscatprintf(ci,"%ld %ld %llu %s", (long) node->ping_sent, (long) node->pong_received, + (unsigned long long) node->configEpoch, (node->link || 
node->flags & REDIS_NODE_MYSELF) ? "connected" : "disconnected"); diff --git a/src/redis.h b/src/redis.h index 6e19ea3b..cd2495fd 100644 --- a/src/redis.h +++ b/src/redis.h @@ -617,6 +617,7 @@ struct clusterNode { time_t ctime; /* Node object creation time. */ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ int flags; /* REDIS_NODE_... */ + uint64_t configEpoch; /* Last configEpoch observed for this node */ unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ @@ -634,6 +635,7 @@ typedef struct clusterNode clusterNode; typedef struct { clusterNode *myself; /* This node */ + uint64_t currentEpoch; int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ int size; /* Num of master nodes with at least one slot */ dict *nodes; /* Hash table of name -> clusterNode structures */ @@ -707,6 +709,9 @@ typedef struct { uint64_t time; /* Time at which this request was sent (in milliseconds), this field is copied in reply messages so that the original sender knows how old the reply is. */ + uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ + uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch + advertised by its master if it is a slave. */ char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; char slaveof[REDIS_CLUSTER_NAMELEN]; From d426ada891bfe8b1e35d667f4357c14fb357c9af Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 11:53:35 +0200 Subject: [PATCH 03/33] Cluster: broadcast currentEpoch and configEpoch in packets header. 
--- src/cluster.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 43a8133f..f1cdb915 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1321,12 +1321,20 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { hdr->flags = htons(server.cluster->myself->flags); hdr->state = server.cluster->state; + /* Set the currentEpoch and configEpochs. Note that configEpoch is + * set to the master configEpoch if this node is a slave. */ + hdr->currentEpoch = htonu64(server.cluster->currentEpoch); + if (server.cluster->myself->flags & REDIS_NODE_SLAVE) + hdr->configEpoch = htonu64(server.cluster->myself->slaveof->configEpoch); + else + hdr->configEpoch = htonu64(server.cluster->myself->configEpoch); + if (type == CLUSTERMSG_TYPE_FAIL) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsgDataFail); } hdr->totlen = htonl(totlen); - /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ + /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ } /* Send a PING or PONG packet to the specified node, making sure to add enough From 6ec795d2cf3147fb8541aed45b9b53cef6e4c627 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:36:29 +0200 Subject: [PATCH 04/33] Cluster: update our currentEpoch when a greater one is seen. 
--- src/cluster.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index f1cdb915..c5837647 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -873,6 +873,7 @@ int clusterProcessPacket(clusterLink *link) { uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); uint16_t flags = ntohs(hdr->flags); + uint64_t senderCurrentEpoch, senderConfigEpoch; clusterNode *sender; redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", @@ -909,9 +910,17 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Process packets by type. */ + /* Check if the sender is known. + * If it is, update our currentEpoch to its epoch if greater than our. */ sender = clusterLookupNode(hdr->sender); + if (sender && !(sender->flags & REDIS_NODE_HANDSHAKE)) { + senderCurrentEpoch = ntohu64(hdr->currentEpoch); + senderConfigEpoch = ntohu64(hdr->configEpoch); + if (senderCurrentEpoch > server.cluster->currentEpoch) + server.cluster->currentEpoch = senderCurrentEpoch; + } + /* Process packets by type. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node); From 32b5410af928b9d651a837f79c6c6002615a3048 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:38:36 +0200 Subject: [PATCH 05/33] Cluster: add currentEpoch to CLUSTER INFO. 
--- src/cluster.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index c5837647..ede00f79 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2393,13 +2393,15 @@ void clusterCommand(redisClient *c) { "cluster_slots_fail:%d\r\n" "cluster_known_nodes:%lu\r\n" "cluster_size:%d\r\n" + "cluster_current_epoch:%llu\r\n" , statestr[server.cluster->state], slots_assigned, slots_ok, slots_pfail, slots_fail, dictSize(server.cluster->nodes), - server.cluster->size + server.cluster->size, + (unsigned long long) server.cluster->currentEpoch ); addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info))); From 341ed1d1a88bfea230a04066f6b496e4457277c0 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:44:47 +0200 Subject: [PATCH 06/33] Cluster: fix redis-trib for added configEpoch field in CLUSTER NODES. --- src/redis-trib.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-trib.rb b/src/redis-trib.rb index 96fdddc1..1dc18e95 100755 --- a/src/redis-trib.rb +++ b/src/redis-trib.rb @@ -118,7 +118,7 @@ class ClusterNode nodes.each{|n| # name addr flags role ping_sent ping_recv link_status slots split = n.split - name,addr,flags,role,ping_sent,ping_recv,link_status = split[0..6] + name,addr,flags,role,ping_sent,ping_recv,config_epoch,link_status = split[0..6] slots = split[7..-1] info = { :name => name, From 656c3ffe4a6fe5fb07243cd755d4b0112e636204 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:51:01 +0200 Subject: [PATCH 07/33] Cluster: fix redis-trib node config fingerprinting for new nodes format. 
--- src/redis-trib.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/redis-trib.rb b/src/redis-trib.rb index 1dc18e95..4b7acea4 100755 --- a/src/redis-trib.rb +++ b/src/redis-trib.rb @@ -119,7 +119,7 @@ class ClusterNode # name addr flags role ping_sent ping_recv link_status slots split = n.split name,addr,flags,role,ping_sent,ping_recv,config_epoch,link_status = split[0..6] - slots = split[7..-1] + slots = split[8..-1] info = { :name => name, :addr => addr, @@ -230,7 +230,7 @@ class ClusterNode config = [] @r.cluster("nodes").each_line{|l| s = l.split - slots = s[7..-1].select {|x| x[0..0] != "["} + slots = s[8..-1].select {|x| x[0..0] != "["} next if slots.length == 0 config << s[0]+":"+(slots.sort.join(",")) } From fb9b76fe1435c83d84a3144f1c06ddb1378899b8 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 11:13:17 +0200 Subject: [PATCH 08/33] Cluster: slave node now uses the new protocol to get elected. --- src/cluster.c | 52 ++++++++++++++++++++++++++++++++++++++------------ src/redis.h | 11 ++++++++--- src/sentinel.c | 2 -- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index ede00f79..9ce2905e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -257,6 +257,7 @@ void clusterInit(void) { server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_epoch = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -1581,16 +1582,22 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * * The gaol of this function is: * 1) To check if we are able to perform a failover, is our data updated? - * 2) Ask reachable masters the authorization to perform the failover. + * 2) Try to get elected by masters. 
* 3) Check if there is the majority of masters agreeing we should failover. * 4) Perform the failover informing all the other nodes. */ void clusterHandleSlaveFailover(void) { time_t data_age = server.unixtime - server.repl_down_since; - time_t auth_age = server.unixtime - server.cluster->failover_auth_time; + mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Remove the node timeout from the data age as it is fine that we are + * disconnected from our master at least for the time it was down to be + * flagged as FAIL, that's the baseline. */ + if (data_age > server.cluster_node_timeout) + data_age -= server.cluster_node_timeout; + /* Check if our data is recent enough. For now we just use a fixed * constant of ten times the node timeout since the cluster should * react much faster to a master down. */ @@ -1598,19 +1605,37 @@ void clusterHandleSlaveFailover(void) { server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT) return; - /* TODO: check if we are the first slave as well? Or just rely on the - * master authorization? */ - - /* Ask masters if we are authorized to perform the failover. If there - * is a pending auth request that's too old, reset it. */ + /* Compute the time at which we can start an election. */ if (server.cluster->failover_auth_time == 0 || auth_age > - server.cluster_node_timeout * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) + server.cluster_node_timeout * 1000 * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) { - redisLog(REDIS_WARNING,"Asking masters if I can failover..."); - server.cluster->failover_auth_time = time(NULL); + server.cluster->failover_auth_time = mstime() + + 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ + data_age * 100 + /* Add 100 milliseconds for every second of age. */ + random() % 500; /* Random delay between 0 and 500 milliseconds. 
*/ server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_sent = 0; + redisLog(REDIS_WARNING,"Start of election delayed for %lld milliseconds.", + server.cluster->failover_auth_time - mstime()); + return; + } + + /* Return ASAP if we can't still start the election. */ + if (mstime() < server.cluster->failover_auth_time) return; + + /* Return ASAP if the election is too old to be valid. */ + if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout) + return; + + /* Ask for votes if needed. */ + if (server.cluster->failover_auth_sent == 0) { + server.cluster->currentEpoch++; + server.cluster->failover_auth_epoch = server.cluster->currentEpoch; + redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.", + server.cluster->currentEpoch); clusterRequestFailoverAuth(); + server.cluster->failover_auth_sent = 1; return; /* Wait for replies. */ } @@ -1619,7 +1644,7 @@ void clusterHandleSlaveFailover(void) { clusterNode *oldmaster = server.cluster->myself->slaveof; redisLog(REDIS_WARNING, - "Masters quorum reached: failing over my (failing) master."); + "Failover election won: failing over my (failing) master."); /* We have the quorum, perform all the steps to correctly promote * this slave to a master. * @@ -1644,7 +1669,10 @@ void clusterHandleSlaveFailover(void) { * accordingly and detect that we switched to master role. */ clusterBroadcastPong(); - /* 4) Update state and save config. */ + /* 4) Update my configEpoch to the epoch of the election. */ + server.cluster->myself->configEpoch = server.cluster->failover_auth_epoch; + + /* 5) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(); } diff --git a/src/redis.h b/src/redis.h index cd2495fd..2b7ca7a0 100644 --- a/src/redis.h +++ b/src/redis.h @@ -368,6 +368,8 @@ * Data types *----------------------------------------------------------------------------*/ +typedef long long mstime_t; /* millisecond time type. 
*/ + /* A redis object, that is a type able to hold a string / list / set */ /* The actual Redis Object */ @@ -581,7 +583,7 @@ typedef struct redisOpArray { #define REDIS_CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_ADD 10 /* Some additional time. */ #define REDIS_CLUSTER_SLAVE_VALIDITY_MULT 10 /* Slave data validity. */ -#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 1 /* Auth request retry time. */ +#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 4 /* Auth request retry time. */ #define REDIS_CLUSTER_FAILOVER_DELAY 5 /* Seconds */ struct clusterNode; @@ -643,8 +645,11 @@ typedef struct { clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; clusterNode *slots[REDIS_CLUSTER_SLOTS]; zskiplist *slots_to_keys; - int failover_auth_time; /* Time at which we sent the AUTH request. */ - int failover_auth_count; /* Number of authorizations received. */ + /* The following fields are used to take the slave state on elections. */ + mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms. */ + int failover_auth_count; /* Number of votes received so far. */ + int failover_auth_sent; /* True if we already asked for votes. */ + uint64_t failover_auth_epoch; /* Epoch of the current election. */ } clusterState; /* Redis cluster messages header */ diff --git a/src/sentinel.c b/src/sentinel.c index b257ad68..4bea156d 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -43,8 +43,6 @@ extern char **environ; /* ======================== Sentinel global state =========================== */ -typedef long long mstime_t; /* millisecond time type. */ - /* Address object, used to describe an ip:port pair. */ typedef struct sentinelAddr { char *ip; From a445aa30a0fb72bc54da083998b3458fad92820f Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 13:00:41 +0200 Subject: [PATCH 09/33] Cluster: master node now uses new protocol to vote. 
--- src/cluster.c | 69 +++++++++++++++++++++++++-------------------------- src/redis.h | 3 +++ 2 files changed, 37 insertions(+), 35 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 9ce2905e..24d3efe3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -258,6 +258,7 @@ void clusterInit(void) { server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; server.cluster->failover_auth_epoch = 0; + server.cluster->last_vote_epoch = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -396,6 +397,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { memset(node->ip,0,sizeof(node->ip)); node->port = 0; node->fail_reports = listCreate(); + node->voted_time = 0; listSetFreeMethod(node->fail_reports,zfree); return node; } @@ -1178,15 +1180,18 @@ int clusterProcessPacket(clusterLink *link) { } } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ - /* If we are not a master, ignore that message at all. */ - if (!(server.cluster->myself->flags & REDIS_NODE_MASTER)) return 0; clusterSendFailoverAuthIfNeeded(sender,hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ - /* If this is a master, increment the number of acknowledges - * we received so far. */ - if (sender->flags & REDIS_NODE_MASTER) + /* We consider this vote only if the sender if a master serving + * a non zero number of slots, with the currentEpoch that is equal + * to our currentEpoch. 
*/ + if (sender->flags & REDIS_NODE_MASTER && + sender->numslots > 0 && + senderCurrentEpoch == server.cluster->currentEpoch) + { server.cluster->failover_auth_count++; + } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } @@ -1538,43 +1543,38 @@ void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { clusterSendMessage(node->link,buf,totlen); } -/* If we believe 'node' is the "first slave" of it's master, reply with - * a FAILOVER_AUTH_GRANTED packet. - * The 'request' field points to the authorization request packet header, we - * need it in order to copy back the 'time' field in our reply. - * - * To be a first slave the sender must: - * 1) Be a slave. - * 2) Its master should be in FAIL state. - * 3) Ordering all the slaves IDs for its master by run-id, it should be the - * first (the smallest) among the ones not in FAIL / PFAIL state. - */ +/* Vote for the node asking for our vote if there are the conditions. */ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { - char first[REDIS_CLUSTER_NAMELEN]; clusterNode *master = node->slaveof; - int j; + uint64_t requestEpoch = ntohu64(request->currentEpoch); - /* Node is a slave? Its master is down? */ + /* IF we are not a master serving at least 1 slot, we don't have the + * right to vote, as the cluster size in Redis Cluster is the number + * of masters serving at least one slot, and quorum is the cluster size + 1 */ + if (!(server.cluster->myself->flags & REDIS_NODE_MASTER)) return; + if (server.cluster->myself->numslots == 0) return; + + /* Request epoch must be >= our currentEpoch. */ + if (requestEpoch < server.cluster->currentEpoch) return; + + /* I already voted for this epoch? Return ASAP. */ + if (server.cluster->last_vote_epoch == server.cluster->currentEpoch) return; + + /* Node must be a slave and its master down. 
*/ if (!(node->flags & REDIS_NODE_SLAVE) || master == NULL || !(master->flags & REDIS_NODE_FAIL)) return; - /* Iterate all the master slaves to check what's the first one. */ - memset(first,0xff,sizeof(first)); - for (j = 0; j < master->numslaves; j++) { - clusterNode *slave = master->slaves[j]; + /* We did not voted for a slave about this master for two + * times the node timeout. This is not strictly needed for correctness + * of the algorithm but makes the base case more linear. */ + if (server.unixtime - node->slaveof->voted_time < + server.cluster_node_timeout * 2) return; - if (slave->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) continue; - if (memcmp(slave->name,first,sizeof(first)) < 0) { - memcpy(first,slave->name,sizeof(first)); - } - } - - /* Is 'node' the first slave? */ - if (memcmp(node->name,first,sizeof(first)) != 0) return; - - /* We can send the packet. */ + /* We can vote for this slave. */ clusterSendFailoverAuth(node,request->time); + server.cluster->last_vote_epoch = server.cluster->currentEpoch; + node->slaveof->voted_time = server.unixtime; } /* This function is called if we are a slave node and our master serving @@ -1583,8 +1583,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * The gaol of this function is: * 1) To check if we are able to perform a failover, is our data updated? * 2) Try to get elected by masters. - * 3) Check if there is the majority of masters agreeing we should failover. - * 4) Perform the failover informing all the other nodes. + * 3) Perform the failover informing all the other nodes. 
*/ void clusterHandleSlaveFailover(void) { time_t data_age = server.unixtime - server.repl_down_since; diff --git a/src/redis.h b/src/redis.h index 2b7ca7a0..66c751a1 100644 --- a/src/redis.h +++ b/src/redis.h @@ -628,6 +628,7 @@ struct clusterNode { time_t ping_sent; /* Unix time we sent latest ping */ time_t pong_received; /* Unix time we received the pong */ time_t fail_time; /* Unix time when FAIL flag was set */ + time_t voted_time; /* Last time we voted for a slave of this master */ char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ clusterLink *link; /* TCP/IP link with this node */ @@ -650,6 +651,8 @@ typedef struct { int failover_auth_count; /* Number of votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */ + /* The followign fields are uesd by masters to take state on elections. */ + uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ } clusterState; /* Redis cluster messages header */ From 42fa46e49a9b71d38f4410b3376f3f2bec8d716d Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 13:28:19 +0200 Subject: [PATCH 10/33] Cluster: removed an old source of delay to start the slave failover. --- src/cluster.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 24d3efe3..d8508606 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1624,7 +1624,7 @@ void clusterHandleSlaveFailover(void) { if (mstime() < server.cluster->failover_auth_time) return; /* Return ASAP if the election is too old to be valid. */ - if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout) + if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout * 1000) return; /* Ask for votes if needed. 
*/ @@ -1835,16 +1835,10 @@ void clusterCron(void) { } /* If we are a slave and our master is down, but is serving slots, - * call the function that handles the failover. - * This function is called with a small delay in order to let the - * FAIL message to propagate after failure detection, this is not - * strictly required but makes 99.99% of failovers mechanically - * simpler. */ + * call the function that handles the failover. */ if (server.cluster->myself->flags & REDIS_NODE_SLAVE && server.cluster->myself->slaveof && server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL && - (server.unixtime - server.cluster->myself->slaveof->fail_time) > - REDIS_CLUSTER_FAILOVER_DELAY && server.cluster->myself->slaveof->numslots != 0) { clusterHandleSlaveFailover(); From 7c4b8f29e76ec24f41a0180df54057701345cf54 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 16:54:43 +0200 Subject: [PATCH 11/33] Cluster: react faster when a slave wins an election. --- src/cluster.c | 55 +++++++++++++++++++++++++++++++++++++-------------- src/redis.c | 3 +++ src/redis.h | 4 +++- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index d8508606..dd496be4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -53,6 +53,7 @@ int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); +void clusterHandleSlaveFailover(void); int bitmapTestBit(unsigned char *bitmap, int pos); /* ----------------------------------------------------------------------------- @@ -1191,6 +1192,9 @@ int clusterProcessPacket(clusterLink *link) { senderCurrentEpoch == server.cluster->currentEpoch) { server.cluster->failover_auth_count++; + /* Maybe we reached a quorum here, set a flag to make sure + * we check ASAP. 
*/ + server.cluster->handle_slave_failover_asap++; } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); @@ -1291,7 +1295,11 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } -/* Put stuff into the send buffer. */ +/* Put stuff into the send buffer. + * + * It is guaranteed that this function will never have as a side effect + * the link to be invalidated, so it is safe to call this function + * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { if (sdslen(link->sndbuf) == 0 && msglen != 0) aeCreateFileEvent(server.el,link->fd,AE_WRITABLE, @@ -1301,7 +1309,11 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { } /* Send a message to all the nodes that are part of the cluster having - * a connected link. */ + * a connected link. + * + * It is guaranteed that this function will never have as a side effect + * some node->link to be invalidated, so it is safe to call this function + * from event handlers that will do stuff with node links later. */ void clusterBroadcastMessage(void *buf, size_t len) { dictIterator *di; dictEntry *de; @@ -1416,10 +1428,11 @@ void clusterSendPing(clusterLink *link, int type) { clusterSendMessage(link,buf,totlen); } -/* Send a PONG packet to every connected node that's not in handshake state. +/* Send a PONG packet to every connected node that's not in handshake state + * and for which we have a valid link. * - * In Redis Cluster pings are not just used for failure detection, but also - * to carry important configuration informations. So broadcasting a pong is + * In Redis Cluster pongs are not used just for failure detection, but also + * to carry important configuration information. So broadcasting a pong is * useful when something changes in the configuration and we want to make * the cluster aware ASAP (for instance after a slave promotion). 
*/ void clusterBroadcastPong(void) { @@ -1430,6 +1443,7 @@ void clusterBroadcastPong(void) { while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); + if (!node->link) continue; if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG); } @@ -1591,6 +1605,15 @@ void clusterHandleSlaveFailover(void) { int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Pre conditions to run the function: + * 1) We are a slave. + * 2) Our master is flagged as FAIL. + * 3) It is serving slots. */ + if (!(server.cluster->myself->flags & REDIS_NODE_SLAVE) || + server.cluster->myself->slaveof == NULL || + !(server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL) || + server.cluster->myself->slaveof->numslots == 0) return; + /* Remove the node timeout from the data age as it is fine that we are * disconnected from our master at least for the time it was down to be * flagged as FAIL, that's the baseline. */ @@ -1834,19 +1857,21 @@ void clusterCron(void) { server.cluster->myself->slaveof->port); } - /* If we are a slave and our master is down, but is serving slots, - * call the function that handles the failover. */ - if (server.cluster->myself->flags & REDIS_NODE_SLAVE && - server.cluster->myself->slaveof && - server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL && - server.cluster->myself->slaveof->numslots != 0) - { - clusterHandleSlaveFailover(); - } - + clusterHandleSlaveFailover(); if (update_state) clusterUpdateState(); } +/* This function is called before the event handler returns to sleep for + * events. It is useful to perform operations that must be done ASAP in + * reaction to events fired but that are not safe to perform inside event + * handlers. 
*/ +void clusterBeforeSleep(void) { + if (server.cluster->handle_slave_failover_asap) { + clusterHandleSlaveFailover(); + server.cluster->handle_slave_failover_asap = 0; + } +} + /* ----------------------------------------------------------------------------- * Slots management * -------------------------------------------------------------------------- */ diff --git a/src/redis.c b/src/redis.c index 2792f139..bd547cd3 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1203,6 +1203,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); + + /* Call the Redis Cluster before sleep function. */ + if (server.cluster_enabled) clusterBeforeSleep(); } /* =========================== Server initialization ======================== */ diff --git a/src/redis.h b/src/redis.h index 66c751a1..5883bd38 100644 --- a/src/redis.h +++ b/src/redis.h @@ -647,12 +647,13 @@ typedef struct { clusterNode *slots[REDIS_CLUSTER_SLOTS]; zskiplist *slots_to_keys; /* The following fields are used to take the slave state on elections. */ - mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms. */ + mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms*/ int failover_auth_count; /* Number of votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */ /* The followign fields are uesd by masters to take state on elections. */ uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ + int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. 
*/ } clusterState; /* Redis cluster messages header */ @@ -1380,6 +1381,7 @@ void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); +void clusterBeforeSleep(void); /* Sentinel */ void initSentinelConfig(void); From 026e63392e29b2d4d66bc833d2194e0679df55f2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Sep 2013 09:55:41 +0200 Subject: [PATCH 12/33] Cluster: update the node configEpoch when newer is detected. --- src/cluster.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index dd496be4..7aad6654 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -914,14 +914,17 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Check if the sender is known. - * If it is, update our currentEpoch to its epoch if greater than our. */ + /* Check if the sender is a known node. */ sender = clusterLookupNode(hdr->sender); if (sender && !(sender->flags & REDIS_NODE_HANDSHAKE)) { + /* Update our curretEpoch if we see a newer epoch in the cluster. */ senderCurrentEpoch = ntohu64(hdr->currentEpoch); senderConfigEpoch = ntohu64(hdr->configEpoch); if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; + /* Update the sender configEpoch if it is publishing a newer one. */ + if (senderConfigEpoch > sender->configEpoch) + sender->configEpoch = senderConfigEpoch; } /* Process packets by type. */ @@ -1999,8 +2002,14 @@ void clusterUpdateState(void) { } /* If we can't reach at least half the masters, change the cluster state - * as FAIL, as we are not even able to mark nodes as FAIL in this side - * of the netsplit because of lack of majority. 
*/ + * to FAIL, as we are not even able to mark nodes as FAIL in this side + * of the netsplit because of lack of majority. + * + * TODO: when this condition is entered, we should not undo it for some + * (small) time after the majority is reachable again, to make sure that + * other nodes have enough time to inform this node of a configuration change. + * Otherwise a client with an old routing table may write to this node + * and later it may turn into a slave losing the write. */ { int needed_quorum = (server.cluster->size / 2) + 1; From 03ca9039835872b5ea14c361a6a970e7646cecb8 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 10:13:07 +0200 Subject: [PATCH 13/33] Cluster: fsync data when saving the cluster config. --- src/cluster.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cluster.c b/src/cluster.c index 7aad6654..4751c78f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -231,6 +231,7 @@ int clusterSaveConfig(void) { if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) == -1) goto err; if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; + fsync(fd); close(fd); sdsfree(ci); return 0; From b1875177198959710d9c9a348eb9f116b797b6bc Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 10:13:33 +0200 Subject: [PATCH 14/33] Cluster: when upading the configEpoch for a node, save config on disk ASAP. --- src/cluster.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 4751c78f..f0a6ddeb 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -924,8 +924,10 @@ int clusterProcessPacket(clusterLink *link) { if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; /* Update the sender configEpoch if it is publishing a newer one. 
*/ - if (senderConfigEpoch > sender->configEpoch) + if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; + clusterSaveConfigOrDie(); + } } /* Process packets by type. */ From 62b1591439901557b3a177469ed8d54da77e7503 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 10:13:58 +0200 Subject: [PATCH 15/33] Cluster: re-order failover operations to make it safer. We need to: 1) Increment the configEpoch. 2) Save it to disk and fsync the file. 3) Broadcast the PONG with the new configuration. If other nodes will receive the updated configuration we need to be sure to restart with this new config in the event of a crash. --- src/cluster.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index f0a6ddeb..e8ee45c4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1693,16 +1693,16 @@ void clusterHandleSlaveFailover(void) { } } - /* 3) Pong all the other nodes so that they can update the state - * accordingly and detect that we switched to master role. */ - clusterBroadcastPong(); - - /* 4) Update my configEpoch to the epoch of the election. */ + /* 3) Update my configEpoch to the epoch of the election. */ server.cluster->myself->configEpoch = server.cluster->failover_auth_epoch; - /* 5) Update state and save config. */ + /* 4) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(); + + /* 5) Pong all the other nodes so that they can update the state + * accordingly and detect that we switched to master role. */ + clusterBroadcastPong(); } } From 4dc247eb31e697228d9de0298397dfc582b470aa Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:44:23 +0200 Subject: [PATCH 16/33] Cluster: detect cluster reconfiguration when master slots drop to 0. The old algorithm used a PROMOTED flag and explicitly checked for slave->master conversions.
With the new cluster meta-data propagation algorithm we just look at the configEpoch to check if we need to reconfigure slots, then: 1) If a node is a master but it reaches zero served slots because of reconfiguration. 2) If a node is a slave but the master reaches zero served slots because of a reconfiguration. We switch as a replica of the new slots owner. --- src/cluster.c | 77 ++++++++++++++++++++++++--------------------------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e8ee45c4..a5faa1b8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -135,8 +135,6 @@ int clusterLoadConfig(char *filename) { n->flags |= REDIS_NODE_HANDSHAKE; } else if (!strcasecmp(s,"noaddr")) { n->flags |= REDIS_NODE_NOADDR; - } else if (!strcasecmp(s,"promoted")) { - n->flags |= REDIS_NODE_PROMOTED; } else if (!strcasecmp(s,"noflags")) { /* nothing to do */ } else { @@ -755,7 +753,6 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,"); if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (flags & REDIS_NODE_PROMOTED) ci = sdscat(ci,"promoted,"); if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s", @@ -1051,9 +1048,6 @@ int clusterProcessPacket(clusterLink *link) { { /* Node is a master. */ if (sender->flags & REDIS_NODE_SLAVE) { - /* Slave turned into master! */ - clusterNode *oldmaster = sender->slaveof; - /* Reconfigure node as master. */ if (sender->slaveof) clusterNodeRemoveSlave(sender->slaveof,sender); @@ -1061,29 +1055,6 @@ int clusterProcessPacket(clusterLink *link) { sender->flags |= REDIS_NODE_MASTER; sender->slaveof = NULL; - /* If this node used to be our slave, and now has the - * PROMOTED flag set. We'll turn ourself into a slave - * of the new master.
*/ - if (flags & REDIS_NODE_PROMOTED && - oldmaster == server.cluster->myself) - { - redisLog(REDIS_WARNING,"One of my slaves took my place. Reconfiguring myself as a replica of %.40s", sender->name); - clusterDelNodeSlots(server.cluster->myself); - clusterSetMaster(sender); - } - - /* If we are a slave, and this node used to be a slave - * of our master, and now has the PROMOTED flag set, we - * need to switch our replication setup over it. */ - if (flags & REDIS_NODE_PROMOTED && - server.cluster->myself->flags & REDIS_NODE_SLAVE && - server.cluster->myself->slaveof == oldmaster) - { - redisLog(REDIS_WARNING,"One of the slaves failed over my master. Reconfiguring myself as a replica of %.40s", sender->name); - clusterDelNodeSlots(server.cluster->myself); - clusterSetMaster(sender); - } - /* Update config and state. */ update_state = 1; update_config = 1; @@ -1125,26 +1096,55 @@ int clusterProcessPacket(clusterLink *link) { changes = memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0; if (changes) { + clusterNode *curmaster, *newmaster = NULL; + + /* Here we set curmaster to this node or the node this node + * replicates to if it's a slave. In the for loop we are + * interested to check if slots are taken away from curmaster. */ + if (server.cluster->myself->flags & REDIS_NODE_MASTER) + curmaster = server.cluster->myself; + else + curmaster = server.cluster->myself->slaveof; + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { if (bitmapTestBit(hdr->myslots,j)) { - /* If this slot was not served, or served by a node - * in FAIL state, update the table with the new node - * claiming to serve the slot. */ + /* We rebind the slot to the new node claiming it if: + * 1) The slot was unassigned. + * 2) The new node claims it with a greater configEpoch. 
*/ if (server.cluster->slots[j] == sender) continue; if (server.cluster->slots[j] == NULL || - server.cluster->slots[j]->flags & REDIS_NODE_FAIL) + server.cluster->slots[j]->configEpoch < + senderConfigEpoch) { + if (server.cluster->slots[j] == curmaster) + newmaster = sender; clusterDelSlot(j); clusterAddSlot(sender,j); update_state = update_config = 1; } } else { /* This node claims to no longer handling the slot, - * however we don't change our config as this is likely - * happening because a resharding is in progress, and - * it already knows where to redirect clients. */ + * however we don't change our config as this is likely: + * 1) Rehashing in progress. + * 2) Failover. + * In both cases we'll be informed about who is serving + * the slot eventually. In the meantime it's up to the + * original owner to try to redirect our clients to the + * right node. */ } } + + /* If at least one slot was reassigned from a node to another node + * with a greater configEpoch, it is possible that: + * 1) We are a master is left without slots. This means that we were + * failed over and we should turn into a replica of the new + * master. + * 2) We are a slave and our master is left without slots. We need + * to replicate to the new slots owner. */ + if (newmaster && curmaster->numslots == 0) { + redisLog(REDIS_WARNING,"Configuration change detected. 
Reconfiguring myself as a replica of %.40s", sender->name); + clusterSetMaster(sender); + } } } @@ -1681,7 +1681,6 @@ void clusterHandleSlaveFailover(void) { server.cluster->myself); server.cluster->myself->flags &= ~REDIS_NODE_SLAVE; server.cluster->myself->flags |= REDIS_NODE_MASTER; - server.cluster->myself->flags |= REDIS_NODE_PROMOTED; server.cluster->myself->slaveof = NULL; replicationUnsetMaster(); @@ -2109,9 +2108,6 @@ void clusterSetMaster(clusterNode *n) { myself->flags &= ~REDIS_NODE_MASTER; myself->flags |= REDIS_NODE_SLAVE; } - /* Clear the promoted flag anyway if we are a slave, to ensure it will - * be set only when the node turns into a master because of fail over. */ - myself->flags &= ~REDIS_NODE_PROMOTED; myself->slaveof = n; replicationSetMaster(n->ip, n->port); } @@ -2159,7 +2155,6 @@ sds clusterGenNodesDescription(int filter) { if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,"); if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (node->flags & REDIS_NODE_PROMOTED) ci = sdscat(ci,"promoted,"); if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; /* Slave of... or just "-" */ From 707ff0f714dd19391c52ecb0e6a1264c1a9f1de4 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:48:09 +0200 Subject: [PATCH 17/33] Make clear that runids are not cluster node IDs. --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 81323cb6..0b75f4ee 100644 --- a/src/replication.c +++ b/src/replication.c @@ -343,7 +343,7 @@ int masterTryPartialResynchronization(redisClient *c) { /* Run id "?" is used by slaves that want to force a full resync. 
*/ if (master_runid[0] != '?') { redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " - "Runid mismatch (Client asked for '%s', I'm '%s')", + "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); } else { redisLog(REDIS_NOTICE,"Full resync requested by slave."); From 2d0844ee3762fa49f71a983cb0acad40bfc7e469 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:51:58 +0200 Subject: [PATCH 18/33] Cluster: log message shortened. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index a5faa1b8..a174a964 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1672,7 +1672,7 @@ void clusterHandleSlaveFailover(void) { clusterNode *oldmaster = server.cluster->myself->slaveof; redisLog(REDIS_WARNING, - "Failover election won: failing over my (failing) master."); + "Failover election won: I'm the new master."); /* We have the quorum, perform all the steps to correctly promote * this slave to a master. * From 2b93a195374665d8bc74b33b4c769bdf9db97014 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:53:18 +0200 Subject: [PATCH 19/33] Add REWRITE to CONFIG subcommands help message. --- src/config.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.c b/src/config.c index 3a14a6e9..2707bfae 100644 --- a/src/config.c +++ b/src/config.c @@ -1786,7 +1786,7 @@ void configCommand(redisClient *c) { } } else { addReplyError(c, - "CONFIG subcommand must be one of GET, SET, RESETSTAT"); + "CONFIG subcommand must be one of GET, SET, RESETSTAT, REWRITE"); } return; From 1dedf9aa36b69d8721be19b7eb37b0c1b4f8e917 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 16:19:44 +0200 Subject: [PATCH 20/33] Cluster: time field removed from cluster messages header. The new algorithm does not check replies time as checking for the currentEpoch in the reply ensures that the reply is about the current election process. 
--- src/cluster.c | 10 +++------- src/redis.h | 3 --- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a174a964..42c4acb9 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1543,14 +1543,11 @@ void clusterRequestFailoverAuth(void) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - hdr->time = mstime(); clusterBroadcastMessage(buf,totlen); } -/* Send a FAILOVER_AUTH_ACK message to the specified node. - * Reqtime is the time field from the original failover auth request packet, - * so that the receiver is able to check the reply age. */ -void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { +/* Send a FAILOVER_AUTH_ACK message to the specified node. */ +void clusterSendFailoverAuth(clusterNode *node) { unsigned char buf[4096]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -1559,7 +1556,6 @@ void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - hdr->time = reqtime; clusterSendMessage(node->link,buf,totlen); } @@ -1592,7 +1588,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { server.cluster_node_timeout * 2) return; /* We can vote for this slave. */ - clusterSendFailoverAuth(node,request->time); + clusterSendFailoverAuth(node); server.cluster->last_vote_epoch = server.cluster->currentEpoch; node->slaveof->voted_time = server.unixtime; } diff --git a/src/redis.h b/src/redis.h index 5883bd38..995198f6 100644 --- a/src/redis.h +++ b/src/redis.h @@ -715,9 +715,6 @@ typedef struct { uint32_t totlen; /* Total length of this message */ uint16_t type; /* Message type */ uint16_t count; /* Only used for some kind of messages. 
*/ - uint64_t time; /* Time at which this request was sent (in milliseconds), - this field is copied in reply messages so that the - original sender knows how old the reply is. */ uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch advertised by its master if it is a slave. */ From f1bfd8233bac4b8e07b1f3f89cb3790d05445b11 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 1 Oct 2013 15:40:20 +0200 Subject: [PATCH 21/33] Cluster: fix typo in clusterProcessPacket() comment. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 42c4acb9..e5aa043f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1190,7 +1190,7 @@ int clusterProcessPacket(clusterLink *link) { clusterSendFailoverAuthIfNeeded(sender,hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ - /* We consider this vote only if the sender if a master serving + /* We consider this vote only if the sender is a master serving * a non zero number of slots, with the currentEpoch that is equal * to our currentEpoch. */ if (sender->flags & REDIS_NODE_MASTER && From 7970ebd80a735f6c73c8be1bda0e77f4bcc47a34 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 1 Oct 2013 17:21:28 +0200 Subject: [PATCH 22/33] Cluster: senderCurrentEpoch == node currentEpoch was too strict. We can accept a vote as long as its epoch is >= the epoch at which we started the voting process. There is no need for it to be exactly the same. --- src/cluster.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e5aa043f..ab528393 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1191,11 +1191,11 @@ int clusterProcessPacket(clusterLink *link) { } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. 
*/ /* We consider this vote only if the sender is a master serving - * a non zero number of slots, with the currentEpoch that is equal - * to our currentEpoch. */ + * a non zero number of slots, and its currentEpoch is greater or + * equal to epoch where this node started the election. */ if (sender->flags & REDIS_NODE_MASTER && sender->numslots > 0 && - senderCurrentEpoch == server.cluster->currentEpoch) + senderCurrentEpoch >= server.cluster->failover_auth_epoch) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure From abe81781aeaeb0d7d3837df6c5cba71f0fae5161 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 2 Oct 2013 09:42:35 +0200 Subject: [PATCH 23/33] Cluster: FAIL messages from unknown senders are handled better. Previously the event was not logged but instead the node reported an unknown packet type received. --- src/cluster.c | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index ab528393..9328ab1b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1154,20 +1154,26 @@ int clusterProcessPacket(clusterLink *link) { /* Update the cluster state if needed */ if (update_state) clusterUpdateState(); if (update_config) clusterSaveConfigOrDie(); - } else if (type == CLUSTERMSG_TYPE_FAIL && sender) { + } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; - failing = clusterLookupNode(hdr->data.fail.about.nodename); - if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) - { + if (sender) { + failing = clusterLookupNode(hdr->data.fail.about.nodename); + if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) + { + redisLog(REDIS_NOTICE, + "FAIL message received from %.40s about %.40s", + hdr->sender, hdr->data.fail.about.nodename); + failing->flags |= REDIS_NODE_FAIL; + failing->fail_time = time(NULL); + failing->flags &= ~REDIS_NODE_PFAIL; + clusterUpdateState(); + clusterSaveConfigOrDie(); + } + } 
else { redisLog(REDIS_NOTICE, - "FAIL message received from %.40s about %.40s", + "Ignoring FAIL message from unknonw node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); - failing->flags |= REDIS_NODE_FAIL; - failing->fail_time = time(NULL); - failing->flags &= ~REDIS_NODE_PFAIL; - clusterUpdateState(); - clusterSaveConfigOrDie(); } } else if (type == CLUSTERMSG_TYPE_PUBLISH) { robj *channel, *message; From 6c4d904baf7d5c8717cce04ec69fc491f710823a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 2 Oct 2013 10:10:08 +0200 Subject: [PATCH 24/33] Cluster: bus messages stats in CLUSTER info. --- src/cluster.c | 10 +++++++++- src/redis.h | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 9328ab1b..c9011246 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -259,6 +259,8 @@ void clusterInit(void) { server.cluster->failover_auth_count = 0; server.cluster->failover_auth_epoch = 0; server.cluster->last_vote_epoch = 0; + server.cluster->stats_bus_messages_sent = 0; + server.cluster->stats_bus_messages_received = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -878,6 +880,7 @@ int clusterProcessPacket(clusterLink *link) { uint64_t senderCurrentEpoch, senderConfigEpoch; clusterNode *sender; + server.cluster->stats_bus_messages_received++; redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen); @@ -1318,6 +1321,7 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { clusterWriteHandler,link); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); + server.cluster->stats_bus_messages_sent++; } /* Send a message to all the nodes that are part of the cluster having @@ -2449,6 +2453,8 @@ void clusterCommand(redisClient *c) { "cluster_known_nodes:%lu\r\n" "cluster_size:%d\r\n" "cluster_current_epoch:%llu\r\n" + "cluster_stats_messages_sent:%lld\r\n" 
+ "cluster_stats_messages_received:%lld\r\n" , statestr[server.cluster->state], slots_assigned, slots_ok, @@ -2456,7 +2462,9 @@ void clusterCommand(redisClient *c) { slots_fail, dictSize(server.cluster->nodes), server.cluster->size, - (unsigned long long) server.cluster->currentEpoch + (unsigned long long) server.cluster->currentEpoch, + server.cluster->stats_bus_messages_sent, + server.cluster->stats_bus_messages_received ); addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info))); diff --git a/src/redis.h b/src/redis.h index 995198f6..844c4a32 100644 --- a/src/redis.h +++ b/src/redis.h @@ -654,6 +654,8 @@ typedef struct { /* The followign fields are uesd by masters to take state on elections. */ uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. */ + long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ + long long stats_bus_messages_received; /* Num of msg received via cluster bus. */ } clusterState; /* Redis cluster messages header */ From 211dcbe339be00b84dcbbd8afdb6578ddc793df6 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 2 Oct 2013 12:27:12 +0200 Subject: [PATCH 25/33] Cluster: update cluster config when slave changes master. --- src/cluster.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cluster.c b/src/cluster.c index c9011246..35b6a5b4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1086,6 +1086,9 @@ int clusterProcessPacket(clusterLink *link) { clusterNodeRemoveSlave(sender->slaveof,sender); clusterNodeAddSlave(master,sender); sender->slaveof = master; + + /* Update config. */ + update_config = 1; } } } From 7afc0dd59a7ac0a76b35d6589f83f4fce1f69a0f Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 3 Oct 2013 09:55:20 +0200 Subject: [PATCH 26/33] Cluster: new clusterDoBeforeSleep() API. 
The new API is able to remember operations to perform before returning to the event loop, such as checking if there is the failover quorum for a slave, saving and fsyncing the configuration file, and so forth. Because these operations are performed before returning to the event loop we are sure that messages that are sent in the same event loop run will be delivered *after* the configuration is already saved, which is sometimes a requirement. For instance we want to publish a new epoch only when it is already stored in nodes.conf in order to avoid going back in the logical clock when a node is restarted. This new API provides a big performance advantage compared to saving and possibly fsyncing the configuration file multiple times in the same event loop run, especially in the case of big clusters with tens or hundreds of nodes. --- src/cluster.c | 127 ++++++++++++++++++++++++++------------------------ src/redis.h | 8 +++- 2 files changed, 72 insertions(+), 63 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 35b6a5b4..403ccf38 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -55,6 +55,7 @@ int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); void clusterHandleSlaveFailover(void); int bitmapTestBit(unsigned char *bitmap, int pos); +void clusterDoBeforeSleep(int flags); /* ----------------------------------------------------------------------------- * Initialization @@ -222,14 +223,14 @@ fmterr: * * This function writes the node config and returns 0, on error -1 * is returned.
*/ -int clusterSaveConfig(void) { +int clusterSaveConfig(int do_fsync) { sds ci = clusterGenNodesDescription(REDIS_NODE_HANDSHAKE); int fd; if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) == -1) goto err; if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; - fsync(fd); + if (do_fsync) fsync(fd); close(fd); sdsfree(ci); return 0; @@ -239,8 +240,8 @@ err: return -1; } -void clusterSaveConfigOrDie(void) { - if (clusterSaveConfig() == -1) { +void clusterSaveConfigOrDie(int do_fsync) { + if (clusterSaveConfig(do_fsync) == -1) { redisLog(REDIS_WARNING,"Fatal: can't update cluster config file."); exit(1); } @@ -277,7 +278,7 @@ void clusterInit(void) { clusterAddNode(server.cluster->myself); saveconf = 1; } - if (saveconf) clusterSaveConfigOrDie(); + if (saveconf) clusterSaveConfigOrDie(1); /* We need a listening TCP port for our cluster messaging needs. */ server.cfd_count = 0; @@ -665,15 +666,13 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { * reachable nodes to flag the node as FAIL. */ if (server.cluster->myself->flags & REDIS_NODE_MASTER) clusterSendFail(node->name); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } /* This function is called only if a node is marked as FAIL, but we are able * to reach it again. It checks if there are the conditions to undo the FAIL * state. */ void clearNodeFailureIfNeeded(clusterNode *node) { - int changes = 0; time_t now = time(NULL); redisAssert(node->flags & REDIS_NODE_FAIL); @@ -685,7 +684,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { "Clear FAIL state for node %.40s: slave is already reachable.", node->name); node->flags &= ~REDIS_NODE_FAIL; - changes++; + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } /* If it is a master and... 
@@ -705,13 +704,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.", node->name); node->flags &= ~REDIS_NODE_FAIL; - changes++; - } - - /* Update state and save config. */ - if (changes) { - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } } @@ -926,13 +919,12 @@ int clusterProcessPacket(clusterLink *link) { /* Update the sender configEpoch if it is publishing a newer one. */ if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG); } } /* Process packets by type. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { - int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node); /* Add this node if it is new for us and the msg type is MEET. @@ -946,7 +938,7 @@ int clusterProcessPacket(clusterLink *link) { nodeIp2String(node->ip,link); node->port = ntohs(hdr->port); clusterAddNode(node); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } /* Get info from the gossip section */ @@ -954,18 +946,12 @@ int clusterProcessPacket(clusterLink *link) { /* Anyway reply with a PONG */ clusterSendPing(link,CLUSTERMSG_TYPE_PONG); - - /* Update config if needed */ - if (update_config) clusterSaveConfigOrDie(); } /* PING or PONG: process config information. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { - int update_state = 0; - int update_config = 0; - redisLog(REDIS_DEBUG,"%s packet received: %p", type == CLUSTERMSG_TYPE_PING ? 
"ping" : "pong", (void*)link->node); @@ -978,8 +964,8 @@ int clusterProcessPacket(clusterLink *link) { "Handshake error: we already know node %.40s, updating the address if needed.", sender->name); if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } /* Free this node as we alrady have it. This will * cause the link to be freed as well. */ @@ -994,7 +980,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->name); link->node->flags &= ~REDIS_NODE_HANDSHAKE; link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } else if (memcmp(link->node->name,hdr->sender, REDIS_CLUSTER_NAMELEN) != 0) { @@ -1006,7 +992,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->ip[0] = '\0'; link->node->port = 0; freeClusterLink(link); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); /* FIXME: remove this node if we already have it. * * If we already have it but the IP is different, use @@ -1021,8 +1007,7 @@ int clusterProcessPacket(clusterLink *link) { !(sender->flags & REDIS_NODE_HANDSHAKE) && nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); } /* Update our info about the node */ @@ -1038,7 +1023,8 @@ int clusterProcessPacket(clusterLink *link) { * conditions detected by clearNodeFailureIfNeeded(). */ if (link->node->flags & REDIS_NODE_PFAIL) { link->node->flags &= ~REDIS_NODE_PFAIL; - update_state = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } else if (link->node->flags & REDIS_NODE_FAIL) { clearNodeFailureIfNeeded(link->node); } @@ -1059,8 +1045,8 @@ int clusterProcessPacket(clusterLink *link) { sender->slaveof = NULL; /* Update config and state. 
*/ - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } } else { /* Node is a slave. */ @@ -1076,8 +1062,8 @@ int clusterProcessPacket(clusterLink *link) { if (sender->numslaves) clusterNodeResetSlaves(sender); /* Update config and state. */ - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } /* Master node changed for this slave? */ @@ -1088,7 +1074,7 @@ int clusterProcessPacket(clusterLink *link) { sender->slaveof = master; /* Update config. */ - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } } } @@ -1126,7 +1112,9 @@ int clusterProcessPacket(clusterLink *link) { newmaster = sender; clusterDelSlot(j); clusterAddSlot(sender,j); - update_state = update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); } } else { /* This node claims to no longer handling the slot, @@ -1150,16 +1138,15 @@ int clusterProcessPacket(clusterLink *link) { if (newmaster && curmaster->numslots == 0) { redisLog(REDIS_WARNING,"Configuration change detected. 
Reconfiguring myself as a replica of %.40s", sender->name); clusterSetMaster(sender); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); } } } /* Get info from the gossip section */ clusterProcessGossipSection(hdr,link); - - /* Update the cluster state if needed */ - if (update_state) clusterUpdateState(); - if (update_config) clusterSaveConfigOrDie(); } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; @@ -1173,8 +1160,7 @@ int clusterProcessPacket(clusterLink *link) { failing->flags |= REDIS_NODE_FAIL; failing->fail_time = time(NULL); failing->flags &= ~REDIS_NODE_PFAIL; - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); } } else { redisLog(REDIS_NOTICE, @@ -1185,7 +1171,8 @@ int clusterProcessPacket(clusterLink *link) { robj *channel, *message; uint32_t channel_len, message_len; - /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ + /* Don't bother creating useless objects if there are no + * Pub/Sub subscribers. */ if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { channel_len = ntohl(hdr->data.publish.msg.channel_len); message_len = ntohl(hdr->data.publish.msg.message_len); @@ -1212,7 +1199,7 @@ int clusterProcessPacket(clusterLink *link) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. */ - server.cluster->handle_slave_failover_asap++; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); @@ -1673,6 +1660,9 @@ void clusterHandleSlaveFailover(void) { server.cluster->currentEpoch); clusterRequestFailoverAuth(); server.cluster->failover_auth_sent = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); return; /* Wait for replies. 
*/ } @@ -1706,7 +1696,7 @@ void clusterHandleSlaveFailover(void) { /* 4) Update state and save config. */ clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterSaveConfigOrDie(1); /* 5) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ @@ -1878,12 +1868,30 @@ void clusterCron(void) { /* This function is called before the event handler returns to sleep for * events. It is useful to perform operations that must be done ASAP in * reaction to events fired but that are not safe to perform inside event - * handlers. */ + * handlers, or to perform potentially expansive tasks that we need to do + * a single time before replying to clients. */ void clusterBeforeSleep(void) { - if (server.cluster->handle_slave_failover_asap) { + /* Handle failover, this is needed when it is likely that there is already + * the quorum from masters in order to react fast. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER) clusterHandleSlaveFailover(); - server.cluster->handle_slave_failover_asap = 0; + + /* Update the cluster state. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE) + clusterUpdateState(); + + /* Save the config, possibly using fsync. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) { + int fsync = server.cluster->todo_before_sleep & CLUSTER_TODO_FSYNC_CONFIG; + clusterSaveConfigOrDie(fsync); } + + /* Reset our flags. 
*/ + server.cluster->todo_before_sleep = 0; +} + +void clusterDoBeforeSleep(int flags) { + server.cluster->todo_before_sleep |= flags; } /* ----------------------------------------------------------------------------- @@ -2097,7 +2105,7 @@ int verifyClusterConfigWithData(void) { server.cluster->importing_slots_from[j] = server.cluster->slots[j]; } } - if (update_config) clusterSaveConfigOrDie(); + if (update_config) clusterSaveConfigOrDie(1); return REDIS_OK; } @@ -2296,8 +2304,7 @@ void clusterCommand(redisClient *c) { return; } clusterDelNodeSlots(server.cluster->myself); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) @@ -2347,8 +2354,7 @@ void clusterCommand(redisClient *c) { } } zfree(slots); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { /* SETSLOT 10 MIGRATING */ @@ -2424,8 +2430,7 @@ void clusterCommand(redisClient *c) { addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); return; } - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { /* CLUSTER INFO */ @@ -2474,7 +2479,7 @@ void clusterCommand(redisClient *c) { addReplySds(c,info); addReply(c,shared.crlf); } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { - int retval = clusterSaveConfig(); + int retval = clusterSaveConfig(1); if (retval == 0) addReply(c,shared.ok); @@ -2526,8 +2531,7 @@ void clusterCommand(redisClient *c) { return; } clusterDelNode(n); - clusterUpdateState(); - clusterSaveConfigOrDie(); + 
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { /* CLUSTER REPLICATE */ @@ -2562,8 +2566,7 @@ void clusterCommand(redisClient *c) { /* Set the master. */ clusterSetMaster(n); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else { addReplyError(c,"Wrong CLUSTER subcommand or number of arguments"); diff --git a/src/redis.h b/src/redis.h index 844c4a32..94decca9 100644 --- a/src/redis.h +++ b/src/redis.h @@ -653,11 +653,17 @@ typedef struct { uint64_t failover_auth_epoch; /* Epoch of the current election. */ /* The followign fields are uesd by masters to take state on elections. */ uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ - int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. */ + int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ long long stats_bus_messages_received; /* Num of msg received via cluster bus. */ } clusterState; +/* clusterState todo_before_sleep flags. */ +#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0) +#define CLUSTER_TODO_UPDATE_STATE (1<<1) +#define CLUSTER_TODO_SAVE_CONFIG (1<<2) +#define CLUSTER_TODO_FSYNC_CONFIG (1<<3) + /* Redis cluster messages header */ /* Note that the PING, PONG and MEET messages are actually the same exact From 37e06bd95262f69144aae7c85401a9e7d9411b3f Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 4 Oct 2013 12:25:09 +0200 Subject: [PATCH 27/33] PSYNC: safer handling of PSYNC requests. There was a bug that over-esteemed the amount of backlog available, however this could only happen when a slave was asking for an offset that was in the "future" compared to the master replication backlog. Now this case is handled well and logged as an incident in the master log file. 
--- src/replication.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 0b75f4ee..bcc80b11 100644 --- a/src/replication.c +++ b/src/replication.c @@ -356,10 +356,14 @@ int masterTryPartialResynchronization(redisClient *c) { REDIS_OK) goto need_full_resync; if (!server.repl_backlog || psync_offset < server.repl_backlog_off || - psync_offset >= (server.repl_backlog_off + server.repl_backlog_size)) + psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { redisLog(REDIS_NOTICE, "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); + if (psync_offset > server.master_repl_offset) { + redisLog(REDIS_WARNING, + "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset."); + } goto need_full_resync; } From b41570f7193b3c3bd81a420cfa234512725a0700 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 4 Oct 2013 12:59:24 +0200 Subject: [PATCH 28/33] Replication: fix master timeout. Since we started sending REPLCONF ACK from slaves to masters, the lastinteraction field of the client structure is always refreshed as soon as there is room in the socket output buffer, so masters in timeout are detected with too much delay (the socket buffer takes a lot of time to be filled by small REPLCONF ACK entries). This commit only counts data received as interactions with a master, solving the issue. 
--- src/networking.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 23ef11dc..1da5a5a5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -829,7 +829,13 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } } - if (totwritten > 0) c->lastinteraction = server.unixtime; + if (totwritten > 0) { + /* For clients representing masters we don't count sending data + * as an interaction, since we always send REPLCONF ACK commands + * that take some time to just fill the socket output buffer. + * We just rely on data / pings received for timeout detection. */ + if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; + } if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); From 1461422ce66056eb79231e8240dde01db2260fa0 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 4 Oct 2013 16:12:25 +0200 Subject: [PATCH 29/33] Replication: install the write handler when reusing a cached master. Sometimes when we resurrect a cached master after a successful partial resynchronization attempt, there is pending data in the output buffers of the client structure representing the master (likely REPLCONF ACK commands). If we don't reinstall the write handler, it will never be installed again by addReply*() family functions as they'll assume that if there is already data pending, the write handler is already installed. This bug caused some slaves after a successful partial sync to never send REPLCONF ACK, and continuously being detected as timing out by the master, with a disconnection / reconnection loop. 
--- src/replication.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/replication.c b/src/replication.c index bcc80b11..8102fc2d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1375,6 +1375,16 @@ void replicationResurrectCachedMaster(int newfd) { redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } + + /* We may also need to install the write handler as well if there is + * pending data in the write buffers. */ + if (server.master->bufpos || listLength(server.master->reply)) { + if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, + sendReplyToClient, server.master)) { + redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); + freeClientAsync(server.master); /* Close ASAP. */ + } + } } /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ From 72f38cd70f680fcc8b6f952b2592404a80af46c9 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 7 Oct 2013 11:30:58 +0200 Subject: [PATCH 30/33] Cluster: slave nodes advertise master slots bitmap and configEpoch. --- src/cluster.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 403ccf38..bee1b7b5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1339,12 +1339,21 @@ void clusterBroadcastMessage(void *buf, size_t len) { /* Build the message header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { int totlen = 0; + clusterNode *master; + + /* If this node is a master, we send its slots bitmap and configEpoch. + * If this node is a slave we send the master's information instead (the + * node is flagged as slave so the receiver knows that it is NOT really + * in charge for this slots. */ + master = (server.cluster->myself->flags & REDIS_NODE_SLAVE && + server.cluster->myself->slaveof) ? 
+ server.cluster->myself->slaveof : server.cluster->myself; memset(hdr,0,sizeof(*hdr)); hdr->type = htons(type); memcpy(hdr->sender,server.cluster->myself->name,REDIS_CLUSTER_NAMELEN); - memcpy(hdr->myslots,server.cluster->myself->slots, - sizeof(hdr->myslots)); + + memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN); if (server.cluster->myself->slaveof != NULL) { memcpy(hdr->slaveof,server.cluster->myself->slaveof->name, @@ -1354,13 +1363,9 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { hdr->flags = htons(server.cluster->myself->flags); hdr->state = server.cluster->state; - /* Set the currentEpoch and configEpochs. Note that configEpoch is - * set to the master configEpoch if this node is a slave. */ + /* Set the currentEpoch and configEpochs. */ hdr->currentEpoch = htonu64(server.cluster->currentEpoch); - if (server.cluster->myself->flags & REDIS_NODE_SLAVE) - hdr->configEpoch = htonu64(server.cluster->myself->slaveof->configEpoch); - else - hdr->configEpoch = htonu64(server.cluster->myself->configEpoch); + hdr->configEpoch = htonu64(master->configEpoch); if (type == CLUSTERMSG_TYPE_FAIL) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); From 2c3301b9f539e3cab9f41d684eba3e12bbd0dadb Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 7 Oct 2013 15:44:58 +0200 Subject: [PATCH 31/33] Cluster: log message improved when FAIL is cleared from a slave node. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index bee1b7b5..6939bac3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -681,7 +681,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * node again. 
*/ if (node->flags & REDIS_NODE_SLAVE) { redisLog(REDIS_NOTICE, - "Clear FAIL state for node %.40s: slave is already reachable.", + "Clear FAIL state for node %.40s: slave is reachable again.", node->name); node->flags &= ~REDIS_NODE_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); From f7d6ad436681963f6bbada2552e67f2c5233495a Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 7 Oct 2013 16:07:13 +0200 Subject: [PATCH 32/33] Cluster: fix slave data age computation when master is still connected. --- src/cluster.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 6939bac3..33212072 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1607,11 +1607,18 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * 3) Perform the failover informing all the other nodes. */ void clusterHandleSlaveFailover(void) { - time_t data_age = server.unixtime - server.repl_down_since; + time_t data_age; mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Set data_age to the number of seconds we are disconnected from the master. */ + if (server.repl_state == REDIS_REPL_CONNECTED) { + data_age = server.unixtime - server.master->lastinteraction; + } else { + data_age = server.unixtime - server.repl_down_since; + } + /* Pre conditions to run the function: * 1) We are a slave. * 2) Our master is flagged as FAIL. From ae2763f564589c1b68b085ad6f9e68f657ba93d7 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 8 Oct 2013 12:45:35 +0200 Subject: [PATCH 33/33] Cluster: masters don't vote for a slave with stale config. When a slave requests our vote, the configEpoch it claims for its master and the set of served slots must be greater than or equal to the configEpoch of the nodes serving these slots in the current configuration of the master granting its vote. 
In other terms, masters don't vote for slaves having a stale configuration for the slots they want to serve. --- src/cluster.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 33212072..23d4196d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1567,7 +1567,10 @@ void clusterSendFailoverAuth(clusterNode *node) { /* Vote for the node asking for our vote if there are the conditions. */ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { clusterNode *master = node->slaveof; - uint64_t requestEpoch = ntohu64(request->currentEpoch); + uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch); + uint64_t requestConfigEpoch = ntohu64(request->configEpoch); + unsigned char *claimed_slots = request->myslots; + int j; /* IF we are not a master serving at least 1 slot, we don't have the * right to vote, as the cluster size in Redis Cluster is the number @@ -1576,7 +1579,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (server.cluster->myself->numslots == 0) return; /* Request epoch must be >= our currentEpoch. */ - if (requestEpoch < server.cluster->currentEpoch) return; + if (requestCurrentEpoch < server.cluster->currentEpoch) return; /* I already voted for this epoch? Return ASAP. */ if (server.cluster->last_vote_epoch == server.cluster->currentEpoch) return; @@ -1592,6 +1595,19 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (server.unixtime - node->slaveof->voted_time < server.cluster_node_timeout * 2) return; + /* The slave requesting the vote must have a configEpoch for the claimed slots + * that is >= the one of the masters currently serving the same slots in the + * current configuration. 
*/ + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { + if (bitmapTestBit(claimed_slots, j) == 0) continue; + if (server.cluster->slots[j] == NULL || + server.cluster->slots[j]->configEpoch <= requestConfigEpoch) continue; + /* If we reached this point we found a slot that in our current slots + * is served by a master with a greater configEpoch than the one claimed + * by the slave requesting our vote. Refuse to vote for this slave. */ + return; + } + /* We can vote for this slave. */ clusterSendFailoverAuth(node); server.cluster->last_vote_epoch = server.cluster->currentEpoch; @@ -1910,7 +1926,7 @@ void clusterDoBeforeSleep(int flags) { * Slots management * -------------------------------------------------------------------------- */ -/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is zet, +/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set, * otherwise 0. */ int bitmapTestBit(unsigned char *bitmap, int pos) { off_t byte = pos/8;