mirror of
https://github.com/fluencelabs/redis
synced 2025-03-23 02:50:50 +00:00
Cluster: introduced a failure reports system.
A §Redis Cluster node used to mark a node as failing when itself detected a failure for that node, and a single acknowledge was received about the possible failure state. The new API will be used in order to possible to require that N other nodes have a PFAIL or FAIL state for a given node for a node to set it as failing.
This commit is contained in:
parent
36af851550
commit
974929770b
@ -336,9 +336,69 @@ clusterNode *createClusterNode(char *nodename, int flags) {
|
||||
node->link = NULL;
|
||||
memset(node->ip,0,sizeof(node->ip));
|
||||
node->port = 0;
|
||||
node->fail_reports = listCreate();
|
||||
listSetFreeMethod(node->fail_reports,zfree);
|
||||
return node;
|
||||
}
|
||||
|
||||
/* This function is called every time we get a failure report from a node.
|
||||
* The side effect is to populate the fail_reports list (or to update
|
||||
* the timestamp of an existing report).
|
||||
*
|
||||
* 'failing' is the node that is in failure state according to the
|
||||
* 'sender' node. */
|
||||
void clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
|
||||
list *l = failing->fail_reports;
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
clusterNodeFailReport *fr;
|
||||
|
||||
/* If a failure report from the same sender already exists, just update
|
||||
* the timestamp. */
|
||||
listRewind(l,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
fr = ln->value;
|
||||
if (fr->node == sender) {
|
||||
fr->time = time(NULL);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* Otherwise create a new report. */
|
||||
fr = zmalloc(sizeof(*fr));
|
||||
fr->node = sender;
|
||||
fr->time = time(NULL);
|
||||
listAddNodeTail(l,fr);
|
||||
}
|
||||
|
||||
/* Remove failure reports that are too old, where too old means reasonably
|
||||
* older than the global node timeout. Note that anyway for a node to be
|
||||
* flagged as FAIL we need to have a local PFAIL state that is at least
|
||||
* older than the global node timeout, so we don't just trust the number
|
||||
* of failure reports from other nodes. */
|
||||
void clusterNodeCleanupFailureReports(clusterNode *node) {
|
||||
list *l = node->fail_reports;
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
clusterNodeFailReport *fr;
|
||||
time_t maxtime = server.cluster->node_timeout*2;
|
||||
time_t now = time(NULL);
|
||||
|
||||
listRewind(l,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
fr = ln->value;
|
||||
if (now - fr->time > maxtime) listDelNode(l,ln);
|
||||
}
|
||||
}
|
||||
|
||||
/* Return the number of external nodes that believe 'node' is failing,
|
||||
* not including this node, that may have a PFAIL or FAIL state for this
|
||||
* node as well. */
|
||||
int clusterNodeFailureReportsCount(clusterNode *node) {
|
||||
clusterNodeCleanupFailureReports(node);
|
||||
return listLength(node->fail_reports);
|
||||
}
|
||||
|
||||
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
|
||||
int j;
|
||||
|
||||
@ -373,12 +433,13 @@ void clusterNodeResetSlaves(clusterNode *n) {
|
||||
|
||||
void freeClusterNode(clusterNode *n) {
|
||||
sds nodename;
|
||||
|
||||
|
||||
nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN);
|
||||
redisAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
|
||||
sdsfree(nodename);
|
||||
if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n);
|
||||
if (n->link) freeClusterLink(n->link);
|
||||
listRelease(n->fail_reports);
|
||||
zfree(n);
|
||||
}
|
||||
|
||||
@ -1172,6 +1233,7 @@ void clusterUpdateState(void) {
|
||||
int ok = 1;
|
||||
int j;
|
||||
|
||||
/* Check if all the slots are covered. */
|
||||
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
|
||||
if (server.cluster->slots[j] == NULL ||
|
||||
server.cluster->slots[j]->flags & (REDIS_NODE_FAIL))
|
||||
@ -1180,6 +1242,8 @@ void clusterUpdateState(void) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Update cluster->state accordingly. */
|
||||
if (ok) {
|
||||
if (server.cluster->state == REDIS_CLUSTER_NEEDHELP) {
|
||||
server.cluster->state = REDIS_CLUSTER_NEEDHELP;
|
||||
|
@ -543,6 +543,12 @@ typedef struct clusterLink {
|
||||
#define REDIS_NODE_MEET 128 /* Send a MEET message to this node */
|
||||
#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
|
||||
|
||||
/* This structure represent elements of node->fail_reports. */
|
||||
struct clusterNodeFailReport {
|
||||
struct clusterNode *node; /* Node reporting the failure condition. */
|
||||
time_t time; /* Time of the last report from this node. */
|
||||
} typedef clusterNodeFailReport;
|
||||
|
||||
struct clusterNode {
|
||||
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
|
||||
int flags; /* REDIS_NODE_... */
|
||||
@ -557,6 +563,7 @@ struct clusterNode {
|
||||
char ip[16]; /* Latest known IP address of this node */
|
||||
int port; /* Latest known port of this node */
|
||||
clusterLink *link; /* TCP/IP link with this node */
|
||||
list *fail_reports; /* List of nodes signaling this as failing */
|
||||
};
|
||||
typedef struct clusterNode clusterNode;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user