Streams: XSTREAM SETID -> XSETID.

Keep vanilla stream commands at toplevel, see #5426.
This commit is contained in:
antirez 2018-10-16 13:17:14 +02:00
parent af09df08d7
commit e3446fea9e
3 changed files with 17 additions and 67 deletions

View File

@ -314,7 +314,7 @@ struct redisCommand redisCommandTable[] = {
{"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0}, {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0}, {"xsetid",xsetidCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0}, {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0}, {"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0},

View File

@ -2111,7 +2111,7 @@ void xrevrangeCommand(client *c);
void xlenCommand(client *c); void xlenCommand(client *c);
void xreadCommand(client *c); void xreadCommand(client *c);
void xgroupCommand(client *c); void xgroupCommand(client *c);
void xstreamCommand(client *c); void xsetidCommand(client *c);
void xackCommand(client *c); void xackCommand(client *c);
void xpendingCommand(client *c); void xpendingCommand(client *c);
void xclaimCommand(client *c); void xclaimCommand(client *c);

View File

@ -1775,73 +1775,23 @@ NULL
} }
} }
/* XSTREAM CREATE <key> <id or *> /* Set the internal "last ID" of a stream. */
* XSTREAM SETID <key> <id or $> */ void xsetidCommand(client *c) {
void xstreamCommand(client *c) { robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
const char *help[] = {
"CREATE <key> <id or *> -- Create a new empty stream.",
"SETID <key> <id or $> -- Set the current stream ID.",
"HELP -- Prints this help.",
NULL
};
stream *s = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 4) {
robj *o = lookupKeyWrite(c->db,c->argv[2]);
if (o) {
addReplySds(c,
sdsnew("-BUSYSTREAM Stream already exists\r\n"));
return;
} else {
streamID id;
if (!strcmp(c->argv[3]->ptr,"*")) {
id.ms = mstime();
id.seq = 0;
} else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
return;
}
o = createStreamObject();
s = o->ptr;
s->last_id = id;
dbAdd(c->db,c->argv[2],o);
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,3,idarg);
decrRefCount(idarg);
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create",
c->argv[2],c->db->id);
}
} else if (!strcasecmp(opt,"SETID") && c->argc == 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return; if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
stream *s = o->ptr;
streamID id; streamID id;
if (!strcmp(c->argv[3]->ptr,"$")) { if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) return;
id = s->last_id;
} else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
return;
}
if (streamCompareID(&id,&s->last_id) < 0) { if (streamCompareID(&id,&s->last_id) < 0) {
addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the " addReplyError(c,"The ID specified in XSETID is smaller than the "
"target stream top item"); "target stream top item");
return; return;
} }
s->last_id = id; s->last_id = id;
addReply(c,shared.ok); addReply(c,shared.ok);
server.dirty++; server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid", notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
c->argv[2],c->db->id);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {
addReplySubcommandSyntaxError(c);
}
} }
/* XACK <key> <group> <id> <id> ... <id> /* XACK <key> <group> <id> <id> ... <id>