Lua script selective replication fixes.

This commit is contained in:
antirez 2015-10-29 15:57:41 +01:00
parent a3e8de0430
commit 514a234722
3 changed files with 32 additions and 20 deletions

View File

@ -385,11 +385,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (server.lua_replicate_commands) {
call_flags |= CMD_CALL_PROPAGATE;
/* Don't propagate AOf / replication stream if a redis.set_repl()
* call changed the default replication policy. */
if (!(server.lua_repl & PROPAGATE_AOF)) preventCommandAOF(c);
if (!(server.lua_repl & PROPAGATE_REPL)) preventCommandReplication(c);
/* Set flags according to redis.set_repl() settings. */
if (server.lua_repl & PROPAGATE_AOF)
call_flags |= CMD_CALL_PROPAGATE_AOF;
if (server.lua_repl & PROPAGATE_REPL)
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c,call_flags);

View File

@ -2224,29 +2224,34 @@ void call(client *c, int flags) {
}
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) == 0) {
int flags = PROPAGATE_NONE;
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
if (dirty)
flags |= (PROPAGATE_REPL | PROPAGATE_AOF);
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the command forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
if (c->flags & CLIENT_FORCE_REPL) flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) flags |= PROPAGATE_AOF;
/* However calls to preventCommandPropagation() or its selective
* variants preventCommandAOF() and preventCommandReplicaiton()
* will clear the flags to avoid propagation. */
if (c->flags & CLIENT_PREVENT_REPL_PROP) flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP) flags &= ~PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementatino called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. */
if (flags != PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
if (propagate_flags != PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
@ -2265,7 +2270,12 @@ void call(client *c, int flags) {
if (flags & CMD_CALL_PROPAGATE) {
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,rop->target);
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
}
redisOpArrayFree(&server.also_propagate);

View File

@ -382,9 +382,11 @@ typedef long long mstime_t; /* millisecond time type. */
/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
#define CMD_CALL_SLOWLOG 1
#define CMD_CALL_STATS 2
#define CMD_CALL_PROPAGATE 4
#define CMD_CALL_SLOWLOG (1<<0)
#define CMD_CALL_STATS (1<<1)
#define CMD_CALL_PROPAGATE_AOF (1<<2)
#define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
/* Command propagation flags, see propagate() function */