diff --git a/src/sentinel.c b/src/sentinel.c index af5363a7..7bb924f0 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -173,9 +173,8 @@ typedef struct sentinelRedisInstance { char *leader; /* If this is a master instance, this is the runid of the Sentinel that should perform the failover. If this is a Sentinel, this is the runid of the Sentinel - that this other Sentinel is voting as leader. - This field is valid only if SRI_MASTER_DOWN is - set on the Sentinel instance. */ + that this Sentinel voted as leader. */ + uint64_t leader_epoch; /* Epoch of the 'leader' field. */ int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */ mstime_t failover_state_change_time; mstime_t failover_start_time; /* When to start to failover if leader. */ @@ -327,6 +326,7 @@ void sentinelScheduleScriptExecution(char *path, ...); void sentinelStartFailover(sentinelRedisInstance *master, int state); void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata); int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port); +char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch); /* ========================= Dictionary types =============================== */ @@ -894,6 +894,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * /* Failover state. */ ri->leader = NULL; + ri->leader_epoch = 0; ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; ri->failover_state_change_time = 0; ri->failover_start_time = 0; @@ -1031,7 +1032,7 @@ sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, c return instance; } -/* Simple master lookup by name */ +/* Master lookup by name */ sentinelRedisInstance *sentinelGetMasterByName(char *name) { sentinelRedisInstance *ri; sds sdsname = sdsnew(name); @@ -1041,6 +1042,24 @@ sentinelRedisInstance *sentinelGetMasterByName(char *name) { return ri; } +/* Senitnel lookup by runid */ +sentinelRedisInstance *sentinelGetSentinelByRunid(sentinelRedisInstance *master, char *runid) { + sentinelRedisInstance *retval = NULL; + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + if (!strcmp(ri->runid,runid)) { + retval = ri; + break; + } + } + dictReleaseIterator(di); + return retval; +} + /* Add the specified flags to all the instances in the specified dictionary. */ void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) { dictIterator *di; @@ -1979,11 +1998,13 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0); fields++; - if (ri->flags & SRI_MASTER_DOWN) { - addReplyBulkCString(c,"subjective-leader"); - addReplyBulkCString(c,ri->leader ? ri->leader : "?"); - fields++; - } + addReplyBulkCString(c,"voted-leader"); + addReplyBulkCString(c,ri->leader ? ri->leader : "?"); + fields++; + + addReplyBulkCString(c,"voted-leader-epoch"); + addReplyBulkLongLong(c,ri->leader_epoch); + fields++; } setDeferredMultiBulkLength(c,mbl,fields*2); @@ -2044,14 +2065,18 @@ void sentinelCommand(redisClient *c) { return; addReplyDictOfRedisInstances(c,ri->sentinels); } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) { - /* SENTINEL IS-MASTER-DOWN-BY-ADDR */ + /* SENTINEL IS-MASTER-DOWN-BY-ADDR */ sentinelRedisInstance *ri; + long long req_epoch; + uint64_t leader_epoch = 0; char *leader = NULL; long port; int isdown = 0; - if (c->argc != 4) goto numargserr; - if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK) + if (c->argc != 6) goto numargserr; + if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK || + getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL) + != REDIS_OK) return; ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, c->argv[2]->ptr,port,NULL); @@ -2061,12 +2086,20 @@ void sentinelCommand(redisClient *c) { if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && (ri->flags & SRI_MASTER)) isdown = 1; - if (ri) leader = sentinelGetSubjectiveLeader(ri); - /* Reply with a two-elements multi-bulk reply: down state, leader. */ - addReplyMultiBulkLen(c,2); + /* Vote for the master (or fetch the previous vote) */ + if (ri && ri->flags & SRI_MASTER) { + leader = sentinelVoteLeader(ri,(uint64_t)req_epoch, + c->argv[5]->ptr, + &leader_epoch); + } + + /* Reply with a three-elements multi-bulk reply: + * down state, leader, vote epoch. */ + addReplyMultiBulkLen(c,3); addReply(c, isdown ? shared.cone : shared.czero); addReplyBulkCString(c, leader ? leader : "?"); + addReplyLongLong(c, (long long)leader_epoch); if (leader) sdsfree(leader); } else if (!strcasecmp(c->argv[1]->ptr,"reset")) { /* SENTINEL RESET */ @@ -2289,9 +2322,10 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p /* Ignore every error or unexpected reply. * Note that if the command returns an error for any reason we'll * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */ - if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 && + if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 && r->element[0]->type == REDIS_REPLY_INTEGER && - r->element[1]->type == REDIS_REPLY_STRING) + r->element[1]->type == REDIS_REPLY_STRING && + r->element[2]->type == REDIS_REPLY_INTEGER) { ri->last_master_down_reply_time = mstime(); if (r->element[0]->integer == 1) { @@ -2301,6 +2335,7 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p } sdsfree(ri->leader); ri->leader = sdsnew(r->element[1]->str); + ri->leader_epoch = r->element[2]->integer; } } @@ -2341,8 +2376,8 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) { ll2string(port,sizeof(port),master->addr->port); retval = redisAsyncCommand(ri->cc, sentinelReceiveIsMasterDownReply, NULL, - "SENTINEL is-master-down-by-addr %s %s", - master->addr->ip, port); + "SENTINEL is-master-down-by-addr %s %s %llu %s", + master->addr->ip, port, sentinel.current_epoch, server.runid); if (retval == REDIS_OK) ri->pending_commands++; } dictReleaseIterator(di); @@ -2369,41 +2404,25 @@ int compareRunID(const void *a, const void *b) { return strcasecmp(*aptrptr, *bptrptr); } -char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) { - dictIterator *di; - dictEntry *de; - char **instance = - zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1)); - int instances = 0; - char *leader = NULL; +/* Vote for the sentinel with 'req_runid' or return the old vote if already + * voted for the specifed 'req_epoch' or one greater. + * + * If a vote is not available returns NULL, otherwise return the Sentinel + * runid and populate the leader_epoch with the epoch of the last vote. */ +char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) { + sentinelRedisInstance *si = sentinelGetSentinelByRunid(master,req_runid); - if (master->flags & SRI_CAN_FAILOVER) { - /* Add myself if I'm a Sentinel that can failover this master. */ - instance[instances++] = server.runid; + if (req_epoch > sentinel.current_epoch) + sentinel.current_epoch = req_epoch; + + if (si && master->leader_epoch < req_epoch) { + sdsfree(master->leader); + master->leader = sdsnew(req_runid); + master->leader_epoch = sentinel.current_epoch; } - di = dictGetIterator(master->sentinels); - while((de = dictNext(di)) != NULL) { - sentinelRedisInstance *ri = dictGetVal(de); - mstime_t lag = mstime() - ri->last_avail_time; - - if (lag > SENTINEL_INFO_VALIDITY_TIME || - !(ri->flags & SRI_CAN_FAILOVER) || - (ri->flags & SRI_DISCONNECTED) || - ri->runid == NULL) - continue; - instance[instances++] = ri->runid; - } - dictReleaseIterator(di); - - /* If we have at least one instance passing our checks, order the array - * by runid. */ - if (instances) { - qsort(instance,instances,sizeof(char*),compareRunID); - leader = sdsnew(instance[0]); - } - zfree(instance); - return leader; + *leader_epoch = master->leader_epoch; + return master->leader; } struct sentinelLeader { @@ -2411,9 +2430,9 @@ struct sentinelLeader { unsigned long votes; }; -/* Helper function for sentinelGetObjectiveLeader, increment the counter +/* Helper function for sentinelGetLeader, increment the counter * relative to the specified runid. */ -void sentinelObjectiveLeaderIncr(dict *counters, char *runid) { +void sentinelLeaderIncr(dict *counters, char *runid) { dictEntry *de = dictFind(counters,runid); uint64_t oldval; @@ -2427,9 +2446,13 @@ void sentinelObjectiveLeaderIncr(dict *counters, char *runid) { } } -/* Scan all the Sentinels attached to this master to check what is the - * most voted leader among Sentinels. */ -char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) { +/* Scan all the Sentinels attached to this master to check if there + * is a leader for a given term, and return it if any. + * + * To be a leader for a given epoch, we should have the majorify of + * the Sentinels we know about that reported the same instance as + * leader for the same epoch. */ +char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t term) { dict *counters; dictIterator *di; dictEntry *de; @@ -2443,7 +2466,7 @@ char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) { /* Count my vote. */ myvote = sentinelGetSubjectiveLeader(master); if (myvote) { - sentinelObjectiveLeaderIncr(counters,myvote); + sentinelLeaderIncr(counters,myvote); voters++; } @@ -2458,7 +2481,7 @@ char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) { * leader fails. In that case we consider all the voters. */ if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) && !(ri->flags & SRI_MASTER_DOWN)) continue; - sentinelObjectiveLeaderIncr(counters,ri->leader); + sentinelLeaderIncr(counters,ri->leader); voters++; } dictReleaseIterator(di);