From 37a10cef028f0ed16e6768dabdd3ffa56fc77761 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 22 Nov 2019 15:32:43 +0800 Subject: [PATCH 1/4] Propagation: wrap commands in also_propagate array with MULIT/EXEC Random command like SPOP with count is replicated as some SREM operations, and store them in also_propagate array to propagate after the call, but this would break atomicity. To keep the command's atomicity, wrap also_propagate array with MULTI/EXEC. --- src/module.c | 5 +---- src/multi.c | 10 ++++++---- src/scripting.c | 5 +---- src/server.c | 18 ++++++++++++++++++ src/server.h | 2 ++ 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/module.c b/src/module.c index be64af36..b6a87ab2 100644 --- a/src/module.c +++ b/src/module.c @@ -574,11 +574,8 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* Handle the replication of the final EXEC, since whatever a command * emits is always wrapped around MULTI/EXEC. */ - robj *propargv[1]; - propargv[0] = createStringObject("EXEC",4); - alsoPropagate(server.execCommand,c->db->id,propargv,1, + alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(propargv[0]); /* If this is not a module command context (but is instead a simple * callback context), we have to handle directly the "also propagate" diff --git a/src/multi.c b/src/multi.c index f885fa19..df11225b 100644 --- a/src/multi.c +++ b/src/multi.c @@ -106,11 +106,13 @@ void discardCommand(client *c) { /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ void execCommandPropagateMulti(client *c) { - robj *multistring = createStringObject("MULTI",5); - - propagate(server.multiCommand,c->db->id,&multistring,1, + propagate(server.multiCommand,c->db->id,&shared.multi,1, + PROPAGATE_AOF|PROPAGATE_REPL); +} + +void execCommandPropagateExec(client *c) { + propagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(multistring); } void execCommand(client *c) { diff --git a/src/scripting.c b/src/scripting.c index 7cf21f40..dc2510f9 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1591,11 +1591,8 @@ void evalGenericCommand(client *c, int evalsha) { if (server.lua_replicate_commands) { preventCommandPropagation(c); if (server.lua_multi_emitted) { - robj *propargv[1]; - propargv[0] = createStringObject("EXEC",4); - alsoPropagate(server.execCommand,c->db->id,propargv,1, + alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(propargv[0]); } } diff --git a/src/server.c b/src/server.c index 803cfc80..3ad8664f 100644 --- a/src/server.c +++ b/src/server.c @@ -2244,6 +2244,8 @@ void createSharedObjects(void) { shared.rpoplpush = createStringObject("RPOPLPUSH",9); shared.zpopmin = createStringObject("ZPOPMIN",7); shared.zpopmax = createStringObject("ZPOPMAX",7); + shared.multi = createStringObject("MULTI",5); + shared.exec = createStringObject("EXEC",4); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -3366,6 +3368,18 @@ void call(client *c, int flags) { redisOp *rop; if (flags & CMD_CALL_PROPAGATE) { + int multi_emitted = 0; + /* Wrap the commands in server.also_propagate array, + * but don't wrap it if we are already in MULIT context, + * in case the nested MULIT/EXEC. + * + * And if the array contains only one command, no need to + * wrap it, since the single command is atomic. */ + if (server.also_propagate.numops > 1 && !(c->flags & CLIENT_MULTI)) { + execCommandPropagateMulti(c); + multi_emitted = 1; + } + for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; int target = rop->target; @@ -3375,6 +3389,10 @@ void call(client *c, int flags) { if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } + + if (multi_emitted) { + execCommandPropagateExec(c); + } } redisOpArrayFree(&server.also_propagate); } diff --git a/src/server.h b/src/server.h index 5a95d4c1..22e48446 100644 --- a/src/server.h +++ b/src/server.h @@ -923,6 +923,7 @@ struct sharedObjectsStruct { *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, + *multi, *exec, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1751,6 +1752,7 @@ void touchWatchedKeysOnFlush(int dbid); void discardTransaction(client *c); void flagTransaction(client *c); void execCommandPropagateMulti(client *c); +void execCommandPropagateExec(client *c); /* Redis object implementation */ void decrRefCount(robj *o); From c73d70fb4654f1758c73e4d23e278e1c01575ffa Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 22 Nov 2019 15:45:21 +0800 Subject: [PATCH 2/4] Propagation: propagate EXEC directly in lua script --- src/scripting.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/scripting.c b/src/scripting.c index dc2510f9..b1eaae12 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1591,8 +1591,7 @@ void evalGenericCommand(client *c, int evalsha) { if (server.lua_replicate_commands) { preventCommandPropagation(c); if (server.lua_multi_emitted) { - alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, - PROPAGATE_AOF|PROPAGATE_REPL); + execCommandPropagateExec(c); } } From 2c970532dc47e68efee55082718502e4fa591c7d Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 22 Nov 2019 16:20:27 +0800 Subject: [PATCH 3/4] Propagation: flag module client as CLIENT_MULTI if needed in case of nested MULTI/EXEC --- src/module.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/module.c b/src/module.c index b6a87ab2..17786449 100644 --- a/src/module.c +++ b/src/module.c @@ -3243,6 +3243,11 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch * a Lua script in the context of AOF and slaves. */ if (replicate) moduleReplicateMultiIfNeeded(ctx); + if (ctx->client->flags & CLIENT_MULTI || + ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) { + c->flags |= CLIENT_MULTI; + } + /* Run the command */ int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; if (replicate) { From 6b056d29f31c01188c4758ade8900c847bbd025c Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 22 Nov 2019 16:38:49 +0800 Subject: [PATCH 4/4] block: propagate BRPOPLPUSH as RPOPLPUSH when unblock --- src/server.c | 1 + src/server.h | 2 +- src/t_list.c | 17 +++++------------ 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/server.c b/src/server.c index 3ad8664f..210c4415 100644 --- a/src/server.c +++ b/src/server.c @@ -2464,6 +2464,7 @@ void initServerConfig(void) { server.pexpireCommand = lookupCommandByCString("pexpire"); server.xclaimCommand = lookupCommandByCString("xclaim"); server.xgroupCommand = lookupCommandByCString("xgroup"); + server.rpoplpushCommand = lookupCommandByCString("rpoplpush"); /* Slow log */ server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; diff --git a/src/server.h b/src/server.h index 22e48446..1975646e 100644 --- a/src/server.h +++ b/src/server.h @@ -1159,7 +1159,7 @@ struct redisServer { *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, - *xgroupCommand; + *xgroupCommand, *rpoplpushCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ diff --git a/src/t_list.c b/src/t_list.c index 9bbd61de..eaeaa8e4 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -653,20 +653,13 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb if (!(dstobj && checkType(receiver,dstobj,OBJ_LIST))) { - /* Propagate the RPOP operation. */ - argv[0] = shared.rpop; - argv[1] = key; - propagate(server.rpopCommand, - db->id,argv,2, - PROPAGATE_AOF| - PROPAGATE_REPL); rpoplpushHandlePush(receiver,dstkey,dstobj, value); - /* Propagate the LPUSH operation. */ - argv[0] = shared.lpush; - argv[1] = dstkey; - argv[2] = value; - propagate(server.lpushCommand, + /* Propagate the RPOPLPUSH operation. */ + argv[0] = shared.rpoplpush; + argv[1] = key; + argv[2] = dstkey; + propagate(server.rpoplpushCommand, db->id,argv,3, PROPAGATE_AOF| PROPAGATE_REPL);