From cb27dd1a6802000eb896e0a0b5c8d50ea994655a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 17 Oct 2018 11:27:09 +0200 Subject: [PATCH] XGROUP CREATE: MKSTREAM option for automatic stream creation. --- src/t_stream.c | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index f7fbe0ba..c366578c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1683,13 +1683,14 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) { * Consumer groups commands * ----------------------------------------------------------------------- */ -/* XGROUP CREATE +/* XGROUP CREATE [MKSTREAM] * XGROUP SETID * XGROUP DESTROY * XGROUP DELCONSUMER */ void xgroupCommand(client *c) { const char *help[] = { -"CREATE -- Create a new consumer group.", +"CREATE [opt] -- Create a new consumer group.", +" option MKSTREAM: create the empty stream if it does not exist.", "SETID -- Set the current group ID.", "DESTROY -- Remove the specified group.", "DELCONSUMER -- Remove the specified consumer.", @@ -1703,8 +1704,31 @@ NULL /* Lookup the key now, this is common for all the subcommands but HELP. */ if (c->argc >= 4) { - robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr); - if (o == NULL || checkType(c,o,OBJ_STREAM)) return; + robj *o = lookupKeyWrite(c->db,c->argv[2]); + + /* CREATE has an MKSTREAM option that creates the stream if it + * does not exist. */ + if (c->argc == 6 && !strcasecmp(opt,"CREATE")) { + if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) { + addReplySubcommandSyntaxError(c); + return; + } + if (o == NULL) { + o = createStreamObject(); + dbAdd(c->db,c->argv[2],o); + } + } + + /* At this point key must exist, or there is an error. */ + if (o == NULL) { + addReplyError(c, + "The XGROUP subcommand requires the key to exist. " + "Note that for CREATE you may want to use the MKSTREAM " + "option to create an empty stream automatically."); + return; + } + + if (checkType(c,o,OBJ_STREAM)) return; s = o->ptr; grpname = c->argv[3]->ptr; @@ -1721,7 +1745,7 @@ NULL } /* Dispatch the different subcommands. */ - if (!strcasecmp(opt,"CREATE") && c->argc == 5) { + if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) { streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { id = s->last_id;