mirror of
https://github.com/fluencelabs/redis
synced 2025-05-05 15:32:14 +00:00
Merge pull request #6615 from soloestoy/wrap-also-propagate-as-multi
Wrap also propagate as multi
This commit is contained in:
commit
d3a9dff6b9
10
src/module.c
10
src/module.c
@ -590,11 +590,8 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
|
|||||||
|
|
||||||
/* Handle the replication of the final EXEC, since whatever a command
|
/* Handle the replication of the final EXEC, since whatever a command
|
||||||
* emits is always wrapped around MULTI/EXEC. */
|
* emits is always wrapped around MULTI/EXEC. */
|
||||||
robj *propargv[1];
|
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||||
propargv[0] = createStringObject("EXEC",4);
|
|
||||||
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
|
||||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
decrRefCount(propargv[0]);
|
|
||||||
|
|
||||||
/* If this is not a module command context (but is instead a simple
|
/* If this is not a module command context (but is instead a simple
|
||||||
* callback context), we have to handle directly the "also propagate"
|
* callback context), we have to handle directly the "also propagate"
|
||||||
@ -3300,6 +3297,11 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
|||||||
* a Lua script in the context of AOF and slaves. */
|
* a Lua script in the context of AOF and slaves. */
|
||||||
if (replicate) moduleReplicateMultiIfNeeded(ctx);
|
if (replicate) moduleReplicateMultiIfNeeded(ctx);
|
||||||
|
|
||||||
|
if (ctx->client->flags & CLIENT_MULTI ||
|
||||||
|
ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
|
||||||
|
c->flags |= CLIENT_MULTI;
|
||||||
|
}
|
||||||
|
|
||||||
/* Run the command */
|
/* Run the command */
|
||||||
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
||||||
if (replicate) {
|
if (replicate) {
|
||||||
|
10
src/multi.c
10
src/multi.c
@ -106,11 +106,13 @@ void discardCommand(client *c) {
|
|||||||
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
|
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
|
||||||
* implementation for more information. */
|
* implementation for more information. */
|
||||||
void execCommandPropagateMulti(client *c) {
|
void execCommandPropagateMulti(client *c) {
|
||||||
robj *multistring = createStringObject("MULTI",5);
|
propagate(server.multiCommand,c->db->id,&shared.multi,1,
|
||||||
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
propagate(server.multiCommand,c->db->id,&multistring,1,
|
}
|
||||||
|
|
||||||
|
void execCommandPropagateExec(client *c) {
|
||||||
|
propagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
decrRefCount(multistring);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void execCommand(client *c) {
|
void execCommand(client *c) {
|
||||||
|
@ -1615,11 +1615,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
|||||||
if (server.lua_replicate_commands) {
|
if (server.lua_replicate_commands) {
|
||||||
preventCommandPropagation(c);
|
preventCommandPropagation(c);
|
||||||
if (server.lua_multi_emitted) {
|
if (server.lua_multi_emitted) {
|
||||||
robj *propargv[1];
|
execCommandPropagateExec(c);
|
||||||
propargv[0] = createStringObject("EXEC",4);
|
|
||||||
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
|
||||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
|
||||||
decrRefCount(propargv[0]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
19
src/server.c
19
src/server.c
@ -2244,6 +2244,8 @@ void createSharedObjects(void) {
|
|||||||
shared.rpoplpush = createStringObject("RPOPLPUSH",9);
|
shared.rpoplpush = createStringObject("RPOPLPUSH",9);
|
||||||
shared.zpopmin = createStringObject("ZPOPMIN",7);
|
shared.zpopmin = createStringObject("ZPOPMIN",7);
|
||||||
shared.zpopmax = createStringObject("ZPOPMAX",7);
|
shared.zpopmax = createStringObject("ZPOPMAX",7);
|
||||||
|
shared.multi = createStringObject("MULTI",5);
|
||||||
|
shared.exec = createStringObject("EXEC",4);
|
||||||
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
|
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
|
||||||
shared.integers[j] =
|
shared.integers[j] =
|
||||||
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
|
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
|
||||||
@ -2372,6 +2374,7 @@ void initServerConfig(void) {
|
|||||||
server.pexpireCommand = lookupCommandByCString("pexpire");
|
server.pexpireCommand = lookupCommandByCString("pexpire");
|
||||||
server.xclaimCommand = lookupCommandByCString("xclaim");
|
server.xclaimCommand = lookupCommandByCString("xclaim");
|
||||||
server.xgroupCommand = lookupCommandByCString("xgroup");
|
server.xgroupCommand = lookupCommandByCString("xgroup");
|
||||||
|
server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
|
||||||
|
|
||||||
/* Debugging */
|
/* Debugging */
|
||||||
server.assert_failed = "<no assertion failed>";
|
server.assert_failed = "<no assertion failed>";
|
||||||
@ -3272,6 +3275,18 @@ void call(client *c, int flags) {
|
|||||||
redisOp *rop;
|
redisOp *rop;
|
||||||
|
|
||||||
if (flags & CMD_CALL_PROPAGATE) {
|
if (flags & CMD_CALL_PROPAGATE) {
|
||||||
|
int multi_emitted = 0;
|
||||||
|
/* Wrap the commands in server.also_propagate array,
|
||||||
|
* but don't wrap it if we are already in MULIT context,
|
||||||
|
* in case the nested MULIT/EXEC.
|
||||||
|
*
|
||||||
|
* And if the array contains only one command, no need to
|
||||||
|
* wrap it, since the single command is atomic. */
|
||||||
|
if (server.also_propagate.numops > 1 && !(c->flags & CLIENT_MULTI)) {
|
||||||
|
execCommandPropagateMulti(c);
|
||||||
|
multi_emitted = 1;
|
||||||
|
}
|
||||||
|
|
||||||
for (j = 0; j < server.also_propagate.numops; j++) {
|
for (j = 0; j < server.also_propagate.numops; j++) {
|
||||||
rop = &server.also_propagate.ops[j];
|
rop = &server.also_propagate.ops[j];
|
||||||
int target = rop->target;
|
int target = rop->target;
|
||||||
@ -3281,6 +3296,10 @@ void call(client *c, int flags) {
|
|||||||
if (target)
|
if (target)
|
||||||
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
|
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (multi_emitted) {
|
||||||
|
execCommandPropagateExec(c);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
redisOpArrayFree(&server.also_propagate);
|
redisOpArrayFree(&server.also_propagate);
|
||||||
}
|
}
|
||||||
|
@ -848,6 +848,7 @@ struct sharedObjectsStruct {
|
|||||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
||||||
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
|
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
|
||||||
|
*multi, *exec,
|
||||||
*select[PROTO_SHARED_SELECT_CMDS],
|
*select[PROTO_SHARED_SELECT_CMDS],
|
||||||
*integers[OBJ_SHARED_INTEGERS],
|
*integers[OBJ_SHARED_INTEGERS],
|
||||||
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||||
@ -1083,7 +1084,7 @@ struct redisServer {
|
|||||||
*lpopCommand, *rpopCommand, *zpopminCommand,
|
*lpopCommand, *rpopCommand, *zpopminCommand,
|
||||||
*zpopmaxCommand, *sremCommand, *execCommand,
|
*zpopmaxCommand, *sremCommand, *execCommand,
|
||||||
*expireCommand, *pexpireCommand, *xclaimCommand,
|
*expireCommand, *pexpireCommand, *xclaimCommand,
|
||||||
*xgroupCommand;
|
*xgroupCommand, *rpoplpushCommand;
|
||||||
/* Fields used only for stats */
|
/* Fields used only for stats */
|
||||||
time_t stat_starttime; /* Server start time */
|
time_t stat_starttime; /* Server start time */
|
||||||
long long stat_numcommands; /* Number of processed commands */
|
long long stat_numcommands; /* Number of processed commands */
|
||||||
@ -1680,6 +1681,7 @@ void touchWatchedKeysOnFlush(int dbid);
|
|||||||
void discardTransaction(client *c);
|
void discardTransaction(client *c);
|
||||||
void flagTransaction(client *c);
|
void flagTransaction(client *c);
|
||||||
void execCommandPropagateMulti(client *c);
|
void execCommandPropagateMulti(client *c);
|
||||||
|
void execCommandPropagateExec(client *c);
|
||||||
|
|
||||||
/* Redis object implementation */
|
/* Redis object implementation */
|
||||||
void decrRefCount(robj *o);
|
void decrRefCount(robj *o);
|
||||||
|
17
src/t_list.c
17
src/t_list.c
@ -653,20 +653,13 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
|||||||
if (!(dstobj &&
|
if (!(dstobj &&
|
||||||
checkType(receiver,dstobj,OBJ_LIST)))
|
checkType(receiver,dstobj,OBJ_LIST)))
|
||||||
{
|
{
|
||||||
/* Propagate the RPOP operation. */
|
|
||||||
argv[0] = shared.rpop;
|
|
||||||
argv[1] = key;
|
|
||||||
propagate(server.rpopCommand,
|
|
||||||
db->id,argv,2,
|
|
||||||
PROPAGATE_AOF|
|
|
||||||
PROPAGATE_REPL);
|
|
||||||
rpoplpushHandlePush(receiver,dstkey,dstobj,
|
rpoplpushHandlePush(receiver,dstkey,dstobj,
|
||||||
value);
|
value);
|
||||||
/* Propagate the LPUSH operation. */
|
/* Propagate the RPOPLPUSH operation. */
|
||||||
argv[0] = shared.lpush;
|
argv[0] = shared.rpoplpush;
|
||||||
argv[1] = dstkey;
|
argv[1] = key;
|
||||||
argv[2] = value;
|
argv[2] = dstkey;
|
||||||
propagate(server.lpushCommand,
|
propagate(server.rpoplpushCommand,
|
||||||
db->id,argv,3,
|
db->id,argv,3,
|
||||||
PROPAGATE_AOF|
|
PROPAGATE_AOF|
|
||||||
PROPAGATE_REPL);
|
PROPAGATE_REPL);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user