diff --git a/src/server.c b/src/server.c index 1b19fd59..4ac1f5bd 100644 --- a/src/server.c +++ b/src/server.c @@ -314,7 +314,7 @@ struct redisCommand redisCommandTable[] = { {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0}, {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, - {"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0}, + {"xsetid",xsetidCommand,3,"wmF",0,NULL,1,1,1,0,0}, {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0}, diff --git a/src/server.h b/src/server.h index 048692a5..86d6e2e4 100644 --- a/src/server.h +++ b/src/server.h @@ -2111,7 +2111,7 @@ void xrevrangeCommand(client *c); void xlenCommand(client *c); void xreadCommand(client *c); void xgroupCommand(client *c); -void xstreamCommand(client *c); +void xsetidCommand(client *c); void xackCommand(client *c); void xpendingCommand(client *c); void xclaimCommand(client *c); diff --git a/src/t_stream.c b/src/t_stream.c index e6812c8d..2a7e710f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1775,73 +1775,23 @@ NULL } } -/* XSTREAM CREATE - * XSTREAM SETID */ -void xstreamCommand(client *c) { - const char *help[] = { -"CREATE -- Create a new empty stream.", -"SETID -- Set the current stream ID.", -"HELP -- Prints this help.", -NULL - }; - stream *s = NULL; - char *opt = c->argv[1]->ptr; /* Subcommand name. */ +/* Set the internal "last ID" of a stream. */ +void xsetidCommand(client *c) { + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL || checkType(c,o,OBJ_STREAM)) return; - /* Dispatch the different subcommands. */ - if (!strcasecmp(opt,"CREATE") && c->argc == 4) { - robj *o = lookupKeyWrite(c->db,c->argv[2]); - if (o) { - addReplySds(c, - sdsnew("-BUSYSTREAM Stream already exists\r\n")); - return; - } else { - streamID id; - if (!strcmp(c->argv[3]->ptr,"*")) { - id.ms = mstime(); - id.seq = 0; - } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) { - return; - } - - o = createStreamObject(); - s = o->ptr; - s->last_id = id; - dbAdd(c->db,c->argv[2],o); - - robj *idarg = createObjectFromStreamID(&id); - rewriteClientCommandArgument(c,3,idarg); - decrRefCount(idarg); - - addReply(c,shared.ok); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create", - c->argv[2],c->db->id); - } - } else if (!strcasecmp(opt,"SETID") && c->argc == 4) { - robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr); - if (o == NULL || checkType(c,o,OBJ_STREAM)) return; - s = o->ptr; - streamID id; - if (!strcmp(c->argv[3]->ptr,"$")) { - id = s->last_id; - } else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) { - return; - } - if (streamCompareID(&id,&s->last_id) < 0) { - addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the " - "target stream top item"); - return; - } - s->last_id = id; - addReply(c,shared.ok); - server.dirty++; - notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid", - c->argv[2],c->db->id); - } else if (!strcasecmp(opt,"HELP")) { - addReplyHelp(c, help); - } else { - addReplySubcommandSyntaxError(c); + stream *s = o->ptr; + streamID id; + if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) return; + if (streamCompareID(&id,&s->last_id) < 0) { + addReplyError(c,"The ID specified in XSETID is smaller than the " + "target stream top item"); + return; } + s->last_id = id; + addReply(c,shared.ok); + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); } /* XACK ...