Sentinel: HELLO processing refactored into sentinelProcessHelloMessage().

This commit is contained in:
antirez 2014-03-14 10:56:44 +01:00
parent 133fccb03f
commit 9dfe426fc8

View File

@ -1978,46 +1978,25 @@ void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privd
ri->last_pub_time = mstime(); ri->last_pub_time = mstime();
} }
/* This is our Pub/Sub callback for the Hello channel. It's useful in order /* Process an hello message received via Pub/Sub in master or slave instance,
* to discover other sentinels attached at the same master. */ * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) { *
sentinelRedisInstance *ri = c->data, *master; * If the master name specified in the message is not known, the message is
redisReply *r; * discareded. */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
if (!reply || !ri) return;
r = reply;
master = (ri->flags & SRI_MASTER) ? ri : ri->master;
/* Update the last activity in the pubsub channel. Note that since we
* receive our messages as well this timestamp can be used to detect
* if the link is probably disconnected even if it seems otherwise. */
ri->pc_last_activity = mstime();
/* Sanity check in the reply we expect, so that the code that follows
* can avoid to check for details. */
if (r->type != REDIS_REPLY_ARRAY ||
r->elements != 3 ||
r->element[0]->type != REDIS_REPLY_STRING ||
r->element[1]->type != REDIS_REPLY_STRING ||
r->element[2]->type != REDIS_REPLY_STRING ||
strcmp(r->element[0]->str,"message") != 0) return;
/* We are not interested in meeting ourselves */
if (strstr(r->element[2]->str,server.runid) != NULL) return;
{
/* Format is composed of 8 tokens: /* Format is composed of 8 tokens:
* 0=ip,1=port,2=runid,3=current_epoch,4=master_name, * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
* 5=master_ip,6=master_port,7=master_config_epoch. */ * 5=master_ip,6=master_port,7=master_config_epoch. */
int numtokens, port, removed, master_port; int numtokens, port, removed, master_port;
uint64_t current_epoch, master_config_epoch; uint64_t current_epoch, master_config_epoch;
char **token = sdssplitlen(r->element[2]->str, char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
r->element[2]->len, sentinelRedisInstance *si, *master;
",",1,&numtokens);
sentinelRedisInstance *si;
if (numtokens == 8) { if (numtokens == 8) {
/* Obtain a reference to the master this hello message is about */
master = sentinelGetMasterByName(token[4]);
if (!master) goto cleanup; /* Unknown master, skip the message. */
/* First, try to see if we already have this sentinel. */ /* First, try to see if we already have this sentinel. */
port = atoi(token[1]); port = atoi(token[1]);
master_port = atoi(token[6]); master_port = atoi(token[6]);
@ -2025,7 +2004,6 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
master->sentinels,token[0],port,token[2]); master->sentinels,token[0],port,token[2]);
current_epoch = strtoull(token[3],NULL,10); current_epoch = strtoull(token[3],NULL,10);
master_config_epoch = strtoull(token[7],NULL,10); master_config_epoch = strtoull(token[7],NULL,10);
sentinelRedisInstance *msgmaster;
if (!si) { if (!si) {
/* If not, remove all the sentinels that have the same runid /* If not, remove all the sentinels that have the same runid
@ -2055,41 +2033,71 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
/* Update local current_epoch if received current_epoch is greater.*/ /* Update local current_epoch if received current_epoch is greater.*/
if (current_epoch > sentinel.current_epoch) { if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch; sentinel.current_epoch = current_epoch;
sentinelEvent(REDIS_WARNING,"+new-epoch",ri,"%llu", sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch); (unsigned long long) sentinel.current_epoch);
} }
/* Update master info if received configuration is newer. */ /* Update master info if received configuration is newer. */
if ((msgmaster = sentinelGetMasterByName(token[4])) != NULL) { if (master->config_epoch < master_config_epoch) {
if (msgmaster->config_epoch < master_config_epoch) { master->config_epoch = master_config_epoch;
msgmaster->config_epoch = master_config_epoch; if (master_port != master->addr->port ||
if (master_port != msgmaster->addr->port || strcmp(master->addr->ip, token[5]))
strcmp(msgmaster->addr->ip, token[5]))
{ {
sentinelAddr *old_addr; sentinelAddr *old_addr;
sentinelEvent(REDIS_WARNING,"+switch-master", sentinelEvent(REDIS_WARNING,"+switch-master",
msgmaster,"%s %s %d %s %d", master,"%s %s %d %s %d",
msgmaster->name, master->name,
msgmaster->addr->ip, msgmaster->addr->port, master->addr->ip, master->addr->port,
token[5], master_port); token[5], master_port);
old_addr = dupSentinelAddr(msgmaster->addr); old_addr = dupSentinelAddr(master->addr);
sentinelResetMasterAndChangeAddress(msgmaster, sentinelResetMasterAndChangeAddress(master, token[5], master_port);
token[5], master_port); sentinelCallClientReconfScript(master,
sentinelCallClientReconfScript(msgmaster,
SENTINEL_OBSERVER,"start", SENTINEL_OBSERVER,"start",
old_addr,msgmaster->addr); old_addr,master->addr);
releaseSentinelAddr(old_addr); releaseSentinelAddr(old_addr);
} }
} }
}
/* Update the state of the Sentinel. */ /* Update the state of the Sentinel. */
if (si) si->last_hello_time = mstime(); if (si) si->last_hello_time = mstime();
} }
cleanup:
sdsfreesplitres(token,numtokens); sdsfreesplitres(token,numtokens);
} }
/* This is our Pub/Sub callback for the Hello channel. It's useful in order
* to discover other sentinels attached at the same master. */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = c->data, *master;
redisReply *r;
if (!reply || !ri) return;
r = reply;
master = (ri->flags & SRI_MASTER) ? ri : ri->master;
/* Update the last activity in the pubsub channel. Note that since we
* receive our messages as well this timestamp can be used to detect
* if the link is probably disconnected even if it seems otherwise. */
ri->pc_last_activity = mstime();
/* Sanity check in the reply we expect, so that the code that follows
* can avoid to check for details. */
if (r->type != REDIS_REPLY_ARRAY ||
r->elements != 3 ||
r->element[0]->type != REDIS_REPLY_STRING ||
r->element[1]->type != REDIS_REPLY_STRING ||
r->element[2]->type != REDIS_REPLY_STRING ||
strcmp(r->element[0]->str,"message") != 0) return;
/* We are not interested in meeting ourselves */
if (strstr(r->element[2]->str,server.runid) != NULL) return;
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
} }
/* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis /* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
@ -2176,10 +2184,8 @@ void sentinelPingInstance(sentinelRedisInstance *ri) {
sentinelPingReplyCallback, NULL, "PING"); sentinelPingReplyCallback, NULL, "PING");
if (retval != REDIS_OK) return; if (retval != REDIS_OK) return;
ri->pending_commands++; ri->pending_commands++;
} else if ((ri->flags & SRI_SENTINEL) == 0 && } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
(now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) /* PUBLISH hello messages to all the three kinds of instances. */
{
/* PUBLISH hello messages to masters and slaves. */
sentinelSendHello(ri); sentinelSendHello(ri);
} }
} }