From ec511fa709f3c8fc74d26ca52114dcb9add754cc Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 9 Oct 2018 13:11:02 +0800
Subject: [PATCH] Streams: add a new command XTREAM

XSTREAM CREATE <key> <id or *> -- Create a new empty stream.
XSTREAM SETID <key> <id or $>  -- Set the current stream ID.
---
 src/server.c   |  1 +
 src/server.h   |  1 +
 src/t_stream.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 67 insertions(+)

diff --git a/src/server.c b/src/server.c
index 78aee5db..7f9b80ff 100644
--- a/src/server.c
+++ b/src/server.c
@@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = {
     {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
     {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
     {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
+    {"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0},
     {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
     {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
     {"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0},
diff --git a/src/server.h b/src/server.h
index 4c4c0ce5..4c7be2fe 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2107,6 +2107,7 @@ void xrevrangeCommand(client *c);
 void xlenCommand(client *c);
 void xreadCommand(client *c);
 void xgroupCommand(client *c);
+void xstreamCommand(client *c);
 void xackCommand(client *c);
 void xpendingCommand(client *c);
 void xclaimCommand(client *c);
diff --git a/src/t_stream.c b/src/t_stream.c
index 4387e08a..f94f0f60 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1746,6 +1746,71 @@ NULL
     }
 }
 
+/* XSTREAM CREATE <key> <id or *>
+ * XSTREAM SETID <key> <id or $> */
+void xstreamCommand(client *c) {
+    const char *help[] = {
+"CREATE <key> <id or *>  -- Create a new empty stream.",
+"SETID  <key> <id or $>  -- Set the current stream ID.",
+"HELP                    -- Prints this help.",
+NULL
+    };
+    stream *s = NULL;
+    char *opt = c->argv[1]->ptr; /* Subcommand name. */
+
+    /* Dispatch the different subcommands. */
+    if (!strcasecmp(opt,"CREATE") && c->argc == 4) {
+        robj *o = lookupKeyWrite(c->db,c->argv[2]);
+        if (o) {
+            addReplySds(c,
+                sdsnew("-BUSYSTREAM Stream already exists\r\n"));
+            return;
+        } else {
+            streamID id;
+            if (!strcmp(c->argv[3]->ptr,"*")) {
+                id.ms = mstime();
+                id.seq = 0;
+            } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
+                return;
+            }
+
+            o = createStreamObject();
+            s = o->ptr;
+            s->last_id = id;
+            dbAdd(c->db,c->argv[2],o);
+
+            addReply(c,shared.ok);
+            server.dirty++;
+            notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create",
+                                c->argv[2],c->db->id);
+        }
+    } else if (!strcasecmp(opt,"SETID") && c->argc == 4) {
+        robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
+        if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
+        s = o->ptr;
+        streamID id;
+        if (!strcmp(c->argv[3]->ptr,"$")) {
+            id = s->last_id;
+        } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
+            return;
+        }
+        if (streamCompareID(&id,&s->last_id) < 0) {
+            addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the "
+                            "target stream top item");
+            return;
+        }
+        s->last_id = id;
+        addReply(c,shared.ok);
+        server.dirty++;
+        notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid",
+                            c->argv[2],c->db->id);
+    } else if (!strcasecmp(opt,"HELP")) {
+        addReplyHelp(c, help);
+    } else {
+        addReplySubcommandSyntaxError(c);
+    }
+}
+
 /* XACK <key> <group> <id> <id> ... <id>
  *
  * Acknowledge a message as processed. In practical terms we just check the