From ec511fa709f3c8fc74d26ca52114dcb9add754cc Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 9 Oct 2018 13:11:02 +0800
Subject: [PATCH 1/5] Streams: add a new command XTREAM

XSTREAM CREATE <key> <id or *> -- Create a new empty stream.
XSTREAM SETID <key> <id or $>  -- Set the current stream ID.
---
 src/server.c   |  1 +
 src/server.h   |  1 +
 src/t_stream.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 67 insertions(+)

diff --git a/src/server.c b/src/server.c
index 78aee5db..7f9b80ff 100644
--- a/src/server.c
+++ b/src/server.c
@@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = {
     {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
     {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
     {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
+    {"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0},
     {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
     {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
     {"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0},
diff --git a/src/server.h b/src/server.h
index 4c4c0ce5..4c7be2fe 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2107,6 +2107,7 @@ void xrevrangeCommand(client *c);
 void xlenCommand(client *c);
 void xreadCommand(client *c);
 void xgroupCommand(client *c);
+void xstreamCommand(client *c);
 void xackCommand(client *c);
 void xpendingCommand(client *c);
 void xclaimCommand(client *c);
diff --git a/src/t_stream.c b/src/t_stream.c
index 4387e08a..f94f0f60 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1746,6 +1746,71 @@ NULL
     }
 }
 
+/* XSTREAM CREATE <key> <id or *>
+ * XSTREAM SETID <key> <id or $> */
+void xstreamCommand(client *c) {
+    const char *help[] = {
+"CREATE <key> <id or *>  -- Create a new empty stream.",
+"SETID  <key> <id or $>  -- Set the current stream ID.",
+"HELP                    -- Prints this help.",
+NULL
+    };
+    stream *s = NULL;
+    char *opt = c->argv[1]->ptr; /* Subcommand name. */
+
+    /* Dispatch the different subcommands. */
+    if (!strcasecmp(opt,"CREATE") && c->argc == 4) {
+        robj *o = lookupKeyWrite(c->db,c->argv[2]);
+        if (o) {
+            addReplySds(c,
+                sdsnew("-BUSYSTREAM Stream already exists\r\n"));
+            return;
+        } else {
+            streamID id;
+            if (!strcmp(c->argv[3]->ptr,"*")) {
+                id.ms = mstime();
+                id.seq = 0;
+            } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
+                return;
+            }
+
+            o = createStreamObject();
+            s = o->ptr;
+            s->last_id = id;
+            dbAdd(c->db,c->argv[2],o);
+
+            addReply(c,shared.ok);
+            server.dirty++;
+            notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create",
+                                c->argv[2],c->db->id);
+        }
+    } else if (!strcasecmp(opt,"SETID") && c->argc == 4) {
+        robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
+        if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
+        s = o->ptr;
+        streamID id;
+        if (!strcmp(c->argv[3]->ptr,"$")) {
+            id = s->last_id;
+        } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
+            return;
+        }
+        if (streamCompareID(&id,&s->last_id) < 0) {
+            addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the "
+                            "target stream top item");
+            return;
+        }
+        s->last_id = id;
+        addReply(c,shared.ok);
+        server.dirty++;
+        notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid",
+                            c->argv[2],c->db->id);
+    } else if (!strcasecmp(opt,"HELP")) {
+        addReplyHelp(c, help);
+    } else {
+        addReplySubcommandSyntaxError(c);
+    }
+}
+
 /* XACK <key> <group> <id> <id> ... <id>
  *
  * Acknowledge a message as processed. In practical terms we just check the

From 5f3adbee33d555d436b03d19f32ce903da36252b Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 9 Oct 2018 15:21:08 +0800
Subject: [PATCH 2/5] Streams: add tests for XSTREAM command

---
 tests/unit/type/stream.tcl | 39 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index 2b69a2e9..769ab571 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -363,3 +363,42 @@ start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries
         assert {[r xlen mystream] == 90}
     }
 }
+
+start_server {tags {"xstream command"}} {
+    test {XSTREAM can CREATE an empty stream} {
+        r XSTREAM CREATE mystream *
+        assert {[dict get [r xinfo stream mystream] length] == 0}
+    }
+
+    test {XSTREAM cannot CREATE on an busy stream} {
+        catch {r XSTREAM CREATE mystream *} err
+        set _ $err
+    } {BUSY*}
+
+    test {XSTREAM can CREATE an empty stream with specific ID} {
+        r del mystream
+        r XSTREAM CREATE mystream "100-100"
+        assert {[dict get [r xinfo stream mystream] length] == 0}
+        assert {[dict get [r xinfo stream mystream] last-generated-id] == "100-100"}
+    }
+
+    test {XSTREAM can SETID with $} {
+        r XSTREAM SETID mystream $
+        assert {[dict get [r xinfo stream mystream] last-generated-id] == "100-100"}
+    }
+
+    test {XSTREAM can SETID with specific ID} {
+        r XSTREAM SETID mystream "200-0"
+        assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
+    }
+
+    test {XSTREAM cannot SETID with smaller ID} {
+        catch {r XSTREAM SETID mystream "0-0"} err
+        set _ $err
+    } {ERR*smaller*}
+
+    test {XSTREAM cannot SETID on non-existent key} {
+        catch {r XSTREAM SETID stream $} err
+        set _ $err
+    } {ERR no such key}
+}

From b3e80d2f654a66358c53addffd34945363cce2bb Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Mon, 8 Oct 2018 21:23:38 +0800
Subject: [PATCH 3/5] Stream & AOF: rewrite stream in correct way

---
 src/aof.c | 46 +++++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/src/aof.c b/src/aof.c
index f8f26bdf..3f914b77 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1121,23 +1121,39 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
     streamID id;
     int64_t numfields;
 
-    /* Reconstruct the stream data using XADD commands. */
-    while(streamIteratorGetID(&si,&id,&numfields)) {
-        /* Emit a two elements array for each item. The first is
-         * the ID, the second is an array of field-value pairs. */
+    if (s->length) {
+        /* Reconstruct the stream data using XADD commands. */
+        while(streamIteratorGetID(&si,&id,&numfields)) {
+            /* Emit a two elements array for each item. The first is
+             * the ID, the second is an array of field-value pairs. */
 
-        /* Emit the XADD <key> <id> ...fields... command. */
-        if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
-        if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
-        if (rioWriteBulkObject(r,key) == 0) return 0;
-        if (rioWriteBulkStreamID(r,&id) == 0) return 0;
-        while(numfields--) {
-            unsigned char *field, *value;
-            int64_t field_len, value_len;
-            streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
-            if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
-            if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
+            /* Emit the XADD <key> <id> ...fields... command. */
+            if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
+            if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
+            if (rioWriteBulkObject(r,key) == 0) return 0;
+            if (rioWriteBulkStreamID(r,&id) == 0) return 0;
+            while(numfields--) {
+                unsigned char *field, *value;
+                int64_t field_len, value_len;
+                streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
+                if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
+                if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
+            }
         }
+        /* Append XSTREAM SETID after XADD, make sure lastid is correct,
+         * in case of XDEL lastid. */
+        if (rioWriteBulkCount(r,'*',4) == 0) return 0;
+        if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
+        if (rioWriteBulkString(r,"SETID",5) == 0) return 0;
+        if (rioWriteBulkObject(r,key) == 0) return 0;
+        if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
+    } else {
+        /* Using XSTREAM CREATE if the stream is empty. */
+        if (rioWriteBulkCount(r,'*',4) == 0) return 0;
+        if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
+        if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
+        if (rioWriteBulkObject(r,key) == 0) return 0;
+        if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
     }
 
     /* Create all the stream consumer groups. */

From 3094eb36263e602731840ededd301ff21e7981d4 Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 9 Oct 2018 15:45:58 +0800
Subject: [PATCH 4/5] Streams: add tests for aof rewrite

---
 tests/unit/type/stream.tcl | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index 769ab571..9acf11c7 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -402,3 +402,26 @@ start_server {tags {"xstream command"}} {
         set _ $err
     } {ERR no such key}
 }
+
+start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
+    test {Empty stream can be rewrite into AOF correctly} {
+        r XSTREAM CREATE mystream 0
+        assert {[dict get [r xinfo stream mystream] length] == 0}
+        r bgrewriteaof
+        waitForBgrewriteaof r
+        r debug loadaof
+        assert {[dict get [r xinfo stream mystream] length] == 0}
+    }
+
+    test {Stream can be rewrite into AOF correctly after XDEL lastid} {
+        r XADD mystream 1-1 a b
+        r XADD mystream 2-2 a b
+        assert {[dict get [r xinfo stream mystream] length] == 2}
+        r XDEL mystream 2-2
+        r bgrewriteaof
+        waitForBgrewriteaof r
+        r debug loadaof
+        assert {[dict get [r xinfo stream mystream] length] == 1}
+        assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
+    }
+}

From 480e2994369de43e3b13019ee9995906d68dd14e Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 9 Oct 2018 16:22:30 +0800
Subject: [PATCH 5/5] Streams: rewrite id in XSTREAM CREATE *

---
 src/t_stream.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/t_stream.c b/src/t_stream.c
index f94f0f60..b20cf125 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1779,6 +1779,10 @@ NULL
             s->last_id = id;
             dbAdd(c->db,c->argv[2],o);
 
+            robj *idarg = createObjectFromStreamID(&id);
+            rewriteClientCommandArgument(c,3,idarg);
+            decrRefCount(idarg);
+
             addReply(c,shared.ok);
             server.dirty++;
             notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create",