mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
min-slaves-to-write: don't accept writes with less than N replicas.
This feature allows the user to specify the minimum number of connected replicas having a lag less or equal than the specified amount of seconds for writes to be accepted.
This commit is contained in:
parent
cbdb2153c5
commit
ed599d3aca
@ -281,7 +281,13 @@ slave-priority 100
|
|||||||
#
|
#
|
||||||
# For example to require at least 3 slaves with a lag <= 10 seconds use:
|
# For example to require at least 3 slaves with a lag <= 10 seconds use:
|
||||||
#
|
#
|
||||||
# repl-min-slaves-to-write 3 10
|
# min-slaves-to-write 3
|
||||||
|
# min-slaves-max-lag 10
|
||||||
|
#
|
||||||
|
# Setting one or the other to 0 disables the feature.
|
||||||
|
#
|
||||||
|
# By default min-slaves-to-write is set to 0 (feature disabled) and
|
||||||
|
# min-slaves-max-lag is set to 10.
|
||||||
|
|
||||||
################################## SECURITY ###################################
|
################################## SECURITY ###################################
|
||||||
|
|
||||||
|
20
src/config.c
20
src/config.c
@ -440,6 +440,16 @@ void loadServerConfigFromString(char *config) {
|
|||||||
}
|
}
|
||||||
} else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
|
||||||
server.slave_priority = atoi(argv[1]);
|
server.slave_priority = atoi(argv[1]);
|
||||||
|
} else if (!strcasecmp(argv[0],"min-slaves-to-write") && argc == 2) {
|
||||||
|
server.repl_min_slaves_to_write = atoi(argv[1]);
|
||||||
|
if (server.repl_min_slaves_to_write < 0) {
|
||||||
|
err = "Invalid value for min-slaves-to-write."; goto loaderr;
|
||||||
|
}
|
||||||
|
} else if (!strcasecmp(argv[0],"min-slaves-max-lag") && argc == 2) {
|
||||||
|
server.repl_min_slaves_max_lag = atoi(argv[1]);
|
||||||
|
if (server.repl_min_slaves_max_lag < 0) {
|
||||||
|
err = "Invalid value for min-slaves-max-lag."; goto loaderr;
|
||||||
|
}
|
||||||
} else if (!strcasecmp(argv[0],"notify-keyspace-events") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"notify-keyspace-events") && argc == 2) {
|
||||||
int flags = keyspaceEventsStringToFlags(argv[1]);
|
int flags = keyspaceEventsStringToFlags(argv[1]);
|
||||||
|
|
||||||
@ -801,6 +811,14 @@ void configSetCommand(redisClient *c) {
|
|||||||
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
ll <= 0) goto badfmt;
|
ll <= 0) goto badfmt;
|
||||||
server.slave_priority = ll;
|
server.slave_priority = ll;
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"min-slaves-to-write")) {
|
||||||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
|
ll < 0) goto badfmt;
|
||||||
|
server.repl_min_slaves_to_write = ll;
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"min-slaves-max-lag")) {
|
||||||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
|
ll < 0) goto badfmt;
|
||||||
|
server.repl_min_slaves_max_lag = ll;
|
||||||
} else if (!strcasecmp(c->argv[2]->ptr,"cluster-node-timeout")) {
|
} else if (!strcasecmp(c->argv[2]->ptr,"cluster-node-timeout")) {
|
||||||
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
ll <= 0) goto badfmt;
|
ll <= 0) goto badfmt;
|
||||||
@ -902,6 +920,8 @@ void configGetCommand(redisClient *c) {
|
|||||||
config_get_numerical_field("maxclients",server.maxclients);
|
config_get_numerical_field("maxclients",server.maxclients);
|
||||||
config_get_numerical_field("watchdog-period",server.watchdog_period);
|
config_get_numerical_field("watchdog-period",server.watchdog_period);
|
||||||
config_get_numerical_field("slave-priority",server.slave_priority);
|
config_get_numerical_field("slave-priority",server.slave_priority);
|
||||||
|
config_get_numerical_field("min-slaves-to-write",server.repl_min_slaves_to_write);
|
||||||
|
config_get_numerical_field("min-slaves-max-lag",server.repl_min_slaves_max_lag);
|
||||||
config_get_numerical_field("hz",server.hz);
|
config_get_numerical_field("hz",server.hz);
|
||||||
config_get_numerical_field("cluster-node-timeout",server.cluster_node_timeout);
|
config_get_numerical_field("cluster-node-timeout",server.cluster_node_timeout);
|
||||||
|
|
||||||
|
@ -692,6 +692,7 @@ void freeClient(redisClient *c) {
|
|||||||
* backlog. */
|
* backlog. */
|
||||||
if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
|
if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
|
||||||
server.repl_no_slaves_since = server.unixtime;
|
server.repl_no_slaves_since = server.unixtime;
|
||||||
|
refreshGoodSlavesCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Case 2: we lost the connection with the master. */
|
/* Case 2: we lost the connection with the master. */
|
||||||
|
15
src/redis.c
15
src/redis.c
@ -1161,6 +1161,8 @@ void createSharedObjects(void) {
|
|||||||
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
|
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
|
||||||
shared.execaborterr = createObject(REDIS_STRING,sdsnew(
|
shared.execaborterr = createObject(REDIS_STRING,sdsnew(
|
||||||
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
|
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
|
||||||
|
shared.noreplicaserr = createObject(REDIS_STRING,sdsnew(
|
||||||
|
"-NOREPLICAS Not enough good slaves to write.\r\n"));
|
||||||
shared.space = createObject(REDIS_STRING,sdsnew(" "));
|
shared.space = createObject(REDIS_STRING,sdsnew(" "));
|
||||||
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
|
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
|
||||||
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
|
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
|
||||||
@ -1264,6 +1266,8 @@ void initServerConfig() {
|
|||||||
server.shutdown_asap = 0;
|
server.shutdown_asap = 0;
|
||||||
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
|
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
|
||||||
server.repl_timeout = REDIS_REPL_TIMEOUT;
|
server.repl_timeout = REDIS_REPL_TIMEOUT;
|
||||||
|
server.repl_min_slaves_to_write = REDIS_DEFAULT_MIN_SLAVES_TO_WRITE;
|
||||||
|
server.repl_min_slaves_max_lag = REDIS_DEFAULT_MIN_SLAVES_MAX_LAG;
|
||||||
server.cluster_enabled = 0;
|
server.cluster_enabled = 0;
|
||||||
server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT;
|
server.cluster_node_timeout = REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT;
|
||||||
server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE);
|
server.cluster_configfile = zstrdup(REDIS_DEFAULT_CLUSTER_CONFIG_FILE);
|
||||||
@ -1469,6 +1473,7 @@ void initServer() {
|
|||||||
server.ops_sec_last_sample_ops = 0;
|
server.ops_sec_last_sample_ops = 0;
|
||||||
server.unixtime = time(NULL);
|
server.unixtime = time(NULL);
|
||||||
server.lastbgsave_status = REDIS_OK;
|
server.lastbgsave_status = REDIS_OK;
|
||||||
|
server.repl_good_slaves_count = 0;
|
||||||
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
|
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
|
||||||
redisPanic("Can't create the serverCron time event.");
|
redisPanic("Can't create the serverCron time event.");
|
||||||
exit(1);
|
exit(1);
|
||||||
@ -1803,6 +1808,16 @@ int processCommand(redisClient *c) {
|
|||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Don't accept write commands if there are not enough good slaves and
|
||||||
|
* used configured the min-slaves-to-write option. */
|
||||||
|
if (server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag &&
|
||||||
|
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
|
||||||
|
{
|
||||||
|
flagTransaction(c);
|
||||||
|
addReply(c, shared.noreplicaserr);
|
||||||
|
return REDIS_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* Don't accept write commands if this is a read only slave. But
|
/* Don't accept write commands if this is a read only slave. But
|
||||||
* accept write commands if this is our master. */
|
* accept write commands if this is our master. */
|
||||||
if (server.masterhost && server.repl_slave_ro &&
|
if (server.masterhost && server.repl_slave_ro &&
|
||||||
|
@ -118,6 +118,8 @@
|
|||||||
#define REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
|
#define REDIS_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
|
||||||
#define REDIS_DEFAULT_ACTIVE_REHASHING 1
|
#define REDIS_DEFAULT_ACTIVE_REHASHING 1
|
||||||
#define REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
|
#define REDIS_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
|
||||||
|
#define REDIS_DEFAULT_MIN_SLAVES_TO_WRITE 0
|
||||||
|
#define REDIS_DEFAULT_MIN_SLAVES_MAX_LAG 10
|
||||||
|
|
||||||
/* Protocol and I/O related defines */
|
/* Protocol and I/O related defines */
|
||||||
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
|
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
|
||||||
@ -480,7 +482,7 @@ struct sharedObjectsStruct {
|
|||||||
*colon, *nullbulk, *nullmultibulk, *queued,
|
*colon, *nullbulk, *nullmultibulk, *queued,
|
||||||
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
|
||||||
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
|
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
|
||||||
*masterdownerr, *roslaveerr, *execaborterr, *noautherr,
|
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
|
||||||
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
|
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
|
||||||
*lpush,
|
*lpush,
|
||||||
@ -831,6 +833,9 @@ struct redisServer {
|
|||||||
gets released. */
|
gets released. */
|
||||||
time_t repl_no_slaves_since; /* We have no slaves since that time.
|
time_t repl_no_slaves_since; /* We have no slaves since that time.
|
||||||
Only valid if server.slaves len is 0. */
|
Only valid if server.slaves len is 0. */
|
||||||
|
int repl_min_slaves_to_write; /* Min number of slaves to write. */
|
||||||
|
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
|
||||||
|
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
|
||||||
/* Replication (slave) */
|
/* Replication (slave) */
|
||||||
char *masterauth; /* AUTH with this password with master */
|
char *masterauth; /* AUTH with this password with master */
|
||||||
char *masterhost; /* Hostname of master */
|
char *masterhost; /* Hostname of master */
|
||||||
@ -1151,6 +1156,7 @@ void replicationCacheMaster(redisClient *c);
|
|||||||
void resizeReplicationBacklog(long long newsize);
|
void resizeReplicationBacklog(long long newsize);
|
||||||
void replicationSetMaster(char *ip, int port);
|
void replicationSetMaster(char *ip, int port);
|
||||||
void replicationUnsetMaster(void);
|
void replicationUnsetMaster(void);
|
||||||
|
void refreshGoodSlavesCount(void);
|
||||||
|
|
||||||
/* Generic persistence functions */
|
/* Generic persistence functions */
|
||||||
void startLoading(FILE *fp);
|
void startLoading(FILE *fp);
|
||||||
|
@ -1406,6 +1406,30 @@ void replicationResurrectCachedMaster(int newfd) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
|
||||||
|
|
||||||
|
/* This function counts the number of slaves with lag <= min-slaves-max-lag.
|
||||||
|
* If the option is active, the server will prevent writes if there are not
|
||||||
|
* enough connected slaves with the specified lag (or less). */
|
||||||
|
void refreshGoodSlavesCount(void) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
int good = 0;
|
||||||
|
|
||||||
|
if (!server.repl_min_slaves_to_write ||
|
||||||
|
!server.repl_min_slaves_max_lag) return;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
time_t lag = server.unixtime - slave->repl_ack_time;
|
||||||
|
|
||||||
|
if (slave->replstate == REDIS_REPL_ONLINE &&
|
||||||
|
lag <= server.repl_min_slaves_max_lag) good++;
|
||||||
|
}
|
||||||
|
server.repl_good_slaves_count = good;
|
||||||
|
}
|
||||||
|
|
||||||
/* --------------------------- REPLICATION CRON ---------------------------- */
|
/* --------------------------- REPLICATION CRON ---------------------------- */
|
||||||
|
|
||||||
/* Replication cron funciton, called 1 time per second. */
|
/* Replication cron funciton, called 1 time per second. */
|
||||||
@ -1519,4 +1543,7 @@ void replicationCron(void) {
|
|||||||
(int) server.repl_backlog_time_limit);
|
(int) server.repl_backlog_time_limit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||||
|
refreshGoodSlavesCount();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user