diff --git a/src/cluster.c b/src/cluster.c index 9c0d3e40..193c654e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -134,7 +134,7 @@ int clusterLoadConfig(char *filename) { n->flags |= REDIS_NODE_PFAIL; } else if (!strcasecmp(s,"fail")) { n->flags |= REDIS_NODE_FAIL; - n->fail_time = time(NULL); + n->fail_time = mstime(); } else if (!strcasecmp(s,"handshake")) { n->flags |= REDIS_NODE_HANDSHAKE; } else if (!strcasecmp(s,"noaddr")) { @@ -160,8 +160,8 @@ int clusterLoadConfig(char *filename) { } /* Set ping sent / pong received timestamps */ - if (atoi(argv[4])) n->ping_sent = time(NULL); - if (atoi(argv[5])) n->pong_received = time(NULL); + if (atoi(argv[4])) n->ping_sent = mstime(); + if (atoi(argv[5])) n->pong_received = mstime(); /* Set configEpoch for this node. */ n->configEpoch = strtoull(argv[6],NULL,10); @@ -310,7 +310,7 @@ void clusterInit(void) { clusterLink *createClusterLink(clusterNode *node) { clusterLink *link = zmalloc(sizeof(*link)); - link->ctime = time(NULL); + link->ctime = mstime(); link->sndbuf = sdsempty(); link->rcvbuf = sdsempty(); link->node = node; @@ -389,7 +389,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN); else getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); - node->ctime = time(NULL); + node->ctime = mstime(); node->configEpoch = 0; node->flags = flags; memset(node->slots,0,sizeof(node->slots)); @@ -430,7 +430,7 @@ int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) { while ((ln = listNext(&li)) != NULL) { fr = ln->value; if (fr->node == sender) { - fr->time = time(NULL); + fr->time = mstime(); return 0; } } @@ -438,7 +438,7 @@ int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) { /* Otherwise create a new report. */ fr = zmalloc(sizeof(*fr)); fr->node = sender; - fr->time = time(NULL); + fr->time = mstime(); listAddNodeTail(l,fr); return 1; } @@ -453,9 +453,9 @@ void clusterNodeCleanupFailureReports(clusterNode *node) { listNode *ln; listIter li; clusterNodeFailReport *fr; - time_t maxtime = server.cluster_node_timeout * + mstime_t maxtime = server.cluster_node_timeout * REDIS_CLUSTER_FAIL_REPORT_VALIDITY_MULT; - time_t now = time(NULL); + mstime_t now = mstime(); listRewind(l,&li); while ((ln = listNext(&li)) != NULL) { @@ -663,7 +663,7 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { /* Mark the node as failing. */ node->flags &= ~REDIS_NODE_PFAIL; node->flags |= REDIS_NODE_FAIL; - node->fail_time = time(NULL); + node->fail_time = mstime(); /* Broadcast the failing node name to everybody, forcing all the other * reachable nodes to flag the node as FAIL. */ @@ -676,7 +676,7 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { * to reach it again. It checks if there are the conditions to undo the FAIL * state. */ void clearNodeFailureIfNeeded(clusterNode *node) { - time_t now = time(NULL); + time_t now = mstime(); redisAssert(node->flags & REDIS_NODE_FAIL); @@ -691,17 +691,13 @@ void clearNodeFailureIfNeeded(clusterNode *node) { } /* If it is a master and... - * 1) The FAIL state is old enough. We use our node timeout multiplicator - * plus some additional fixed time. The additional time is useful when - * the node timeout is extremely short and the reaction time of - * the cluster may be longer, so wait at least a few seconds always. + * 1) The FAIL state is old enough. * 2) It is yet serving slots from our point of view (not failed over). * Apparently no one is going to fix these slots, clear the FAIL flag. */ if (node->flags & REDIS_NODE_MASTER && node->numslots > 0 && (now - node->fail_time) > - (server.cluster_node_timeout * REDIS_CLUSTER_FAIL_UNDO_TIME_MULT + - REDIS_CLUSTER_FAIL_UNDO_TIME_ADD)) + (server.cluster_node_timeout * REDIS_CLUSTER_FAIL_UNDO_TIME_MULT)) { redisLog(REDIS_NOTICE, "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.", @@ -1015,7 +1011,7 @@ int clusterProcessPacket(clusterLink *link) { /* Update our info about the node */ if (link->node && type == CLUSTERMSG_TYPE_PONG) { - link->node->pong_received = time(NULL); + link->node->pong_received = mstime(); link->node->ping_sent = 0; /* The PFAIL condition can be reversed without external @@ -1161,7 +1157,7 @@ int clusterProcessPacket(clusterLink *link) { "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); failing->flags |= REDIS_NODE_FAIL; - failing->fail_time = time(NULL); + failing->fail_time = mstime(); failing->flags &= ~REDIS_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); } @@ -1393,7 +1389,7 @@ void clusterSendPing(clusterLink *link, int type) { int freshnodes = dictSize(server.cluster->nodes)-2; if (link->node && type == CLUSTERMSG_TYPE_PING) - link->node->ping_sent = time(NULL); + link->node->ping_sent = mstime(); clusterBuildMessageHdr(hdr,type); /* Populate the gossip fields */ @@ -1595,8 +1591,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { /* We did not voted for a slave about this master for two * times the node timeout. This is not strictly needed for correctness * of the algorithm but makes the base case more linear. */ - if (server.unixtime - node->slaveof->voted_time < - server.cluster_node_timeout * 2) return; + if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2) + return; /* The slave requesting the vote must have a configEpoch for the claimed slots * that is >= the one of the masters currently serving the same slots in the @@ -1614,7 +1610,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { /* We can vote for this slave. */ clusterSendFailoverAuth(node); server.cluster->last_vote_epoch = server.cluster->currentEpoch; - node->slaveof->voted_time = server.unixtime; + node->slaveof->voted_time = mstime(); } /* This function is called if we are a slave node and our master serving @@ -1626,16 +1622,16 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * 3) Perform the failover informing all the other nodes. */ void clusterHandleSlaveFailover(void) { - time_t data_age; + mstime_t data_age; mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; int j; /* Set data_age to the number of seconds we are disconnected from the master. */ if (server.repl_state == REDIS_REPL_CONNECTED) { - data_age = server.unixtime - server.master->lastinteraction; + data_age = server.unixtime - server.master->lastinteraction * 1000; } else { - data_age = server.unixtime - server.repl_down_since; + data_age = server.unixtime - server.repl_down_since * 1000; } /* Pre conditions to run the function: @@ -1663,11 +1659,11 @@ void clusterHandleSlaveFailover(void) { /* Compute the time at which we can start an election. */ if (server.cluster->failover_auth_time == 0 || auth_age > - server.cluster_node_timeout * 1000 * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) + server.cluster_node_timeout * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) { server.cluster->failover_auth_time = mstime() + 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ - data_age * 100 + /* Add 100 milliseconds for every second of age. */ + data_age / 10 + /* Add 100 milliseconds for every second of age. */ random() % 500; /* Random delay between 0 and 500 milliseconds. */ server.cluster->failover_auth_count = 0; server.cluster->failover_auth_sent = 0; @@ -1680,7 +1676,7 @@ void clusterHandleSlaveFailover(void) { if (mstime() < server.cluster->failover_auth_time) return; /* Return ASAP if the election is too old to be valid. */ - if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout * 1000) + if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout) return; /* Ask for votes if needed. */ @@ -1739,12 +1735,12 @@ void clusterHandleSlaveFailover(void) { * CLUSTER cron job * -------------------------------------------------------------------------- */ -/* This is executed 1 time every second */ +/* This is executed 10 times every second */ void clusterCron(void) { dictIterator *di; dictEntry *de; int j, update_state = 0; - time_t min_pong = 0; + mstime_t min_pong = 0, now = mstime(); clusterNode *min_pong_node = NULL; /* Check if we have disconnected nodes and re-establish the connection. */ @@ -1757,7 +1753,7 @@ void clusterCron(void) { /* A Node in HANDSHAKE state has a limited lifespan equal to the * configured node timeout. */ if (node->flags & REDIS_NODE_HANDSHAKE && - server.unixtime - node->ctime > server.cluster_node_timeout) + now - node->ctime > server.cluster_node_timeout) { freeClusterNode(node); continue; @@ -1765,7 +1761,7 @@ void clusterCron(void) { if (node->link == NULL) { int fd; - time_t old_ping_sent; + mstime_t old_ping_sent; clusterLink *link; fd = anetTcpNonBlockConnect(server.neterr, node->ip, @@ -1804,7 +1800,7 @@ void clusterCron(void) { /* Ping some random node. Check a few random nodes and ping the one with * the oldest pong_received time */ - for (j = 0; j < 5; j++) { + for (j = 0; j < 2; j++) { de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); @@ -1825,7 +1821,7 @@ void clusterCron(void) { di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); - time_t now = time(NULL); + now = mstime(); /* Use an updated time at every iteration. */ int delay; if (node->flags & @@ -1836,7 +1832,7 @@ void clusterCron(void) { * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ if (node->link && /* is connected */ - time(NULL) - node->link->ctime > + now - node->link->ctime > server.cluster_node_timeout && /* was not already reconnected */ node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ diff --git a/src/cluster.h b/src/cluster.h index d46b105f..9c598be0 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -14,7 +14,7 @@ /* The following defines are amunt of time, sometimes expressed as * multiplicators of the node timeout value (when ending with MULT). */ -#define REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT 15 +#define REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT 15000 #define REDIS_CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_ADD 10 /* Some additional time. */ @@ -26,7 +26,7 @@ struct clusterNode; /* clusterLink encapsulates everything needed to talk with a remote node. */ typedef struct clusterLink { - time_t ctime; /* Link creation time */ + mstime_t ctime; /* Link creation time */ int fd; /* TCP socket file descriptor */ sds sndbuf; /* Packet send buffer */ sds rcvbuf; /* Packet reception buffer */ @@ -48,11 +48,11 @@ typedef struct clusterLink { /* This structure represent elements of node->fail_reports. */ struct clusterNodeFailReport { struct clusterNode *node; /* Node reporting the failure condition. */ - time_t time; /* Time of the last report from this node. */ + mstime_t time; /* Time of the last report from this node. */ } typedef clusterNodeFailReport; struct clusterNode { - time_t ctime; /* Node object creation time. */ + mstime_t ctime; /* Node object creation time. */ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ int flags; /* REDIS_NODE_... */ uint64_t configEpoch; /* Last configEpoch observed for this node */ @@ -61,10 +61,10 @@ struct clusterNode { int numslaves; /* Number of slave nodes, if this is a master */ struct clusterNode **slaves; /* pointers to slave nodes */ struct clusterNode *slaveof; /* pointer to the master node */ - time_t ping_sent; /* Unix time we sent latest ping */ - time_t pong_received; /* Unix time we received the pong */ - time_t fail_time; /* Unix time when FAIL flag was set */ - time_t voted_time; /* Last time we voted for a slave of this master */ + mstime_t ping_sent; /* Unix time we sent latest ping */ + mstime_t pong_received; /* Unix time we received the pong */ + mstime_t fail_time; /* Unix time when FAIL flag was set */ + mstime_t voted_time; /* Last time we voted for a slave of this master */ char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ clusterLink *link; /* TCP/IP link with this node */ diff --git a/src/config.c b/src/config.c index 64f836c7..8bfb208f 100644 --- a/src/config.c +++ b/src/config.c @@ -416,7 +416,7 @@ void loadServerConfigFromString(char *config) { zfree(server.cluster_configfile); server.cluster_configfile = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"cluster-node-timeout") && argc == 2) { - server.cluster_node_timeout = atoi(argv[1]); + server.cluster_node_timeout = strtoll(argv[1],NULL,10); if (server.cluster_node_timeout <= 0) { err = "cluster node timeout must be 1 or greater"; goto loaderr; } diff --git a/src/redis.c b/src/redis.c index bc75b1c9..30348a67 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1155,7 +1155,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { run_with_period(1000) replicationCron(); /* Run the Redis Cluster cron. */ - run_with_period(1000) { + run_with_period(100) { if (server.cluster_enabled) clusterCron(); } diff --git a/src/redis.h b/src/redis.h index 88792f77..a8a68bce 100644 --- a/src/redis.h +++ b/src/redis.h @@ -768,7 +768,7 @@ struct redisServer { xor of REDIS_NOTIFY... flags. */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ - int cluster_node_timeout; /* Cluster node timeout. */ + mstime_t cluster_node_timeout; /* Cluster node timeout. */ char *cluster_configfile; /* Cluster auto-generated config file name. */ struct clusterState *cluster; /* State of the cluster */ /* Scripting */