diff --git a/src/cluster.c b/src/cluster.c index e562c00c..23d4196d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -53,12 +53,31 @@ int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); +void clusterHandleSlaveFailover(void); int bitmapTestBit(unsigned char *bitmap, int pos); +void clusterDoBeforeSleep(int flags); /* ----------------------------------------------------------------------------- * Initialization * -------------------------------------------------------------------------- */ +/* This function is called at startup in order to set the currentEpoch + * (which is not saved on permanent storage) to the greatest configEpoch found + * in the loaded nodes (configEpoch is stored on permanent storage as soon as + * it changes for some node). */ +void clusterSetStartupEpoch() { + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node->configEpoch > server.cluster->currentEpoch) + server.cluster->currentEpoch = node->configEpoch; + } + dictReleaseIterator(di); +} + int clusterLoadConfig(char *filename) { FILE *fp = fopen(filename,"r"); char *line; @@ -117,8 +136,6 @@ int clusterLoadConfig(char *filename) { n->flags |= REDIS_NODE_HANDSHAKE; } else if (!strcasecmp(s,"noaddr")) { n->flags |= REDIS_NODE_NOADDR; - } else if (!strcasecmp(s,"promoted")) { - n->flags |= REDIS_NODE_PROMOTED; } else if (!strcasecmp(s,"noflags")) { /* nothing to do */ } else { @@ -143,8 +160,11 @@ int clusterLoadConfig(char *filename) { if (atoi(argv[4])) n->ping_sent = time(NULL); if (atoi(argv[5])) n->pong_received = time(NULL); + /* Set configEpoch for this node. */ + n->configEpoch = strtoull(argv[6],NULL,10); + /* Populate hash slots served by this instance. */ - for (j = 7; j < argc; j++) { + for (j = 8; j < argc; j++) { int start, stop; if (argv[j][0] == '[') { @@ -189,6 +209,7 @@ int clusterLoadConfig(char *filename) { redisAssert(server.cluster->myself != NULL); redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", server.cluster->myself->name); + clusterSetStartupEpoch(); clusterUpdateState(); return REDIS_OK; @@ -202,13 +223,14 @@ fmterr: * * This function writes the node config and returns 0, on error -1 * is returned. */ -int clusterSaveConfig(void) { +int clusterSaveConfig(int do_fsync) { sds ci = clusterGenNodesDescription(REDIS_NODE_HANDSHAKE); int fd; if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) == -1) goto err; if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; + if (do_fsync) fsync(fd); close(fd); sdsfree(ci); return 0; @@ -218,8 +240,8 @@ err: return -1; } -void clusterSaveConfigOrDie(void) { - if (clusterSaveConfig() == -1) { +void clusterSaveConfigOrDie(int do_fsync) { + if (clusterSaveConfig(do_fsync) == -1) { redisLog(REDIS_WARNING,"Fatal: can't update cluster config file."); exit(1); } @@ -230,11 +252,16 @@ void clusterInit(void) { server.cluster = zmalloc(sizeof(clusterState)); server.cluster->myself = NULL; + server.cluster->currentEpoch = 0; server.cluster->state = REDIS_CLUSTER_FAIL; server.cluster->size = 1; server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_epoch = 0; + server.cluster->last_vote_epoch = 0; + server.cluster->stats_bus_messages_sent = 0; + server.cluster->stats_bus_messages_received = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -251,7 +278,7 @@ void clusterInit(void) { clusterAddNode(server.cluster->myself); saveconf = 1; } - if (saveconf) clusterSaveConfigOrDie(); + if (saveconf) clusterSaveConfigOrDie(1); /* We need a listening TCP port for our cluster messaging needs. */ server.cfd_count = 0; @@ -360,6 +387,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { else getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); node->ctime = time(NULL); + node->configEpoch = 0; node->flags = flags; memset(node->slots,0,sizeof(node->slots)); node->numslots = 0; @@ -372,6 +400,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { memset(node->ip,0,sizeof(node->ip)); node->port = 0; node->fail_reports = listCreate(); + node->voted_time = 0; listSetFreeMethod(node->fail_reports,zfree); return node; } @@ -637,15 +666,13 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { * reachable nodes to flag the node as FAIL. */ if (server.cluster->myself->flags & REDIS_NODE_MASTER) clusterSendFail(node->name); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } /* This function is called only if a node is marked as FAIL, but we are able * to reach it again. It checks if there are the conditions to undo the FAIL * state. */ void clearNodeFailureIfNeeded(clusterNode *node) { - int changes = 0; time_t now = time(NULL); redisAssert(node->flags & REDIS_NODE_FAIL); @@ -654,10 +681,10 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * node again. */ if (node->flags & REDIS_NODE_SLAVE) { redisLog(REDIS_NOTICE, - "Clear FAIL state for node %.40s: slave is already reachable.", + "Clear FAIL state for node %.40s: slave is reachable again.", node->name); node->flags &= ~REDIS_NODE_FAIL; - changes++; + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } /* If it is a master and... @@ -677,13 +704,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.", node->name); node->flags &= ~REDIS_NODE_FAIL; - changes++; - } - - /* Update state and save config. */ - if (changes) { - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } } @@ -727,7 +748,6 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,"); if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (flags & REDIS_NODE_PROMOTED) ci = sdscat(ci,"promoted,"); if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s", @@ -850,8 +870,10 @@ int clusterProcessPacket(clusterLink *link) { uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); uint16_t flags = ntohs(hdr->flags); + uint64_t senderCurrentEpoch, senderConfigEpoch; clusterNode *sender; + server.cluster->stats_bus_messages_received++; redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen); @@ -886,11 +908,23 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Process packets by type. */ + /* Check if the sender is a known node. */ sender = clusterLookupNode(hdr->sender); + if (sender && !(sender->flags & REDIS_NODE_HANDSHAKE)) { + /* Update our curretEpoch if we see a newer epoch in the cluster. */ + senderCurrentEpoch = ntohu64(hdr->currentEpoch); + senderConfigEpoch = ntohu64(hdr->configEpoch); + if (senderCurrentEpoch > server.cluster->currentEpoch) + server.cluster->currentEpoch = senderCurrentEpoch; + /* Update the sender configEpoch if it is publishing a newer one. */ + if (senderConfigEpoch > sender->configEpoch) { + sender->configEpoch = senderConfigEpoch; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG); + } + } + /* Process packets by type. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { - int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node); /* Add this node if it is new for us and the msg type is MEET. @@ -904,7 +938,7 @@ int clusterProcessPacket(clusterLink *link) { nodeIp2String(node->ip,link); node->port = ntohs(hdr->port); clusterAddNode(node); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } /* Get info from the gossip section */ @@ -912,18 +946,12 @@ int clusterProcessPacket(clusterLink *link) { /* Anyway reply with a PONG */ clusterSendPing(link,CLUSTERMSG_TYPE_PONG); - - /* Update config if needed */ - if (update_config) clusterSaveConfigOrDie(); } /* PING or PONG: process config information. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { - int update_state = 0; - int update_config = 0; - redisLog(REDIS_DEBUG,"%s packet received: %p", type == CLUSTERMSG_TYPE_PING ? "ping" : "pong", (void*)link->node); @@ -936,8 +964,8 @@ int clusterProcessPacket(clusterLink *link) { "Handshake error: we already know node %.40s, updating the address if needed.", sender->name); if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } /* Free this node as we alrady have it. This will * cause the link to be freed as well. */ @@ -952,7 +980,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->name); link->node->flags &= ~REDIS_NODE_HANDSHAKE; link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } else if (memcmp(link->node->name,hdr->sender, REDIS_CLUSTER_NAMELEN) != 0) { @@ -964,7 +992,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->ip[0] = '\0'; link->node->port = 0; freeClusterLink(link); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); /* FIXME: remove this node if we already have it. * * If we already have it but the IP is different, use @@ -979,8 +1007,7 @@ int clusterProcessPacket(clusterLink *link) { !(sender->flags & REDIS_NODE_HANDSHAKE) && nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); } /* Update our info about the node */ @@ -996,7 +1023,8 @@ int clusterProcessPacket(clusterLink *link) { * conditions detected by clearNodeFailureIfNeeded(). */ if (link->node->flags & REDIS_NODE_PFAIL) { link->node->flags &= ~REDIS_NODE_PFAIL; - update_state = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } else if (link->node->flags & REDIS_NODE_FAIL) { clearNodeFailureIfNeeded(link->node); } @@ -1009,9 +1037,6 @@ int clusterProcessPacket(clusterLink *link) { { /* Node is a master. */ if (sender->flags & REDIS_NODE_SLAVE) { - /* Slave turned into master! */ - clusterNode *oldmaster = sender->slaveof; - /* Reconfigure node as master. */ if (sender->slaveof) clusterNodeRemoveSlave(sender->slaveof,sender); @@ -1019,32 +1044,9 @@ int clusterProcessPacket(clusterLink *link) { sender->flags |= REDIS_NODE_MASTER; sender->slaveof = NULL; - /* If this node used to be our slave, and now has the - * PROMOTED flag set. We'll turn ourself into a slave - * of the new master. */ - if (flags & REDIS_NODE_PROMOTED && - oldmaster == server.cluster->myself) - { - redisLog(REDIS_WARNING,"One of my slaves took my place. Reconfiguring myself as a replica of %.40s", sender->name); - clusterDelNodeSlots(server.cluster->myself); - clusterSetMaster(sender); - } - - /* If we are a slave, and this node used to be a slave - * of our master, and now has the PROMOTED flag set, we - * need to switch our replication setup over it. */ - if (flags & REDIS_NODE_PROMOTED && - server.cluster->myself->flags & REDIS_NODE_SLAVE && - server.cluster->myself->slaveof == oldmaster) - { - redisLog(REDIS_WARNING,"One of the slaves failed over my master. Reconfiguring myself as a replica of %.40s", sender->name); - clusterDelNodeSlots(server.cluster->myself); - clusterSetMaster(sender); - } - /* Update config and state. */ - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } } else { /* Node is a slave. */ @@ -1060,8 +1062,8 @@ int clusterProcessPacket(clusterLink *link) { if (sender->numslaves) clusterNodeResetSlaves(sender); /* Update config and state. */ - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } /* Master node changed for this slave? */ @@ -1070,6 +1072,9 @@ int clusterProcessPacket(clusterLink *link) { clusterNodeRemoveSlave(sender->slaveof,sender); clusterNodeAddSlave(master,sender); sender->slaveof = master; + + /* Update config. */ + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } } } @@ -1083,55 +1088,91 @@ int clusterProcessPacket(clusterLink *link) { changes = memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0; if (changes) { + clusterNode *curmaster, *newmaster = NULL; + + /* Here we set curmaster to this node or the node this node + * replicates to if it's a slave. In the for loop we are + * interested to check if slots are taken away from curmaster. */ + if (server.cluster->myself->flags & REDIS_NODE_MASTER) + curmaster = server.cluster->myself; + else + curmaster = server.cluster->myself->slaveof; + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { if (bitmapTestBit(hdr->myslots,j)) { - /* If this slot was not served, or served by a node - * in FAIL state, update the table with the new node - * claiming to serve the slot. */ + /* We rebind the slot to the new node claiming it if: + * 1) The slot was unassigned. + * 2) The new node claims it with a greater configEpoch. */ if (server.cluster->slots[j] == sender) continue; if (server.cluster->slots[j] == NULL || - server.cluster->slots[j]->flags & REDIS_NODE_FAIL) + server.cluster->slots[j]->configEpoch < + senderConfigEpoch) { + if (server.cluster->slots[j] == curmaster) + newmaster = sender; clusterDelSlot(j); clusterAddSlot(sender,j); - update_state = update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); } } else { /* This node claims to no longer handling the slot, - * however we don't change our config as this is likely - * happening because a resharding is in progress, and - * it already knows where to redirect clients. */ + * however we don't change our config as this is likely: + * 1) Rehashing in progress. + * 2) Failover. + * In both cases we'll be informed about who is serving + * the slot eventually. In the meantime it's up to the + * original owner to try to redirect our clients to the + * right node. */ } } + + /* If at least one slot was reassigned from a node to another node + * with a greater configEpoch, it is possible that: + * 1) We are a master is left without slots. This means that we were + * failed over and we should turn into a replica of the new + * master. + * 2) We are a slave and our master is left without slots. We need + * to replicate to the new slots owner. */ + if (newmaster && curmaster->numslots == 0) { + redisLog(REDIS_WARNING,"Configuration change detected. Reconfiguring myself as a replica of %.40s", sender->name); + clusterSetMaster(sender); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } } } /* Get info from the gossip section */ clusterProcessGossipSection(hdr,link); - - /* Update the cluster state if needed */ - if (update_state) clusterUpdateState(); - if (update_config) clusterSaveConfigOrDie(); - } else if (type == CLUSTERMSG_TYPE_FAIL && sender) { + } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; - failing = clusterLookupNode(hdr->data.fail.about.nodename); - if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) - { + if (sender) { + failing = clusterLookupNode(hdr->data.fail.about.nodename); + if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) + { + redisLog(REDIS_NOTICE, + "FAIL message received from %.40s about %.40s", + hdr->sender, hdr->data.fail.about.nodename); + failing->flags |= REDIS_NODE_FAIL; + failing->fail_time = time(NULL); + failing->flags &= ~REDIS_NODE_PFAIL; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); + } + } else { redisLog(REDIS_NOTICE, - "FAIL message received from %.40s about %.40s", + "Ignoring FAIL message from unknonw node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); - failing->flags |= REDIS_NODE_FAIL; - failing->fail_time = time(NULL); - failing->flags &= ~REDIS_NODE_PFAIL; - clusterUpdateState(); - clusterSaveConfigOrDie(); } } else if (type == CLUSTERMSG_TYPE_PUBLISH) { robj *channel, *message; uint32_t channel_len, message_len; - /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ + /* Don't bother creating useless objects if there are no + * Pub/Sub subscribers. */ if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { channel_len = ntohl(hdr->data.publish.msg.channel_len); message_len = ntohl(hdr->data.publish.msg.message_len); @@ -1145,15 +1186,21 @@ int clusterProcessPacket(clusterLink *link) { } } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ - /* If we are not a master, ignore that message at all. */ - if (!(server.cluster->myself->flags & REDIS_NODE_MASTER)) return 0; clusterSendFailoverAuthIfNeeded(sender,hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ - /* If this is a master, increment the number of acknowledges - * we received so far. */ - if (sender->flags & REDIS_NODE_MASTER) + /* We consider this vote only if the sender is a master serving + * a non zero number of slots, and its currentEpoch is greater or + * equal to epoch where this node started the election. */ + if (sender->flags & REDIS_NODE_MASTER && + sender->numslots > 0 && + senderCurrentEpoch >= server.cluster->failover_auth_epoch) + { server.cluster->failover_auth_count++; + /* Maybe we reached a quorum here, set a flag to make sure + * we check ASAP. */ + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); + } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } @@ -1253,17 +1300,26 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } -/* Put stuff into the send buffer. */ +/* Put stuff into the send buffer. + * + * It is guaranteed that this function will never have as a side effect + * the link to be invalidated, so it is safe to call this function + * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { if (sdslen(link->sndbuf) == 0 && msglen != 0) aeCreateFileEvent(server.el,link->fd,AE_WRITABLE, clusterWriteHandler,link); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); + server.cluster->stats_bus_messages_sent++; } /* Send a message to all the nodes that are part of the cluster having - * a connected link. */ + * a connected link. + * + * It is guaranteed that this function will never have as a side effect + * some node->link to be invalidated, so it is safe to call this function + * from event handlers that will do stuff with node links later. */ void clusterBroadcastMessage(void *buf, size_t len) { dictIterator *di; dictEntry *de; @@ -1283,12 +1339,21 @@ void clusterBroadcastMessage(void *buf, size_t len) { /* Build the message header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { int totlen = 0; + clusterNode *master; + + /* If this node is a master, we send its slots bitmap and configEpoch. + * If this node is a slave we send the master's information instead (the + * node is flagged as slave so the receiver knows that it is NOT really + * in charge for this slots. */ + master = (server.cluster->myself->flags & REDIS_NODE_SLAVE && + server.cluster->myself->slaveof) ? + server.cluster->myself->slaveof : server.cluster->myself; memset(hdr,0,sizeof(*hdr)); hdr->type = htons(type); memcpy(hdr->sender,server.cluster->myself->name,REDIS_CLUSTER_NAMELEN); - memcpy(hdr->myslots,server.cluster->myself->slots, - sizeof(hdr->myslots)); + + memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN); if (server.cluster->myself->slaveof != NULL) { memcpy(hdr->slaveof,server.cluster->myself->slaveof->name, @@ -1298,12 +1363,16 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { hdr->flags = htons(server.cluster->myself->flags); hdr->state = server.cluster->state; + /* Set the currentEpoch and configEpochs. */ + hdr->currentEpoch = htonu64(server.cluster->currentEpoch); + hdr->configEpoch = htonu64(master->configEpoch); + if (type == CLUSTERMSG_TYPE_FAIL) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsgDataFail); } hdr->totlen = htonl(totlen); - /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ + /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ } /* Send a PING or PONG packet to the specified node, making sure to add enough @@ -1370,10 +1439,11 @@ void clusterSendPing(clusterLink *link, int type) { clusterSendMessage(link,buf,totlen); } -/* Send a PONG packet to every connected node that's not in handshake state. +/* Send a PONG packet to every connected node that's not in handshake state + * and for which we have a valid link. * - * In Redis Cluster pings are not just used for failure detection, but also - * to carry important configuration informations. So broadcasting a pong is + * In Redis Cluster pongs are not used just for failure detection, but also + * to carry important configuration information. So broadcasting a pong is * useful when something changes in the configuration and we want to make * the cluster aware ASAP (for instance after a slave promotion). */ void clusterBroadcastPong(void) { @@ -1384,6 +1454,7 @@ void clusterBroadcastPong(void) { while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); + if (!node->link) continue; if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG); } @@ -1477,14 +1548,11 @@ void clusterRequestFailoverAuth(void) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - hdr->time = mstime(); clusterBroadcastMessage(buf,totlen); } -/* Send a FAILOVER_AUTH_ACK message to the specified node. - * Reqtime is the time field from the original failover auth request packet, - * so that the receiver is able to check the reply age. */ -void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { +/* Send a FAILOVER_AUTH_ACK message to the specified node. */ +void clusterSendFailoverAuth(clusterNode *node) { unsigned char buf[4096]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -1493,47 +1561,57 @@ void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - hdr->time = reqtime; clusterSendMessage(node->link,buf,totlen); } -/* If we believe 'node' is the "first slave" of it's master, reply with - * a FAILOVER_AUTH_GRANTED packet. - * The 'request' field points to the authorization request packet header, we - * need it in order to copy back the 'time' field in our reply. - * - * To be a first slave the sender must: - * 1) Be a slave. - * 2) Its master should be in FAIL state. - * 3) Ordering all the slaves IDs for its master by run-id, it should be the - * first (the smallest) among the ones not in FAIL / PFAIL state. - */ +/* Vote for the node asking for our vote if there are the conditions. */ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { - char first[REDIS_CLUSTER_NAMELEN]; clusterNode *master = node->slaveof; + uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch); + uint64_t requestConfigEpoch = ntohu64(request->configEpoch); + unsigned char *claimed_slots = request->myslots; int j; - /* Node is a slave? Its master is down? */ + /* IF we are not a master serving at least 1 slot, we don't have the + * right to vote, as the cluster size in Redis Cluster is the number + * of masters serving at least one slot, and quorum is the cluster size + 1 */ + if (!(server.cluster->myself->flags & REDIS_NODE_MASTER)) return; + if (server.cluster->myself->numslots == 0) return; + + /* Request epoch must be >= our currentEpoch. */ + if (requestCurrentEpoch < server.cluster->currentEpoch) return; + + /* I already voted for this epoch? Return ASAP. */ + if (server.cluster->last_vote_epoch == server.cluster->currentEpoch) return; + + /* Node must be a slave and its master down. */ if (!(node->flags & REDIS_NODE_SLAVE) || master == NULL || !(master->flags & REDIS_NODE_FAIL)) return; - /* Iterate all the master slaves to check what's the first one. */ - memset(first,0xff,sizeof(first)); - for (j = 0; j < master->numslaves; j++) { - clusterNode *slave = master->slaves[j]; + /* We did not voted for a slave about this master for two + * times the node timeout. This is not strictly needed for correctness + * of the algorithm but makes the base case more linear. */ + if (server.unixtime - node->slaveof->voted_time < + server.cluster_node_timeout * 2) return; - if (slave->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) continue; - if (memcmp(slave->name,first,sizeof(first)) < 0) { - memcpy(first,slave->name,sizeof(first)); - } + /* The slave requesting the vote must have a configEpoch for the claimed slots + * that is >= the one of the masters currently serving the same slots in the + * current configuration. */ + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { + if (bitmapTestBit(claimed_slots, j) == 0) continue; + if (server.cluster->slots[j] == NULL || + server.cluster->slots[j]->configEpoch <= requestConfigEpoch) continue; + /* If we reached this point we found a slot that in our current slots + * is served by a master with a greater configEpoch than the one claimed + * by the slave requesting our vote. Refuse to vote for this slave. */ + return; } - /* Is 'node' the first slave? */ - if (memcmp(node->name,first,sizeof(first)) != 0) return; - - /* We can send the packet. */ - clusterSendFailoverAuth(node,request->time); + /* We can vote for this slave. */ + clusterSendFailoverAuth(node); + server.cluster->last_vote_epoch = server.cluster->currentEpoch; + node->slaveof->voted_time = server.unixtime; } /* This function is called if we are a slave node and our master serving @@ -1541,16 +1619,37 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * * The gaol of this function is: * 1) To check if we are able to perform a failover, is our data updated? - * 2) Ask reachable masters the authorization to perform the failover. - * 3) Check if there is the majority of masters agreeing we should failover. - * 4) Perform the failover informing all the other nodes. + * 2) Try to get elected by masters. + * 3) Perform the failover informing all the other nodes. */ void clusterHandleSlaveFailover(void) { - time_t data_age = server.unixtime - server.repl_down_since; - time_t auth_age = server.unixtime - server.cluster->failover_auth_time; + time_t data_age; + mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Set data_age to the number of seconds we are disconnected from the master. */ + if (server.repl_state == REDIS_REPL_CONNECTED) { + data_age = server.unixtime - server.master->lastinteraction; + } else { + data_age = server.unixtime - server.repl_down_since; + } + + /* Pre conditions to run the function: + * 1) We are a slave. + * 2) Our master is flagged as FAIL. + * 3) It is serving slots. */ + if (!(server.cluster->myself->flags & REDIS_NODE_SLAVE) || + server.cluster->myself->slaveof == NULL || + !(server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL) || + server.cluster->myself->slaveof->numslots == 0) return; + + /* Remove the node timeout from the data age as it is fine that we are + * disconnected from our master at least for the time it was down to be + * flagged as FAIL, that's the baseline. */ + if (data_age > server.cluster_node_timeout) + data_age -= server.cluster_node_timeout; + /* Check if our data is recent enough. For now we just use a fixed * constant of ten times the node timeout since the cluster should * react much faster to a master down. */ @@ -1558,19 +1657,40 @@ void clusterHandleSlaveFailover(void) { server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT) return; - /* TODO: check if we are the first slave as well? Or just rely on the - * master authorization? */ - - /* Ask masters if we are authorized to perform the failover. If there - * is a pending auth request that's too old, reset it. */ + /* Compute the time at which we can start an election. */ if (server.cluster->failover_auth_time == 0 || auth_age > - server.cluster_node_timeout * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) + server.cluster_node_timeout * 1000 * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) { - redisLog(REDIS_WARNING,"Asking masters if I can failover..."); - server.cluster->failover_auth_time = time(NULL); + server.cluster->failover_auth_time = mstime() + + 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ + data_age * 100 + /* Add 100 milliseconds for every second of age. */ + random() % 500; /* Random delay between 0 and 500 milliseconds. */ server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_sent = 0; + redisLog(REDIS_WARNING,"Start of election delayed for %lld milliseconds.", + server.cluster->failover_auth_time - mstime()); + return; + } + + /* Return ASAP if we can't still start the election. */ + if (mstime() < server.cluster->failover_auth_time) return; + + /* Return ASAP if the election is too old to be valid. */ + if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout * 1000) + return; + + /* Ask for votes if needed. */ + if (server.cluster->failover_auth_sent == 0) { + server.cluster->currentEpoch++; + server.cluster->failover_auth_epoch = server.cluster->currentEpoch; + redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.", + server.cluster->currentEpoch); clusterRequestFailoverAuth(); + server.cluster->failover_auth_sent = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); return; /* Wait for replies. */ } @@ -1579,7 +1699,7 @@ void clusterHandleSlaveFailover(void) { clusterNode *oldmaster = server.cluster->myself->slaveof; redisLog(REDIS_WARNING, - "Masters quorum reached: failing over my (failing) master."); + "Failover election won: I'm the new master."); /* We have the quorum, perform all the steps to correctly promote * this slave to a master. * @@ -1588,7 +1708,6 @@ void clusterHandleSlaveFailover(void) { server.cluster->myself); server.cluster->myself->flags &= ~REDIS_NODE_SLAVE; server.cluster->myself->flags |= REDIS_NODE_MASTER; - server.cluster->myself->flags |= REDIS_NODE_PROMOTED; server.cluster->myself->slaveof = NULL; replicationUnsetMaster(); @@ -1600,13 +1719,16 @@ void clusterHandleSlaveFailover(void) { } } - /* 3) Pong all the other nodes so that they can update the state - * accordingly and detect that we switched to master role. */ - clusterBroadcastPong(); + /* 3) Update my configEpoch to the epoch of the election. */ + server.cluster->myself->configEpoch = server.cluster->failover_auth_epoch; /* 4) Update state and save config. */ clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterSaveConfigOrDie(1); + + /* 5) Pong all the other nodes so that they can update the state + * accordingly and detect that we switched to master role. */ + clusterBroadcastPong(); } } @@ -1767,30 +1889,44 @@ void clusterCron(void) { server.cluster->myself->slaveof->port); } - /* If we are a slave and our master is down, but is serving slots, - * call the function that handles the failover. - * This function is called with a small delay in order to let the - * FAIL message to propagate after failure detection, this is not - * strictly required but makes 99.99% of failovers mechanically - * simpler. */ - if (server.cluster->myself->flags & REDIS_NODE_SLAVE && - server.cluster->myself->slaveof && - server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL && - (server.unixtime - server.cluster->myself->slaveof->fail_time) > - REDIS_CLUSTER_FAILOVER_DELAY && - server.cluster->myself->slaveof->numslots != 0) - { + clusterHandleSlaveFailover(); + if (update_state) clusterUpdateState(); +} + +/* This function is called before the event handler returns to sleep for + * events. It is useful to perform operations that must be done ASAP in + * reaction to events fired but that are not safe to perform inside event + * handlers, or to perform potentially expansive tasks that we need to do + * a single time before replying to clients. */ +void clusterBeforeSleep(void) { + /* Handle failover, this is needed when it is likely that there is already + * the quorum from masters in order to react fast. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER) clusterHandleSlaveFailover(); + + /* Update the cluster state. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE) + clusterUpdateState(); + + /* Save the config, possibly using fsync. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) { + int fsync = server.cluster->todo_before_sleep & CLUSTER_TODO_FSYNC_CONFIG; + clusterSaveConfigOrDie(fsync); } - if (update_state) clusterUpdateState(); + /* Reset our flags. */ + server.cluster->todo_before_sleep = 0; +} + +void clusterDoBeforeSleep(int flags) { + server.cluster->todo_before_sleep |= flags; } /* ----------------------------------------------------------------------------- * Slots management * -------------------------------------------------------------------------- */ -/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is zet, +/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set, * otherwise 0. */ int bitmapTestBit(unsigned char *bitmap, int pos) { off_t byte = pos/8; @@ -1913,8 +2049,14 @@ void clusterUpdateState(void) { } /* If we can't reach at least half the masters, change the cluster state - * as FAIL, as we are not even able to mark nodes as FAIL in this side - * of the netsplit because of lack of majority. */ + * to FAIL, as we are not even able to mark nodes as FAIL in this side + * of the netsplit because of lack of majority. + * + * TODO: when this condition is entered, we should not undo it for some + * (small) time after the majority is reachable again, to make sure that + * other nodes have enough time to inform this node of a configuration change. + * Otherwise a client with an old routing table may write to this node + * and later it may turn into a slave losing the write. */ { int needed_quorum = (server.cluster->size / 2) + 1; @@ -1991,7 +2133,7 @@ int verifyClusterConfigWithData(void) { server.cluster->importing_slots_from[j] = server.cluster->slots[j]; } } - if (update_config) clusterSaveConfigOrDie(); + if (update_config) clusterSaveConfigOrDie(1); return REDIS_OK; } @@ -2011,9 +2153,6 @@ void clusterSetMaster(clusterNode *n) { myself->flags &= ~REDIS_NODE_MASTER; myself->flags |= REDIS_NODE_SLAVE; } - /* Clear the promoted flag anyway if we are a slave, to ensure it will - * be set only when the node turns into a master because of fail over. */ - myself->flags &= ~REDIS_NODE_PROMOTED; myself->slaveof = n; replicationSetMaster(n->ip, n->port); } @@ -2061,7 +2200,6 @@ sds clusterGenNodesDescription(int filter) { if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,"); if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (node->flags & REDIS_NODE_PROMOTED) ci = sdscat(ci,"promoted,"); if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; /* Slave of... or just "-" */ @@ -2071,9 +2209,10 @@ sds clusterGenNodesDescription(int filter) { ci = sdscatprintf(ci,"- "); /* Latency from the POV of this node, link status */ - ci = sdscatprintf(ci,"%ld %ld %s", + ci = sdscatprintf(ci,"%ld %ld %llu %s", (long) node->ping_sent, (long) node->pong_received, + (unsigned long long) node->configEpoch, (node->link || node->flags & REDIS_NODE_MYSELF) ? "connected" : "disconnected"); @@ -2193,8 +2332,7 @@ void clusterCommand(redisClient *c) { return; } clusterDelNodeSlots(server.cluster->myself); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) @@ -2244,8 +2382,7 @@ void clusterCommand(redisClient *c) { } } zfree(slots); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { /* SETSLOT 10 MIGRATING */ @@ -2321,8 +2458,7 @@ void clusterCommand(redisClient *c) { addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); return; } - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { /* CLUSTER INFO */ @@ -2352,20 +2488,26 @@ void clusterCommand(redisClient *c) { "cluster_slots_fail:%d\r\n" "cluster_known_nodes:%lu\r\n" "cluster_size:%d\r\n" + "cluster_current_epoch:%llu\r\n" + "cluster_stats_messages_sent:%lld\r\n" + "cluster_stats_messages_received:%lld\r\n" , statestr[server.cluster->state], slots_assigned, slots_ok, slots_pfail, slots_fail, dictSize(server.cluster->nodes), - server.cluster->size + server.cluster->size, + (unsigned long long) server.cluster->currentEpoch, + server.cluster->stats_bus_messages_sent, + server.cluster->stats_bus_messages_received ); addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info))); addReplySds(c,info); addReply(c,shared.crlf); } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { - int retval = clusterSaveConfig(); + int retval = clusterSaveConfig(1); if (retval == 0) addReply(c,shared.ok); @@ -2417,8 +2559,7 @@ void clusterCommand(redisClient *c) { return; } clusterDelNode(n); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { /* CLUSTER REPLICATE */ @@ -2453,8 +2594,7 @@ void clusterCommand(redisClient *c) { /* Set the master. */ clusterSetMaster(n); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else { addReplyError(c,"Wrong CLUSTER subcommand or number of arguments"); diff --git a/src/endianconv.h b/src/endianconv.h index 7afe61c6..7c16e175 100644 --- a/src/endianconv.h +++ b/src/endianconv.h @@ -61,4 +61,14 @@ uint64_t intrev64(uint64_t v); #define intrev64ifbe(v) intrev64(v) #endif +/* The functions htonu64() and ntohu64() convert the specified value to + * network byte ordering and back. In big endian systems they are no-ops. */ +#if (BYTE_ORDER == BIG_ENDIAN) +#define htonu64(v) (v) +#define ntohu64(v) (v) +#else +#define htonu64(v) intrev64(v) +#define ntohu64(v) intrev64(v) +#endif + #endif diff --git a/src/redis-trib.rb b/src/redis-trib.rb index 96fdddc1..4b7acea4 100755 --- a/src/redis-trib.rb +++ b/src/redis-trib.rb @@ -118,8 +118,8 @@ class ClusterNode nodes.each{|n| # name addr flags role ping_sent ping_recv link_status slots split = n.split - name,addr,flags,role,ping_sent,ping_recv,link_status = split[0..6] - slots = split[7..-1] + name,addr,flags,role,ping_sent,ping_recv,config_epoch,link_status = split[0..6] + slots = split[8..-1] info = { :name => name, :addr => addr, @@ -230,7 +230,7 @@ class ClusterNode config = [] @r.cluster("nodes").each_line{|l| s = l.split - slots = s[7..-1].select {|x| x[0..0] != "["} + slots = s[8..-1].select {|x| x[0..0] != "["} next if slots.length == 0 config << s[0]+":"+(slots.sort.join(",")) } diff --git a/src/redis.c b/src/redis.c index 2792f139..bd547cd3 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1203,6 +1203,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); + + /* Call the Redis Cluster before sleep function. */ + if (server.cluster_enabled) clusterBeforeSleep(); } /* =========================== Server initialization ======================== */ diff --git a/src/redis.h b/src/redis.h index 6e19ea3b..94decca9 100644 --- a/src/redis.h +++ b/src/redis.h @@ -368,6 +368,8 @@ * Data types *----------------------------------------------------------------------------*/ +typedef long long mstime_t; /* millisecond time type. */ + /* A redis object, that is a type able to hold a string / list / set */ /* The actual Redis Object */ @@ -581,7 +583,7 @@ typedef struct redisOpArray { #define REDIS_CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_ADD 10 /* Some additional time. */ #define REDIS_CLUSTER_SLAVE_VALIDITY_MULT 10 /* Slave data validity. */ -#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 1 /* Auth request retry time. */ +#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 4 /* Auth request retry time. */ #define REDIS_CLUSTER_FAILOVER_DELAY 5 /* Seconds */ struct clusterNode; @@ -617,6 +619,7 @@ struct clusterNode { time_t ctime; /* Node object creation time. */ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ int flags; /* REDIS_NODE_... */ + uint64_t configEpoch; /* Last configEpoch observed for this node */ unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ @@ -625,6 +628,7 @@ struct clusterNode { time_t ping_sent; /* Unix time we sent latest ping */ time_t pong_received; /* Unix time we received the pong */ time_t fail_time; /* Unix time when FAIL flag was set */ + time_t voted_time; /* Last time we voted for a slave of this master */ char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ clusterLink *link; /* TCP/IP link with this node */ @@ -634,6 +638,7 @@ typedef struct clusterNode clusterNode; typedef struct { clusterNode *myself; /* This node */ + uint64_t currentEpoch; int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ int size; /* Num of master nodes with at least one slot */ dict *nodes; /* Hash table of name -> clusterNode structures */ @@ -641,10 +646,24 @@ typedef struct { clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; clusterNode *slots[REDIS_CLUSTER_SLOTS]; zskiplist *slots_to_keys; - int failover_auth_time; /* Time at which we sent the AUTH request. */ - int failover_auth_count; /* Number of authorizations received. */ + /* The following fields are used to take the slave state on elections. */ + mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms*/ + int failover_auth_count; /* Number of votes received so far. */ + int failover_auth_sent; /* True if we already asked for votes. */ + uint64_t failover_auth_epoch; /* Epoch of the current election. */ + /* The followign fields are uesd by masters to take state on elections. */ + uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ + int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ + long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ + long long stats_bus_messages_received; /* Num of msg received via cluster bus. */ } clusterState; +/* clusterState todo_before_sleep flags. */ +#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0) +#define CLUSTER_TODO_UPDATE_STATE (1<<1) +#define CLUSTER_TODO_SAVE_CONFIG (1<<2) +#define CLUSTER_TODO_FSYNC_CONFIG (1<<3) + /* Redis cluster messages header */ /* Note that the PING, PONG and MEET messages are actually the same exact @@ -704,9 +723,9 @@ typedef struct { uint32_t totlen; /* Total length of this message */ uint16_t type; /* Message type */ uint16_t count; /* Only used for some kind of messages. */ - uint64_t time; /* Time at which this request was sent (in milliseconds), - this field is copied in reply messages so that the - original sender knows how old the reply is. */ + uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ + uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch + advertised by its master if it is a slave. */ char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; char slaveof[REDIS_CLUSTER_NAMELEN]; @@ -1367,6 +1386,7 @@ void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); +void clusterBeforeSleep(void); /* Sentinel */ void initSentinelConfig(void); diff --git a/src/replication.c b/src/replication.c index d0d0594a..8102fc2d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -343,7 +343,7 @@ int masterTryPartialResynchronization(redisClient *c) { /* Run id "?" is used by slaves that want to force a full resync. */ if (master_runid[0] != '?') { redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " - "Runid mismatch (Client asked for '%s', I'm '%s')", + "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); } else { redisLog(REDIS_NOTICE,"Full resync requested by slave."); diff --git a/src/sentinel.c b/src/sentinel.c index b257ad68..4bea156d 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -43,8 +43,6 @@ extern char **environ; /* ======================== Sentinel global state =========================== */ -typedef long long mstime_t; /* millisecond time type. */ - /* Address object, used to describe an ip:port pair. */ typedef struct sentinelAddr { char *ip;