diff --git a/src/sentinel.c b/src/sentinel.c index 4ce48de5..edf430ef 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -1978,6 +1978,97 @@ void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privd ri->last_pub_time = mstime(); } +/* Process an hello message received via Pub/Sub in master or slave instance, + * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel. + * + * If the master name specified in the message is not known, the message is + * discareded. */ +void sentinelProcessHelloMessage(char *hello, int hello_len) { + /* Format is composed of 8 tokens: + * 0=ip,1=port,2=runid,3=current_epoch,4=master_name, + * 5=master_ip,6=master_port,7=master_config_epoch. */ + int numtokens, port, removed, master_port; + uint64_t current_epoch, master_config_epoch; + char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens); + sentinelRedisInstance *si, *master; + + if (numtokens == 8) { + /* Obtain a reference to the master this hello message is about */ + master = sentinelGetMasterByName(token[4]); + if (!master) goto cleanup; /* Unknown master, skip the message. */ + + /* First, try to see if we already have this sentinel. */ + port = atoi(token[1]); + master_port = atoi(token[6]); + si = getSentinelRedisInstanceByAddrAndRunID( + master->sentinels,token[0],port,token[2]); + current_epoch = strtoull(token[3],NULL,10); + master_config_epoch = strtoull(token[7],NULL,10); + + if (!si) { + /* If not, remove all the sentinels that have the same runid + * OR the same ip/port, because it's either a restart or a + * network topology change. */ + removed = removeMatchingSentinelsFromMaster(master,token[0],port, + token[2]); + if (removed) { + sentinelEvent(REDIS_NOTICE,"-dup-sentinel",master, + "%@ #duplicate of %s:%d or %s", + token[0],port,token[2]); + } + + /* Add the new sentinel. */ + si = createSentinelRedisInstance(NULL,SRI_SENTINEL, + token[0],port,master->quorum,master); + if (si) { + sentinelEvent(REDIS_NOTICE,"+sentinel",si,"%@"); + /* The runid is NULL after a new instance creation and + * for Sentinels we don't have a later chance to fill it, + * so do it now. */ + si->runid = sdsnew(token[2]); + sentinelFlushConfig(); + } + } + + /* Update local current_epoch if received current_epoch is greater.*/ + if (current_epoch > sentinel.current_epoch) { + sentinel.current_epoch = current_epoch; + sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu", + (unsigned long long) sentinel.current_epoch); + } + + /* Update master info if received configuration is newer. */ + if (master->config_epoch < master_config_epoch) { + master->config_epoch = master_config_epoch; + if (master_port != master->addr->port || + strcmp(master->addr->ip, token[5])) + { + sentinelAddr *old_addr; + + sentinelEvent(REDIS_WARNING,"+switch-master", + master,"%s %s %d %s %d", + master->name, + master->addr->ip, master->addr->port, + token[5], master_port); + + old_addr = dupSentinelAddr(master->addr); + sentinelResetMasterAndChangeAddress(master, token[5], master_port); + sentinelCallClientReconfScript(master, + SENTINEL_OBSERVER,"start", + old_addr,master->addr); + releaseSentinelAddr(old_addr); + } + } + + /* Update the state of the Sentinel. */ + if (si) si->last_hello_time = mstime(); + } + +cleanup: + sdsfreesplitres(token,numtokens); +} + + /* This is our Pub/Sub callback for the Hello channel. It's useful in order * to discover other sentinels attached at the same master. */ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) { @@ -2006,90 +2097,7 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd /* We are not interested in meeting ourselves */ if (strstr(r->element[2]->str,server.runid) != NULL) return; - { - /* Format is composed of 8 tokens: - * 0=ip,1=port,2=runid,3=current_epoch,4=master_name, - * 5=master_ip,6=master_port,7=master_config_epoch. */ - int numtokens, port, removed, master_port; - uint64_t current_epoch, master_config_epoch; - char **token = sdssplitlen(r->element[2]->str, - r->element[2]->len, - ",",1,&numtokens); - sentinelRedisInstance *si; - - if (numtokens == 8) { - /* First, try to see if we already have this sentinel. */ - port = atoi(token[1]); - master_port = atoi(token[6]); - si = getSentinelRedisInstanceByAddrAndRunID( - master->sentinels,token[0],port,token[2]); - current_epoch = strtoull(token[3],NULL,10); - master_config_epoch = strtoull(token[7],NULL,10); - sentinelRedisInstance *msgmaster; - - if (!si) { - /* If not, remove all the sentinels that have the same runid - * OR the same ip/port, because it's either a restart or a - * network topology change. */ - removed = removeMatchingSentinelsFromMaster(master,token[0],port, - token[2]); - if (removed) { - sentinelEvent(REDIS_NOTICE,"-dup-sentinel",master, - "%@ #duplicate of %s:%d or %s", - token[0],port,token[2]); - } - - /* Add the new sentinel. */ - si = createSentinelRedisInstance(NULL,SRI_SENTINEL, - token[0],port,master->quorum,master); - if (si) { - sentinelEvent(REDIS_NOTICE,"+sentinel",si,"%@"); - /* The runid is NULL after a new instance creation and - * for Sentinels we don't have a later chance to fill it, - * so do it now. */ - si->runid = sdsnew(token[2]); - sentinelFlushConfig(); - } - } - - /* Update local current_epoch if received current_epoch is greater.*/ - if (current_epoch > sentinel.current_epoch) { - sentinel.current_epoch = current_epoch; - sentinelEvent(REDIS_WARNING,"+new-epoch",ri,"%llu", - (unsigned long long) sentinel.current_epoch); - } - - /* Update master info if received configuration is newer. */ - if ((msgmaster = sentinelGetMasterByName(token[4])) != NULL) { - if (msgmaster->config_epoch < master_config_epoch) { - msgmaster->config_epoch = master_config_epoch; - if (master_port != msgmaster->addr->port || - strcmp(msgmaster->addr->ip, token[5])) - { - sentinelAddr *old_addr; - - sentinelEvent(REDIS_WARNING,"+switch-master", - msgmaster,"%s %s %d %s %d", - msgmaster->name, - msgmaster->addr->ip, msgmaster->addr->port, - token[5], master_port); - - old_addr = dupSentinelAddr(msgmaster->addr); - sentinelResetMasterAndChangeAddress(msgmaster, - token[5], master_port); - sentinelCallClientReconfScript(msgmaster, - SENTINEL_OBSERVER,"start", - old_addr,msgmaster->addr); - releaseSentinelAddr(old_addr); - } - } - } - - /* Update the state of the Sentinel. */ - if (si) si->last_hello_time = mstime(); - } - sdsfreesplitres(token,numtokens); - } + sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len); } /* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis @@ -2176,10 +2184,8 @@ void sentinelPingInstance(sentinelRedisInstance *ri) { sentinelPingReplyCallback, NULL, "PING"); if (retval != REDIS_OK) return; ri->pending_commands++; - } else if ((ri->flags & SRI_SENTINEL) == 0 && - (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) - { - /* PUBLISH hello messages to masters and slaves. */ + } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { + /* PUBLISH hello messages to all the three kinds of instances. */ sentinelSendHello(ri); } }