mirror of
https://github.com/fluencelabs/redis
synced 2025-04-11 03:36:05 +00:00
Sentinel: leadership handling changes WIP.
Changes to leadership handling. The leader is now selected by every Sentinel, for a specified epoch, when the SENTINEL is-master-down-by-addr command is received. This command now includes the runid and the currentEpoch of the instance requesting a vote. A Sentinel votes only once in a given epoch. Still a work in progress, does not even compile at this stage.
This commit is contained in:
parent
0bac36d0a1
commit
8c1bf9a2bd
135
src/sentinel.c
135
src/sentinel.c
@ -173,9 +173,8 @@ typedef struct sentinelRedisInstance {
|
|||||||
char *leader; /* If this is a master instance, this is the runid of
|
char *leader; /* If this is a master instance, this is the runid of
|
||||||
the Sentinel that should perform the failover. If
|
the Sentinel that should perform the failover. If
|
||||||
this is a Sentinel, this is the runid of the Sentinel
|
this is a Sentinel, this is the runid of the Sentinel
|
||||||
that this other Sentinel is voting as leader.
|
that this Sentinel voted as leader. */
|
||||||
This field is valid only if SRI_MASTER_DOWN is
|
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
|
||||||
set on the Sentinel instance. */
|
|
||||||
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
|
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
|
||||||
mstime_t failover_state_change_time;
|
mstime_t failover_state_change_time;
|
||||||
mstime_t failover_start_time; /* When to start to failover if leader. */
|
mstime_t failover_start_time; /* When to start to failover if leader. */
|
||||||
@ -327,6 +326,7 @@ void sentinelScheduleScriptExecution(char *path, ...);
|
|||||||
void sentinelStartFailover(sentinelRedisInstance *master, int state);
|
void sentinelStartFailover(sentinelRedisInstance *master, int state);
|
||||||
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
|
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
|
||||||
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
|
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
|
||||||
|
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
|
||||||
|
|
||||||
/* ========================= Dictionary types =============================== */
|
/* ========================= Dictionary types =============================== */
|
||||||
|
|
||||||
@ -894,6 +894,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
|
|||||||
|
|
||||||
/* Failover state. */
|
/* Failover state. */
|
||||||
ri->leader = NULL;
|
ri->leader = NULL;
|
||||||
|
ri->leader_epoch = 0;
|
||||||
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
|
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
|
||||||
ri->failover_state_change_time = 0;
|
ri->failover_state_change_time = 0;
|
||||||
ri->failover_start_time = 0;
|
ri->failover_start_time = 0;
|
||||||
@ -1031,7 +1032,7 @@ sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, c
|
|||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Simple master lookup by name */
|
/* Master lookup by name */
|
||||||
sentinelRedisInstance *sentinelGetMasterByName(char *name) {
|
sentinelRedisInstance *sentinelGetMasterByName(char *name) {
|
||||||
sentinelRedisInstance *ri;
|
sentinelRedisInstance *ri;
|
||||||
sds sdsname = sdsnew(name);
|
sds sdsname = sdsnew(name);
|
||||||
@ -1041,6 +1042,24 @@ sentinelRedisInstance *sentinelGetMasterByName(char *name) {
|
|||||||
return ri;
|
return ri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Senitnel lookup by runid */
|
||||||
|
sentinelRedisInstance *sentinelGetSentinelByRunid(sentinelRedisInstance *master, char *runid) {
|
||||||
|
sentinelRedisInstance *retval = NULL;
|
||||||
|
dictIterator *di;
|
||||||
|
dictEntry *de;
|
||||||
|
|
||||||
|
di = dictGetIterator(master->sentinels);
|
||||||
|
while((de = dictNext(di)) != NULL) {
|
||||||
|
sentinelRedisInstance *ri = dictGetVal(de);
|
||||||
|
if (!strcmp(ri->runid,runid)) {
|
||||||
|
retval = ri;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dictReleaseIterator(di);
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
/* Add the specified flags to all the instances in the specified dictionary. */
|
/* Add the specified flags to all the instances in the specified dictionary. */
|
||||||
void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
|
void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
|
||||||
dictIterator *di;
|
dictIterator *di;
|
||||||
@ -1979,11 +1998,13 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
|
|||||||
addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
|
addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
|
||||||
fields++;
|
fields++;
|
||||||
|
|
||||||
if (ri->flags & SRI_MASTER_DOWN) {
|
addReplyBulkCString(c,"voted-leader");
|
||||||
addReplyBulkCString(c,"subjective-leader");
|
|
||||||
addReplyBulkCString(c,ri->leader ? ri->leader : "?");
|
addReplyBulkCString(c,ri->leader ? ri->leader : "?");
|
||||||
fields++;
|
fields++;
|
||||||
}
|
|
||||||
|
addReplyBulkCString(c,"voted-leader-epoch");
|
||||||
|
addReplyBulkLongLong(c,ri->leader_epoch);
|
||||||
|
fields++;
|
||||||
}
|
}
|
||||||
|
|
||||||
setDeferredMultiBulkLength(c,mbl,fields*2);
|
setDeferredMultiBulkLength(c,mbl,fields*2);
|
||||||
@ -2044,14 +2065,18 @@ void sentinelCommand(redisClient *c) {
|
|||||||
return;
|
return;
|
||||||
addReplyDictOfRedisInstances(c,ri->sentinels);
|
addReplyDictOfRedisInstances(c,ri->sentinels);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
|
||||||
/* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
|
/* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> */
|
||||||
sentinelRedisInstance *ri;
|
sentinelRedisInstance *ri;
|
||||||
|
long long req_epoch;
|
||||||
|
uint64_t leader_epoch = 0;
|
||||||
char *leader = NULL;
|
char *leader = NULL;
|
||||||
long port;
|
long port;
|
||||||
int isdown = 0;
|
int isdown = 0;
|
||||||
|
|
||||||
if (c->argc != 4) goto numargserr;
|
if (c->argc != 6) goto numargserr;
|
||||||
if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
|
if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK ||
|
||||||
|
getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
|
||||||
|
!= REDIS_OK)
|
||||||
return;
|
return;
|
||||||
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
|
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
|
||||||
c->argv[2]->ptr,port,NULL);
|
c->argv[2]->ptr,port,NULL);
|
||||||
@ -2061,12 +2086,20 @@ void sentinelCommand(redisClient *c) {
|
|||||||
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
|
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
|
||||||
(ri->flags & SRI_MASTER))
|
(ri->flags & SRI_MASTER))
|
||||||
isdown = 1;
|
isdown = 1;
|
||||||
if (ri) leader = sentinelGetSubjectiveLeader(ri);
|
|
||||||
|
|
||||||
/* Reply with a two-elements multi-bulk reply: down state, leader. */
|
/* Vote for the master (or fetch the previous vote) */
|
||||||
addReplyMultiBulkLen(c,2);
|
if (ri && ri->flags & SRI_MASTER) {
|
||||||
|
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
|
||||||
|
c->argv[5]->ptr,
|
||||||
|
&leader_epoch);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Reply with a three-elements multi-bulk reply:
|
||||||
|
* down state, leader, vote epoch. */
|
||||||
|
addReplyMultiBulkLen(c,3);
|
||||||
addReply(c, isdown ? shared.cone : shared.czero);
|
addReply(c, isdown ? shared.cone : shared.czero);
|
||||||
addReplyBulkCString(c, leader ? leader : "?");
|
addReplyBulkCString(c, leader ? leader : "?");
|
||||||
|
addReplyLongLong(c, (long long)leader_epoch);
|
||||||
if (leader) sdsfree(leader);
|
if (leader) sdsfree(leader);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
|
||||||
/* SENTINEL RESET <pattern> */
|
/* SENTINEL RESET <pattern> */
|
||||||
@ -2289,9 +2322,10 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p
|
|||||||
/* Ignore every error or unexpected reply.
|
/* Ignore every error or unexpected reply.
|
||||||
* Note that if the command returns an error for any reason we'll
|
* Note that if the command returns an error for any reason we'll
|
||||||
* end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
|
* end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
|
||||||
if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
|
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
|
||||||
r->element[0]->type == REDIS_REPLY_INTEGER &&
|
r->element[0]->type == REDIS_REPLY_INTEGER &&
|
||||||
r->element[1]->type == REDIS_REPLY_STRING)
|
r->element[1]->type == REDIS_REPLY_STRING &&
|
||||||
|
r->element[2]->type == REDIS_REPLY_INTEGER)
|
||||||
{
|
{
|
||||||
ri->last_master_down_reply_time = mstime();
|
ri->last_master_down_reply_time = mstime();
|
||||||
if (r->element[0]->integer == 1) {
|
if (r->element[0]->integer == 1) {
|
||||||
@ -2301,6 +2335,7 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p
|
|||||||
}
|
}
|
||||||
sdsfree(ri->leader);
|
sdsfree(ri->leader);
|
||||||
ri->leader = sdsnew(r->element[1]->str);
|
ri->leader = sdsnew(r->element[1]->str);
|
||||||
|
ri->leader_epoch = r->element[2]->integer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2341,8 +2376,8 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
|
|||||||
ll2string(port,sizeof(port),master->addr->port);
|
ll2string(port,sizeof(port),master->addr->port);
|
||||||
retval = redisAsyncCommand(ri->cc,
|
retval = redisAsyncCommand(ri->cc,
|
||||||
sentinelReceiveIsMasterDownReply, NULL,
|
sentinelReceiveIsMasterDownReply, NULL,
|
||||||
"SENTINEL is-master-down-by-addr %s %s",
|
"SENTINEL is-master-down-by-addr %s %s %llu %s",
|
||||||
master->addr->ip, port);
|
master->addr->ip, port, sentinel.current_epoch, server.runid);
|
||||||
if (retval == REDIS_OK) ri->pending_commands++;
|
if (retval == REDIS_OK) ri->pending_commands++;
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
@ -2369,41 +2404,25 @@ int compareRunID(const void *a, const void *b) {
|
|||||||
return strcasecmp(*aptrptr, *bptrptr);
|
return strcasecmp(*aptrptr, *bptrptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Vote for the Sentinel with 'req_runid', or return the old vote if we
 * already voted for the specified 'req_epoch' or for a greater epoch.
 *
 * If 'req_epoch' is greater than our current epoch, the current epoch is
 * updated to 'req_epoch' first. A new vote is recorded only when 'req_runid'
 * is a Sentinel known for this master and the epoch of the last vote is
 * smaller than 'req_epoch'.
 *
 * '*leader_epoch' is always populated with the epoch of the last vote.
 *
 * Returns NULL if no vote is available. Otherwise returns the runid of the
 * voted Sentinel as a newly allocated sds string: the caller owns it and
 * must release it with sdsfree(). */
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    sentinelRedisInstance *si = sentinelGetSentinelByRunid(master,req_runid);

    if (req_epoch > sentinel.current_epoch)
        sentinel.current_epoch = req_epoch;

    if (si && master->leader_epoch < req_epoch) {
        sdsfree(master->leader);
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
    }

    *leader_epoch = master->leader_epoch;
    /* Return a copy: the caller frees the returned string, so handing out
     * master->leader itself would leave a dangling pointer in the master
     * instance. */
    return master->leader ? sdsnew(master->leader) : NULL;
}
|
||||||
|
|
||||||
struct sentinelLeader {
|
struct sentinelLeader {
|
||||||
@ -2411,9 +2430,9 @@ struct sentinelLeader {
|
|||||||
unsigned long votes;
|
unsigned long votes;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Helper function for sentinelGetObjectiveLeader, increment the counter
|
/* Helper function for sentinelGetLeader, increment the counter
|
||||||
* relative to the specified runid. */
|
* relative to the specified runid. */
|
||||||
void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
|
void sentinelLeaderIncr(dict *counters, char *runid) {
|
||||||
dictEntry *de = dictFind(counters,runid);
|
dictEntry *de = dictFind(counters,runid);
|
||||||
uint64_t oldval;
|
uint64_t oldval;
|
||||||
|
|
||||||
@ -2427,9 +2446,13 @@ void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Scan all the Sentinels attached to this master to check what is the
|
/* Scan all the Sentinels attached to this master to check if there
|
||||||
* most voted leader among Sentinels. */
|
* is a leader for a given term, and return it if any.
|
||||||
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
|
*
|
||||||
|
* To be a leader for a given epoch, we should have the majority of
|
||||||
|
* the Sentinels we know about that reported the same instance as
|
||||||
|
* leader for the same epoch. */
|
||||||
|
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t term) {
|
||||||
dict *counters;
|
dict *counters;
|
||||||
dictIterator *di;
|
dictIterator *di;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
@ -2443,7 +2466,7 @@ char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
|
|||||||
/* Count my vote. */
|
/* Count my vote. */
|
||||||
myvote = sentinelGetSubjectiveLeader(master);
|
myvote = sentinelGetSubjectiveLeader(master);
|
||||||
if (myvote) {
|
if (myvote) {
|
||||||
sentinelObjectiveLeaderIncr(counters,myvote);
|
sentinelLeaderIncr(counters,myvote);
|
||||||
voters++;
|
voters++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2458,7 +2481,7 @@ char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
|
|||||||
* leader fails. In that case we consider all the voters. */
|
* leader fails. In that case we consider all the voters. */
|
||||||
if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
|
if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
|
||||||
!(ri->flags & SRI_MASTER_DOWN)) continue;
|
!(ri->flags & SRI_MASTER_DOWN)) continue;
|
||||||
sentinelObjectiveLeaderIncr(counters,ri->leader);
|
sentinelLeaderIncr(counters,ri->leader);
|
||||||
voters++;
|
voters++;
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user