Streams: add a new command XTREAM

XSTREAM CREATE <key> <id or *> -- Create a new empty stream.
XSTREAM SETID <key> <id or $>  -- Set the current stream ID.
This commit is contained in:
zhaozhao.zz 2018-10-09 13:11:02 +08:00
parent 3e78344d87
commit ec511fa709
3 changed files with 67 additions and 0 deletions

View File

@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = {
{"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0},
{"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0},

View File

@ -2107,6 +2107,7 @@ void xrevrangeCommand(client *c);
void xlenCommand(client *c);
void xreadCommand(client *c);
void xgroupCommand(client *c);
void xstreamCommand(client *c);
void xackCommand(client *c);
void xpendingCommand(client *c);
void xclaimCommand(client *c);

View File

@ -1746,6 +1746,71 @@ NULL
}
}
/* XSTREAM CREATE <key> <id or *>
* XSTREAM SETID <key> <id or $> */
void xstreamCommand(client *c) {
const char *help[] = {
"CREATE <key> <id or *> -- Create a new empty stream.",
"SETID <key> <id or $> -- Set the current stream ID.",
"HELP -- Prints this help.",
NULL
};
stream *s = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 4) {
robj *o = lookupKeyWrite(c->db,c->argv[2]);
if (o) {
addReplySds(c,
sdsnew("-BUSYSTREAM Stream already exists\r\n"));
return;
} else {
streamID id;
if (!strcmp(c->argv[3]->ptr,"*")) {
id.ms = mstime();
id.seq = 0;
} else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
return;
}
o = createStreamObject();
s = o->ptr;
s->last_id = id;
dbAdd(c->db,c->argv[2],o);
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create",
c->argv[2],c->db->id);
}
} else if (!strcasecmp(opt,"SETID") && c->argc == 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
streamID id;
if (!strcmp(c->argv[3]->ptr,"$")) {
id = s->last_id;
} else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
return;
}
if (streamCompareID(&id,&s->last_id) < 0) {
addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the "
"target stream top item");
return;
}
s->last_id = id;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid",
c->argv[2],c->db->id);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {
addReplySubcommandSyntaxError(c);
}
}
/* XACK <key> <group> <id> <id> ... <id>
*
* Acknowledge a message as processed. In practical terms we just check the