mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Sentinel command renaming: base machanism implemented.
This commit is contained in:
parent
70b7fa2c52
commit
72e8a33b35
@ -178,6 +178,10 @@ typedef struct sentinelRedisInstance {
|
|||||||
mstime_t o_down_since_time; /* Objectively down since time. */
|
mstime_t o_down_since_time; /* Objectively down since time. */
|
||||||
mstime_t down_after_period; /* Consider it down after that period. */
|
mstime_t down_after_period; /* Consider it down after that period. */
|
||||||
mstime_t info_refresh; /* Time at which we received INFO output from it. */
|
mstime_t info_refresh; /* Time at which we received INFO output from it. */
|
||||||
|
dict *renamed_commands; /* Commands renamed in this instance:
|
||||||
|
Sentinel will use the alternative commands
|
||||||
|
mapped on this table to send things like
|
||||||
|
SLAVEOF, CONFING, INFO, ... */
|
||||||
|
|
||||||
/* Role and the first time we observed it.
|
/* Role and the first time we observed it.
|
||||||
* This is useful in order to delay replacing what the instance reports
|
* This is useful in order to delay replacing what the instance reports
|
||||||
@ -384,6 +388,7 @@ void sentinelSimFailureCrash(void);
|
|||||||
|
|
||||||
uint64_t dictSdsHash(const void *key);
|
uint64_t dictSdsHash(const void *key);
|
||||||
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
|
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
|
||||||
|
int dictSdsKeyCaseCompare(void *privdata, const void *key1, const void *key2);
|
||||||
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
|
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
|
||||||
|
|
||||||
void dictInstancesValDestructor (void *privdata, void *obj) {
|
void dictInstancesValDestructor (void *privdata, void *obj) {
|
||||||
@ -417,6 +422,16 @@ dictType leaderVotesDictType = {
|
|||||||
NULL /* val destructor */
|
NULL /* val destructor */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* Instance renamed commands table. */
|
||||||
|
dictType renamedCommandsDictType = {
|
||||||
|
dictSdsHash, /* hash function */
|
||||||
|
NULL, /* key dup */
|
||||||
|
NULL, /* val dup */
|
||||||
|
dictSdsKeyCaseCompare, /* key compare */
|
||||||
|
dictSdsDestructor, /* key destructor */
|
||||||
|
dictSdsDestructor /* val destructor */
|
||||||
|
};
|
||||||
|
|
||||||
/* =========================== Initialization =============================== */
|
/* =========================== Initialization =============================== */
|
||||||
|
|
||||||
void sentinelCommand(client *c);
|
void sentinelCommand(client *c);
|
||||||
@ -1211,6 +1226,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
|
|||||||
ri->master = master;
|
ri->master = master;
|
||||||
ri->slaves = dictCreate(&instancesDictType,NULL);
|
ri->slaves = dictCreate(&instancesDictType,NULL);
|
||||||
ri->info_refresh = 0;
|
ri->info_refresh = 0;
|
||||||
|
ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL);
|
||||||
|
|
||||||
/* Failover state. */
|
/* Failover state. */
|
||||||
ri->leader = NULL;
|
ri->leader = NULL;
|
||||||
@ -1258,6 +1274,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
|
|||||||
sdsfree(ri->auth_pass);
|
sdsfree(ri->auth_pass);
|
||||||
sdsfree(ri->info);
|
sdsfree(ri->info);
|
||||||
releaseSentinelAddr(ri->addr);
|
releaseSentinelAddr(ri->addr);
|
||||||
|
dictRelease(ri->renamed_commands);
|
||||||
|
|
||||||
/* Clear state into the master if needed. */
|
/* Clear state into the master if needed. */
|
||||||
if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
|
if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
|
||||||
@ -1572,6 +1589,21 @@ char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
|
|||||||
else return "unknown";
|
else return "unknown";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function is used in order to send commands to Redis instances: the
|
||||||
|
* commands we send from Sentinel may be renamed, a common case is a master
|
||||||
|
* with CONFIG and SLAVEOF commands renamed for security concerns. In that
|
||||||
|
* case we check the ri->renamed_command table (or if the instance is a slave,
|
||||||
|
* we check the one of the master), and map the command that we should send
|
||||||
|
* to the set of renamed commads. However, if the command was not renamed,
|
||||||
|
* we just return "command" itself. */
|
||||||
|
char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) {
|
||||||
|
sds sc = sdsnew(command);
|
||||||
|
if (ri->master) ri = ri->master;
|
||||||
|
char *retval = dictFetchValue(ri->renamed_commands, sc);
|
||||||
|
sdsfree(sc);
|
||||||
|
return retval ? retval : command;
|
||||||
|
}
|
||||||
|
|
||||||
/* ============================ Config handling ============================= */
|
/* ============================ Config handling ============================= */
|
||||||
char *sentinelHandleConfiguration(char **argv, int argc) {
|
char *sentinelHandleConfiguration(char **argv, int argc) {
|
||||||
sentinelRedisInstance *ri;
|
sentinelRedisInstance *ri;
|
||||||
@ -1891,7 +1923,8 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
|||||||
ri->master->auth_pass;
|
ri->master->auth_pass;
|
||||||
|
|
||||||
if (auth_pass) {
|
if (auth_pass) {
|
||||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "AUTH %s",
|
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
|
||||||
|
sentinelInstanceMapCommand(ri,"AUTH"),
|
||||||
auth_pass) == C_OK) ri->link->pending_commands++;
|
auth_pass) == C_OK) ri->link->pending_commands++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1907,7 +1940,9 @@ void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char
|
|||||||
|
|
||||||
snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
|
snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
|
||||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
|
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
|
||||||
"CLIENT SETNAME %s", name) == C_OK)
|
"%s SETNAME %s",
|
||||||
|
sentinelInstanceMapCommand(ri,"CLIENT"),
|
||||||
|
name) == C_OK)
|
||||||
{
|
{
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
}
|
}
|
||||||
@ -1969,8 +2004,9 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
sentinelSetClientName(ri,link->pc,"pubsub");
|
sentinelSetClientName(ri,link->pc,"pubsub");
|
||||||
/* Now we subscribe to the Sentinels "Hello" channel. */
|
/* Now we subscribe to the Sentinels "Hello" channel. */
|
||||||
retval = redisAsyncCommand(link->pc,
|
retval = redisAsyncCommand(link->pc,
|
||||||
sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
|
sentinelReceiveHelloMessages, ri, "%s %s",
|
||||||
SENTINEL_HELLO_CHANNEL);
|
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
|
||||||
|
SENTINEL_HELLO_CHANNEL);
|
||||||
if (retval != C_OK) {
|
if (retval != C_OK) {
|
||||||
/* If we can't subscribe, the Pub/Sub connection is useless
|
/* If we can't subscribe, the Pub/Sub connection is useless
|
||||||
* and we can simply disconnect it and try again. */
|
* and we can simply disconnect it and try again. */
|
||||||
@ -2304,8 +2340,11 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
|
|||||||
{
|
{
|
||||||
if (redisAsyncCommand(ri->link->cc,
|
if (redisAsyncCommand(ri->link->cc,
|
||||||
sentinelDiscardReplyCallback, ri,
|
sentinelDiscardReplyCallback, ri,
|
||||||
"SCRIPT KILL") == C_OK)
|
"%s KILL",
|
||||||
|
sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
|
||||||
|
{
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
|
}
|
||||||
ri->flags |= SRI_SCRIPT_KILL_SENT;
|
ri->flags |= SRI_SCRIPT_KILL_SENT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2511,8 +2550,9 @@ int sentinelSendHello(sentinelRedisInstance *ri) {
|
|||||||
master->name,master_addr->ip,master_addr->port,
|
master->name,master_addr->ip,master_addr->port,
|
||||||
(unsigned long long) master->config_epoch);
|
(unsigned long long) master->config_epoch);
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
|
sentinelPublishReplyCallback, ri, "%s %s %s",
|
||||||
SENTINEL_HELLO_CHANNEL,payload);
|
sentinelInstanceMapCommand(ri,"PUBLISH"),
|
||||||
|
SENTINEL_HELLO_CHANNEL,payload);
|
||||||
if (retval != C_OK) return C_ERR;
|
if (retval != C_OK) return C_ERR;
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
return C_OK;
|
return C_OK;
|
||||||
@ -2557,7 +2597,8 @@ int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
|
|||||||
* queued in the connection. */
|
* queued in the connection. */
|
||||||
int sentinelSendPing(sentinelRedisInstance *ri) {
|
int sentinelSendPing(sentinelRedisInstance *ri) {
|
||||||
int retval = redisAsyncCommand(ri->link->cc,
|
int retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelPingReplyCallback, ri, "PING");
|
sentinelPingReplyCallback, ri, "%s",
|
||||||
|
sentinelInstanceMapCommand(ri,"PING"));
|
||||||
if (retval == C_OK) {
|
if (retval == C_OK) {
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
ri->link->last_ping_time = mstime();
|
ri->link->last_ping_time = mstime();
|
||||||
@ -2621,7 +2662,8 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
|
|||||||
(now - ri->info_refresh) > info_period))
|
(now - ri->info_refresh) > info_period))
|
||||||
{
|
{
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelInfoReplyCallback, ri, "INFO");
|
sentinelInfoReplyCallback, ri, "%s",
|
||||||
|
sentinelInstanceMapCommand(ri,"INFO"));
|
||||||
if (retval == C_OK) ri->link->pending_commands++;
|
if (retval == C_OK) ri->link->pending_commands++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3617,7 +3659,8 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f
|
|||||||
ll2string(port,sizeof(port),master->addr->port);
|
ll2string(port,sizeof(port),master->addr->port);
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelReceiveIsMasterDownReply, ri,
|
sentinelReceiveIsMasterDownReply, ri,
|
||||||
"SENTINEL is-master-down-by-addr %s %s %llu %s",
|
"%s is-master-down-by-addr %s %s %llu %s",
|
||||||
|
sentinelInstanceMapCommand(ri,"SENTINEL"),
|
||||||
master->addr->ip, port,
|
master->addr->ip, port,
|
||||||
sentinel.current_epoch,
|
sentinel.current_epoch,
|
||||||
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
|
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
|
||||||
@ -3796,17 +3839,21 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
|
|||||||
* Note that we don't check the replies returned by commands, since we
|
* Note that we don't check the replies returned by commands, since we
|
||||||
* will observe instead the effects in the next INFO output. */
|
* will observe instead the effects in the next INFO output. */
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelDiscardReplyCallback, ri, "MULTI");
|
sentinelDiscardReplyCallback, ri, "%s",
|
||||||
|
sentinelInstanceMapCommand(ri,"MULTI"));
|
||||||
if (retval == C_ERR) return retval;
|
if (retval == C_ERR) return retval;
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
|
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelDiscardReplyCallback, ri, "SLAVEOF %s %s", host, portstr);
|
sentinelDiscardReplyCallback, ri, "%s %s %s",
|
||||||
|
sentinelInstanceMapCommand(ri,"SLAVEOF"),
|
||||||
|
host, portstr);
|
||||||
if (retval == C_ERR) return retval;
|
if (retval == C_ERR) return retval;
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
|
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelDiscardReplyCallback, ri, "CONFIG REWRITE");
|
sentinelDiscardReplyCallback, ri, "%s REWRITE",
|
||||||
|
sentinelInstanceMapCommand(ri,"CONFIG"));
|
||||||
if (retval == C_ERR) return retval;
|
if (retval == C_ERR) return retval;
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
|
|
||||||
@ -3816,12 +3863,14 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
|
|||||||
* recognized as a syntax error, and the transaction will not fail (but
|
* recognized as a syntax error, and the transaction will not fail (but
|
||||||
* only the unsupported command will fail). */
|
* only the unsupported command will fail). */
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelDiscardReplyCallback, ri, "CLIENT KILL TYPE normal");
|
sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
|
||||||
|
sentinelInstanceMapCommand(ri,"CLIENT"));
|
||||||
if (retval == C_ERR) return retval;
|
if (retval == C_ERR) return retval;
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
|
|
||||||
retval = redisAsyncCommand(ri->link->cc,
|
retval = redisAsyncCommand(ri->link->cc,
|
||||||
sentinelDiscardReplyCallback, ri, "EXEC");
|
sentinelDiscardReplyCallback, ri, "%s",
|
||||||
|
sentinelInstanceMapCommand(ri,"EXEC"));
|
||||||
if (retval == C_ERR) return retval;
|
if (retval == C_ERR) return retval;
|
||||||
ri->link->pending_commands++;
|
ri->link->pending_commands++;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user