CG: XINFO STREAM.

This commit is contained in:
antirez 2018-03-13 16:54:23 +01:00
parent d7d8cd0b2f
commit 6614361615

View File

@ -1871,7 +1871,7 @@ NULL
char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */ char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */
/* Lookup the key now, this is common for all the subcommands but HELP. */ /* Lookup the key now, this is common for all the subcommands but HELP. */
if (c->argc >= 3 && strcasecmp(opt,"HELP")) { if (c->argc >= 2 && strcasecmp(opt,"HELP")) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL) return; if (o == NULL) return;
s = o->ptr; s = o->ptr;
@ -1880,7 +1880,6 @@ NULL
/* Dispatch the different subcommands. */ /* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
/* XINFO <key> CONSUMERS <group>. */ /* XINFO <key> CONSUMERS <group>. */
streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
if (cg == NULL) { if (cg == NULL) {
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
@ -1910,7 +1909,6 @@ NULL
raxStop(&ri); raxStop(&ri);
} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
/* XINFO <key> GROUPS. */ /* XINFO <key> GROUPS. */
if (s->cgroups == NULL) { if (s->cgroups == NULL) {
addReplyMultiBulkLen(c,0); addReplyMultiBulkLen(c,0);
return; return;
@ -1931,6 +1929,34 @@ NULL
addReplyLongLong(c,raxSize(cg->pel)); addReplyLongLong(c,raxSize(cg->pel));
} }
raxStop(&ri); raxStop(&ri);
} else if (c->argc == 2 ||
(!strcasecmp(opt,"STREAM") && c->argc == 3))
{
/* XINFO <key> STREAM (or the alias XINFO <key>). */
addReplyMultiBulkLen(c,12);
addReplyStatus(c,"length");
addReplyLongLong(c,s->length);
addReplyStatus(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax));
addReplyStatus(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes);
addReplyStatus(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
/* To emit the first/last entry we us the streamReplyWithRange()
* API. */
int count;
streamID start, end;
start.ms = start.seq = 0;
end.ms = end.seq = UINT64_MAX;
addReplyStatus(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES);
if (!count) addReply(c,shared.nullbulk);
addReplyStatus(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES);
if (!count) addReply(c,shared.nullbulk);
} else if (!strcasecmp(opt,"HELP")) { } else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help); addReplyHelp(c, help);
} else { } else {