From 515a26bbc157ca76d6e2441dd21fbac32e37bbd6 Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Fri, 21 Jun 2013 12:07:53 +0200
Subject: [PATCH] New API to force propagation.

The old REDIS_CMD_FORCE_REPLICATION flag was removed from the
implementation of Redis, now there is a new API to force specific
executions of a command to be propagated to AOF / Replication link:

    void forceCommandPropagation(int flags);

The new API is also compatible with Lua scripting, so a script that will
execute commands that are forced to be propagated, will also be
propagated itself accordingly even if no change to data is operated.

As a side effect, this new design fixes the issue with scripts not able
to propagate PUBLISH to slaves (issue #873).
---
 src/pubsub.c    |  1 +
 src/redis.c     | 32 ++++++++++++++++++++++++++++----
 src/redis.h     |  5 ++++-
 src/scripting.c |  1 +
 4 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/src/pubsub.c b/src/pubsub.c
index add9a4c5..a596dfc9 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -307,6 +307,7 @@ void punsubscribeCommand(redisClient *c) {
 void publishCommand(redisClient *c) {
     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
     if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
+    forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
     addReplyLongLong(c,receivers);
 }
 
diff --git a/src/redis.c b/src/redis.c
index eee6cc53..d3d426c0 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -239,7 +239,7 @@ struct redisCommand redisCommandTable[] = {
     {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
     {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
     {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
-    {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0},
+    {"publish",publishCommand,3,"pltr",0,NULL,0,0,0,0,0},
     {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
@@ -1528,7 +1528,6 @@ void populateCommandTable(void) {
             case 'm': c->flags |= REDIS_CMD_DENYOOM; break;
             case 'a': c->flags |= REDIS_CMD_ADMIN; break;
             case 'p': c->flags |= REDIS_CMD_PUBSUB; break;
-            case 'f': c->flags |= REDIS_CMD_FORCE_REPLICATION; break;
             case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
             case 'R': c->flags |= REDIS_CMD_RANDOM; break;
             case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
@@ -1652,9 +1651,18 @@ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
 }
 
+/* It is possible to call the function forceCommandPropagation() inside a
+ * Redis command implementaiton in order to to force the propagation of a
+ * specific command execution into AOF / Replication. */
+void forceCommandPropagation(redisClient *c, int flags) {
+    if (flags & REDIS_PROPAGATE_REPL) c->flags |= REDIS_FORCE_REPL;
+    if (flags & REDIS_PROPAGATE_AOF) c->flags |= REDIS_FORCE_AOF;
+}
+
 /* Call() is the core of Redis execution of a command */
 void call(redisClient *c, int flags) {
     long long dirty, start = ustime(), duration;
+    int client_old_flags = c->flags;
 
     /* Sent the command to clients in MONITOR mode, only if the commands are
      * not generated from reading an AOF. */
@@ -1666,6 +1674,7 @@ void call(redisClient *c, int flags) {
     }
 
     /* Call the command. */
+    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
     redisOpArrayInit(&server.also_propagate);
     dirty = server.dirty;
     c->cmd->proc(c);
@@ -1677,6 +1686,16 @@ void call(redisClient *c, int flags) {
     if (server.loading && c->flags & REDIS_LUA_CLIENT)
         flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
 
+    /* If the caller is Lua, we want to force the EVAL caller to propagate
+     * the script if the command flag or client flag are forcing the
+     * propagation. */
+    if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {
+        if (c->flags & REDIS_FORCE_REPL)
+            server.lua_caller->flags |= REDIS_FORCE_REPL;
+        if (c->flags & REDIS_FORCE_AOF)
+            server.lua_caller->flags |= REDIS_FORCE_AOF;
+    }
+
     /* Log the command into the Slow log if needed, and populate the
      * per-command statistics that we show in INFO commandstats. */
     if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)
@@ -1690,14 +1709,19 @@ void call(redisClient *c, int flags) {
     if (flags & REDIS_CALL_PROPAGATE) {
         int flags = REDIS_PROPAGATE_NONE;
 
-        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
-            flags |= REDIS_PROPAGATE_REPL;
+        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
+        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
         if (dirty)
             flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
         if (flags != REDIS_PROPAGATE_NONE)
             propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
     }
 
+    /* Restore the old FORCE_AOF/REPL flags, since call can be executed
+     * recursively. */
+    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
+    c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
+
     /* Handle the alsoPropagate() API to handle commands that want to propagate
      * multiple separated commands. */
     if (server.also_propagate.numops) {
diff --git a/src/redis.h b/src/redis.h
index b97fb073..0c054919 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -139,7 +139,7 @@
 #define REDIS_CMD_WRITE 1                   /* "w" flag */
 #define REDIS_CMD_READONLY 2                /* "r" flag */
 #define REDIS_CMD_DENYOOM 4                 /* "m" flag */
-#define REDIS_CMD_FORCE_REPLICATION 8       /* "f" flag */
+#define REDIS_CMD_NOT_USED_1 8              /* no longer used flag */
 #define REDIS_CMD_ADMIN 16                  /* "a" flag */
 #define REDIS_CMD_PUBSUB 32                 /* "p" flag */
 #define REDIS_CMD_NOSCRIPT  64              /* "s" flag */
@@ -217,6 +217,8 @@
 #define REDIS_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
 #define REDIS_DIRTY_EXEC (1<<12)  /* EXEC will fail for errors while queueing */
 #define REDIS_MASTER_FORCE_REPLY (1<<13)  /* Queue replies even if is master */
+#define REDIS_FORCE_AOF (1<<14)   /* Force AOF propagation of current cmd. */
+#define REDIS_FORCE_REPL (1<<15)  /* Force replication of current cmd. */
 
 /* Client request types */
 #define REDIS_REQ_INLINE 1
@@ -1211,6 +1213,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name);
 void call(redisClient *c, int flags);
 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
+void forceCommandPropagation(redisClient *c, int flags);
 int prepareForShutdown();
 #ifdef __GNUC__
 void redisLog(int level, const char *fmt, ...)
diff --git a/src/scripting.c b/src/scripting.c
index b94627c7..104bd3dd 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -1042,6 +1042,7 @@ void scriptCommand(redisClient *c) {
         }
         addReplyBulkCBuffer(c,funcname+2,40);
         sdsfree(sha);
+        forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
     } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
         if (server.lua_caller == NULL) {
             addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now.\r\n"));