diff --git a/src/server.c b/src/server.c index 78aee5db..7f9b80ff 100644 --- a/src/server.c +++ b/src/server.c @@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = { {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0}, {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, + {"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0}, {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0}, diff --git a/src/server.h b/src/server.h index 4c4c0ce5..4c7be2fe 100644 --- a/src/server.h +++ b/src/server.h @@ -2107,6 +2107,7 @@ void xrevrangeCommand(client *c); void xlenCommand(client *c); void xreadCommand(client *c); void xgroupCommand(client *c); +void xstreamCommand(client *c); void xackCommand(client *c); void xpendingCommand(client *c); void xclaimCommand(client *c); diff --git a/src/t_stream.c b/src/t_stream.c index 4387e08a..f94f0f60 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1746,6 +1746,71 @@ NULL } } +/* XSTREAM CREATE + * XSTREAM SETID */ +void xstreamCommand(client *c) { + const char *help[] = { +"CREATE -- Create a new empty stream.", +"SETID -- Set the current stream ID.", +"HELP -- Prints this help.", +NULL + }; + stream *s = NULL; + char *opt = c->argv[1]->ptr; /* Subcommand name. */ + + /* Dispatch the different subcommands. */ + if (!strcasecmp(opt,"CREATE") && c->argc == 4) { + robj *o = lookupKeyWrite(c->db,c->argv[2]); + if (o) { + addReplySds(c, + sdsnew("-BUSYSTREAM Stream already exists\r\n")); + return; + } else { + streamID id; + if (!strcmp(c->argv[3]->ptr,"*")) { + id.ms = mstime(); + id.seq = 0; + } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) { + return; + } + + o = createStreamObject(); + s = o->ptr; + s->last_id = id; + dbAdd(c->db,c->argv[2],o); + + addReply(c,shared.ok); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create", + c->argv[2],c->db->id); + } + } else if (!strcasecmp(opt,"SETID") && c->argc == 4) { + robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr); + if (o == NULL || checkType(c,o,OBJ_STREAM)) return; + s = o->ptr; + streamID id; + if (!strcmp(c->argv[3]->ptr,"$")) { + id = s->last_id; + } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) { + return; + } + if (streamCompareID(&id,&s->last_id) < 0) { + addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the " + "target stream top item"); + return; + } + s->last_id = id; + addReply(c,shared.ok); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid", + c->argv[2],c->db->id); + } else if (!strcasecmp(opt,"HELP")) { + addReplyHelp(c, help); + } else { + addReplySubcommandSyntaxError(c); + } +} + /* XACK ... * * Acknowledge a message as processed. In practical terms we just check the