Cluster: time switched from seconds to milliseconds.

All the internal state of cluster involving time is now using mstime_t
and mstime() in order to use milliseconds resolution.

Also the clusterCron() function is called with a 10 hz frequency instead
of 1 hz.

The cluster node_timeout must be also configured in milliseconds by the
user in redis.conf.
This commit is contained in:
antirez 2013-10-09 16:18:33 +02:00
parent 929b6a4480
commit ba42428633
5 changed files with 43 additions and 47 deletions

View File

@ -134,7 +134,7 @@ int clusterLoadConfig(char *filename) {
n->flags |= REDIS_NODE_PFAIL; n->flags |= REDIS_NODE_PFAIL;
} else if (!strcasecmp(s,"fail")) { } else if (!strcasecmp(s,"fail")) {
n->flags |= REDIS_NODE_FAIL; n->flags |= REDIS_NODE_FAIL;
n->fail_time = time(NULL); n->fail_time = mstime();
} else if (!strcasecmp(s,"handshake")) { } else if (!strcasecmp(s,"handshake")) {
n->flags |= REDIS_NODE_HANDSHAKE; n->flags |= REDIS_NODE_HANDSHAKE;
} else if (!strcasecmp(s,"noaddr")) { } else if (!strcasecmp(s,"noaddr")) {
@ -160,8 +160,8 @@ int clusterLoadConfig(char *filename) {
} }
/* Set ping sent / pong received timestamps */ /* Set ping sent / pong received timestamps */
if (atoi(argv[4])) n->ping_sent = time(NULL); if (atoi(argv[4])) n->ping_sent = mstime();
if (atoi(argv[5])) n->pong_received = time(NULL); if (atoi(argv[5])) n->pong_received = mstime();
/* Set configEpoch for this node. */ /* Set configEpoch for this node. */
n->configEpoch = strtoull(argv[6],NULL,10); n->configEpoch = strtoull(argv[6],NULL,10);
@ -310,7 +310,7 @@ void clusterInit(void) {
clusterLink *createClusterLink(clusterNode *node) { clusterLink *createClusterLink(clusterNode *node) {
clusterLink *link = zmalloc(sizeof(*link)); clusterLink *link = zmalloc(sizeof(*link));
link->ctime = time(NULL); link->ctime = mstime();
link->sndbuf = sdsempty(); link->sndbuf = sdsempty();
link->rcvbuf = sdsempty(); link->rcvbuf = sdsempty();
link->node = node; link->node = node;
@ -389,7 +389,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN); memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
else else
getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
node->ctime = time(NULL); node->ctime = mstime();
node->configEpoch = 0; node->configEpoch = 0;
node->flags = flags; node->flags = flags;
memset(node->slots,0,sizeof(node->slots)); memset(node->slots,0,sizeof(node->slots));
@ -430,7 +430,7 @@ int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
while ((ln = listNext(&li)) != NULL) { while ((ln = listNext(&li)) != NULL) {
fr = ln->value; fr = ln->value;
if (fr->node == sender) { if (fr->node == sender) {
fr->time = time(NULL); fr->time = mstime();
return 0; return 0;
} }
} }
@ -438,7 +438,7 @@ int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
/* Otherwise create a new report. */ /* Otherwise create a new report. */
fr = zmalloc(sizeof(*fr)); fr = zmalloc(sizeof(*fr));
fr->node = sender; fr->node = sender;
fr->time = time(NULL); fr->time = mstime();
listAddNodeTail(l,fr); listAddNodeTail(l,fr);
return 1; return 1;
} }
@ -453,9 +453,9 @@ void clusterNodeCleanupFailureReports(clusterNode *node) {
listNode *ln; listNode *ln;
listIter li; listIter li;
clusterNodeFailReport *fr; clusterNodeFailReport *fr;
time_t maxtime = server.cluster_node_timeout * mstime_t maxtime = server.cluster_node_timeout *
REDIS_CLUSTER_FAIL_REPORT_VALIDITY_MULT; REDIS_CLUSTER_FAIL_REPORT_VALIDITY_MULT;
time_t now = time(NULL); mstime_t now = mstime();
listRewind(l,&li); listRewind(l,&li);
while ((ln = listNext(&li)) != NULL) { while ((ln = listNext(&li)) != NULL) {
@ -663,7 +663,7 @@ void markNodeAsFailingIfNeeded(clusterNode *node) {
/* Mark the node as failing. */ /* Mark the node as failing. */
node->flags &= ~REDIS_NODE_PFAIL; node->flags &= ~REDIS_NODE_PFAIL;
node->flags |= REDIS_NODE_FAIL; node->flags |= REDIS_NODE_FAIL;
node->fail_time = time(NULL); node->fail_time = mstime();
/* Broadcast the failing node name to everybody, forcing all the other /* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL. */ * reachable nodes to flag the node as FAIL. */
@ -676,7 +676,7 @@ void markNodeAsFailingIfNeeded(clusterNode *node) {
* to reach it again. It checks if there are the conditions to undo the FAIL * to reach it again. It checks if there are the conditions to undo the FAIL
* state. */ * state. */
void clearNodeFailureIfNeeded(clusterNode *node) { void clearNodeFailureIfNeeded(clusterNode *node) {
time_t now = time(NULL); time_t now = mstime();
redisAssert(node->flags & REDIS_NODE_FAIL); redisAssert(node->flags & REDIS_NODE_FAIL);
@ -691,17 +691,13 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
} }
/* If it is a master and... /* If it is a master and...
* 1) The FAIL state is old enough. We use our node timeout multiplicator * 1) The FAIL state is old enough.
* plus some additional fixed time. The additional time is useful when
* the node timeout is extremely short and the reaction time of
* the cluster may be longer, so wait at least a few seconds always.
* 2) It is yet serving slots from our point of view (not failed over). * 2) It is yet serving slots from our point of view (not failed over).
* Apparently no one is going to fix these slots, clear the FAIL flag. */ * Apparently no one is going to fix these slots, clear the FAIL flag. */
if (node->flags & REDIS_NODE_MASTER && if (node->flags & REDIS_NODE_MASTER &&
node->numslots > 0 && node->numslots > 0 &&
(now - node->fail_time) > (now - node->fail_time) >
(server.cluster_node_timeout * REDIS_CLUSTER_FAIL_UNDO_TIME_MULT + (server.cluster_node_timeout * REDIS_CLUSTER_FAIL_UNDO_TIME_MULT))
REDIS_CLUSTER_FAIL_UNDO_TIME_ADD))
{ {
redisLog(REDIS_NOTICE, redisLog(REDIS_NOTICE,
"Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.", "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
@ -1015,7 +1011,7 @@ int clusterProcessPacket(clusterLink *link) {
/* Update our info about the node */ /* Update our info about the node */
if (link->node && type == CLUSTERMSG_TYPE_PONG) { if (link->node && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = time(NULL); link->node->pong_received = mstime();
link->node->ping_sent = 0; link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external /* The PFAIL condition can be reversed without external
@ -1161,7 +1157,7 @@ int clusterProcessPacket(clusterLink *link) {
"FAIL message received from %.40s about %.40s", "FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename); hdr->sender, hdr->data.fail.about.nodename);
failing->flags |= REDIS_NODE_FAIL; failing->flags |= REDIS_NODE_FAIL;
failing->fail_time = time(NULL); failing->fail_time = mstime();
failing->flags &= ~REDIS_NODE_PFAIL; failing->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
} }
@ -1393,7 +1389,7 @@ void clusterSendPing(clusterLink *link, int type) {
int freshnodes = dictSize(server.cluster->nodes)-2; int freshnodes = dictSize(server.cluster->nodes)-2;
if (link->node && type == CLUSTERMSG_TYPE_PING) if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = time(NULL); link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type); clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */ /* Populate the gossip fields */
@ -1595,8 +1591,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
/* We did not voted for a slave about this master for two /* We did not voted for a slave about this master for two
* times the node timeout. This is not strictly needed for correctness * times the node timeout. This is not strictly needed for correctness
* of the algorithm but makes the base case more linear. */ * of the algorithm but makes the base case more linear. */
if (server.unixtime - node->slaveof->voted_time < if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
server.cluster_node_timeout * 2) return; return;
/* The slave requesting the vote must have a configEpoch for the claimed slots /* The slave requesting the vote must have a configEpoch for the claimed slots
* that is >= the one of the masters currently serving the same slots in the * that is >= the one of the masters currently serving the same slots in the
@ -1614,7 +1610,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
/* We can vote for this slave. */ /* We can vote for this slave. */
clusterSendFailoverAuth(node); clusterSendFailoverAuth(node);
server.cluster->last_vote_epoch = server.cluster->currentEpoch; server.cluster->last_vote_epoch = server.cluster->currentEpoch;
node->slaveof->voted_time = server.unixtime; node->slaveof->voted_time = mstime();
} }
/* This function is called if we are a slave node and our master serving /* This function is called if we are a slave node and our master serving
@ -1626,16 +1622,16 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
* 3) Perform the failover informing all the other nodes. * 3) Perform the failover informing all the other nodes.
*/ */
void clusterHandleSlaveFailover(void) { void clusterHandleSlaveFailover(void) {
time_t data_age; mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time; mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1; int needed_quorum = (server.cluster->size / 2) + 1;
int j; int j;
/* Set data_age to the number of seconds we are disconnected from the master. */ /* Set data_age to the number of seconds we are disconnected from the master. */
if (server.repl_state == REDIS_REPL_CONNECTED) { if (server.repl_state == REDIS_REPL_CONNECTED) {
data_age = server.unixtime - server.master->lastinteraction; data_age = server.unixtime - server.master->lastinteraction * 1000;
} else { } else {
data_age = server.unixtime - server.repl_down_since; data_age = server.unixtime - server.repl_down_since * 1000;
} }
/* Pre conditions to run the function: /* Pre conditions to run the function:
@ -1663,11 +1659,11 @@ void clusterHandleSlaveFailover(void) {
/* Compute the time at which we can start an election. */ /* Compute the time at which we can start an election. */
if (server.cluster->failover_auth_time == 0 || if (server.cluster->failover_auth_time == 0 ||
auth_age > auth_age >
server.cluster_node_timeout * 1000 * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) server.cluster_node_timeout * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT)
{ {
server.cluster->failover_auth_time = mstime() + server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
data_age * 100 + /* Add 100 milliseconds for every second of age. */ data_age / 10 + /* Add 100 milliseconds for every second of age. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */ random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0; server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0; server.cluster->failover_auth_sent = 0;
@ -1680,7 +1676,7 @@ void clusterHandleSlaveFailover(void) {
if (mstime() < server.cluster->failover_auth_time) return; if (mstime() < server.cluster->failover_auth_time) return;
/* Return ASAP if the election is too old to be valid. */ /* Return ASAP if the election is too old to be valid. */
if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout * 1000) if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout)
return; return;
/* Ask for votes if needed. */ /* Ask for votes if needed. */
@ -1739,12 +1735,12 @@ void clusterHandleSlaveFailover(void) {
* CLUSTER cron job * CLUSTER cron job
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
/* This is executed 1 time every second */ /* This is executed 10 times every second */
void clusterCron(void) { void clusterCron(void) {
dictIterator *di; dictIterator *di;
dictEntry *de; dictEntry *de;
int j, update_state = 0; int j, update_state = 0;
time_t min_pong = 0; mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL; clusterNode *min_pong_node = NULL;
/* Check if we have disconnected nodes and re-establish the connection. */ /* Check if we have disconnected nodes and re-establish the connection. */
@ -1757,7 +1753,7 @@ void clusterCron(void) {
/* A Node in HANDSHAKE state has a limited lifespan equal to the /* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */ * configured node timeout. */
if (node->flags & REDIS_NODE_HANDSHAKE && if (node->flags & REDIS_NODE_HANDSHAKE &&
server.unixtime - node->ctime > server.cluster_node_timeout) now - node->ctime > server.cluster_node_timeout)
{ {
freeClusterNode(node); freeClusterNode(node);
continue; continue;
@ -1765,7 +1761,7 @@ void clusterCron(void) {
if (node->link == NULL) { if (node->link == NULL) {
int fd; int fd;
time_t old_ping_sent; mstime_t old_ping_sent;
clusterLink *link; clusterLink *link;
fd = anetTcpNonBlockConnect(server.neterr, node->ip, fd = anetTcpNonBlockConnect(server.neterr, node->ip,
@ -1804,7 +1800,7 @@ void clusterCron(void) {
/* Ping some random node. Check a few random nodes and ping the one with /* Ping some random node. Check a few random nodes and ping the one with
* the oldest pong_received time */ * the oldest pong_received time */
for (j = 0; j < 5; j++) { for (j = 0; j < 2; j++) {
de = dictGetRandomKey(server.cluster->nodes); de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de); clusterNode *this = dictGetVal(de);
@ -1825,7 +1821,7 @@ void clusterCron(void) {
di = dictGetSafeIterator(server.cluster->nodes); di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de); clusterNode *node = dictGetVal(de);
time_t now = time(NULL); now = mstime(); /* Use an updated time at every iteration. */
int delay; int delay;
if (node->flags & if (node->flags &
@ -1836,7 +1832,7 @@ void clusterCron(void) {
* timeout, reconnect the link: maybe there is a connection * timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */ * issue even if the node is alive. */
if (node->link && /* is connected */ if (node->link && /* is connected */
time(NULL) - node->link->ctime > now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */ server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */ node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */ node->pong_received < node->ping_sent && /* still waiting pong */

View File

@ -14,7 +14,7 @@
/* The following defines are amunt of time, sometimes expressed as /* The following defines are amunt of time, sometimes expressed as
* multiplicators of the node timeout value (when ending with MULT). */ * multiplicators of the node timeout value (when ending with MULT). */
#define REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT 15 #define REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT 15000
#define REDIS_CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */ #define REDIS_CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */
#define REDIS_CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */
#define REDIS_CLUSTER_FAIL_UNDO_TIME_ADD 10 /* Some additional time. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_ADD 10 /* Some additional time. */
@ -26,7 +26,7 @@ struct clusterNode;
/* clusterLink encapsulates everything needed to talk with a remote node. */ /* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink { typedef struct clusterLink {
time_t ctime; /* Link creation time */ mstime_t ctime; /* Link creation time */
int fd; /* TCP socket file descriptor */ int fd; /* TCP socket file descriptor */
sds sndbuf; /* Packet send buffer */ sds sndbuf; /* Packet send buffer */
sds rcvbuf; /* Packet reception buffer */ sds rcvbuf; /* Packet reception buffer */
@ -48,11 +48,11 @@ typedef struct clusterLink {
/* This structure represent elements of node->fail_reports. */ /* This structure represent elements of node->fail_reports. */
struct clusterNodeFailReport { struct clusterNodeFailReport {
struct clusterNode *node; /* Node reporting the failure condition. */ struct clusterNode *node; /* Node reporting the failure condition. */
time_t time; /* Time of the last report from this node. */ mstime_t time; /* Time of the last report from this node. */
} typedef clusterNodeFailReport; } typedef clusterNodeFailReport;
struct clusterNode { struct clusterNode {
time_t ctime; /* Node object creation time. */ mstime_t ctime; /* Node object creation time. */
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* REDIS_NODE_... */ int flags; /* REDIS_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */ uint64_t configEpoch; /* Last configEpoch observed for this node */
@ -61,10 +61,10 @@ struct clusterNode {
int numslaves; /* Number of slave nodes, if this is a master */ int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */ struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node */ struct clusterNode *slaveof; /* pointer to the master node */
time_t ping_sent; /* Unix time we sent latest ping */ mstime_t ping_sent; /* Unix time we sent latest ping */
time_t pong_received; /* Unix time we received the pong */ mstime_t pong_received; /* Unix time we received the pong */
time_t fail_time; /* Unix time when FAIL flag was set */ mstime_t fail_time; /* Unix time when FAIL flag was set */
time_t voted_time; /* Last time we voted for a slave of this master */ mstime_t voted_time; /* Last time we voted for a slave of this master */
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known port of this node */ int port; /* Latest known port of this node */
clusterLink *link; /* TCP/IP link with this node */ clusterLink *link; /* TCP/IP link with this node */

View File

@ -416,7 +416,7 @@ void loadServerConfigFromString(char *config) {
zfree(server.cluster_configfile); zfree(server.cluster_configfile);
server.cluster_configfile = zstrdup(argv[1]); server.cluster_configfile = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"cluster-node-timeout") && argc == 2) { } else if (!strcasecmp(argv[0],"cluster-node-timeout") && argc == 2) {
server.cluster_node_timeout = atoi(argv[1]); server.cluster_node_timeout = strtoll(argv[1],NULL,10);
if (server.cluster_node_timeout <= 0) { if (server.cluster_node_timeout <= 0) {
err = "cluster node timeout must be 1 or greater"; goto loaderr; err = "cluster node timeout must be 1 or greater"; goto loaderr;
} }

View File

@ -1155,7 +1155,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
run_with_period(1000) replicationCron(); run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */ /* Run the Redis Cluster cron. */
run_with_period(1000) { run_with_period(100) {
if (server.cluster_enabled) clusterCron(); if (server.cluster_enabled) clusterCron();
} }

View File

@ -768,7 +768,7 @@ struct redisServer {
xor of REDIS_NOTIFY... flags. */ xor of REDIS_NOTIFY... flags. */
/* Cluster */ /* Cluster */
int cluster_enabled; /* Is cluster enabled? */ int cluster_enabled; /* Is cluster enabled? */
int cluster_node_timeout; /* Cluster node timeout. */ mstime_t cluster_node_timeout; /* Cluster node timeout. */
char *cluster_configfile; /* Cluster auto-generated config file name. */ char *cluster_configfile; /* Cluster auto-generated config file name. */
struct clusterState *cluster; /* State of the cluster */ struct clusterState *cluster; /* State of the cluster */
/* Scripting */ /* Scripting */