diff --git a/src/t_stream.c b/src/t_stream.c index 7944ab39..ad6d1c79 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1863,6 +1863,7 @@ void xinfoCommand(client *c) { " CONSUMERS -- Show consumer groups of group .", " GROUPS -- Show the stream consumer groups.", " STREAM -- Show information about the stream.", +" (without subcommand) -- Alias for STREAM.", " HELP -- Prints this help.", NULL }; @@ -1878,6 +1879,8 @@ NULL /* Dispatch the different subcommands. */ if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { + /* XINFO CONSUMERS . */ + streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); if (cg == NULL) { addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " @@ -1896,16 +1899,42 @@ NULL mstime_t idle = now - consumer->seen_time; if (idle < 0) idle = 0; - addReplyMultiBulkLen(c,3); + addReplyMultiBulkLen(c,6); + addReplyStatus(c,"name"); addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + addReplyStatus(c,"pending"); addReplyLongLong(c,raxSize(consumer->pel)); + addReplyStatus(c,"idle"); addReplyLongLong(c,idle); } raxStop(&ri); + } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { + /* XINFO GROUPS. */ + + if (s->cgroups == NULL) { + addReplyMultiBulkLen(c,0); + return; + } + + addReplyMultiBulkLen(c,raxSize(s->cgroups)); + raxIterator ri; + raxStart(&ri,s->cgroups); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamCG *cg = ri.data; + addReplyMultiBulkLen(c,6); + addReplyStatus(c,"name"); + addReplyBulkCBuffer(c,ri.key,ri.key_len); + addReplyStatus(c,"consumers"); + addReplyLongLong(c,raxSize(cg->consumers)); + addReplyStatus(c,"pending"); + addReplyLongLong(c,raxSize(cg->pel)); + } + raxStop(&ri); } else if (!strcasecmp(opt,"HELP")) { addReplyHelp(c, help); } else { - addReply(c,shared.syntaxerr); + addReplyError(c,"syntax error, try 'XINFO anykey HELP'"); } }