mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
CG: XINFO CONSUMERS implemented.
This commit is contained in:
parent
67eeeb0b10
commit
0cf6b1e3ae
@ -312,6 +312,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
|
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
|
||||||
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
|
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
|
||||||
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
|
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
|
||||||
|
{"xinfo",xinfoCommand,-2,"r",0,NULL,1,1,1,0,0},
|
||||||
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
||||||
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
||||||
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
|
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
|
||||||
|
@ -2029,6 +2029,7 @@ void xgroupCommand(client *c);
|
|||||||
void xackCommand(client *c);
|
void xackCommand(client *c);
|
||||||
void xpendingCommand(client *c);
|
void xpendingCommand(client *c);
|
||||||
void xclaimCommand(client *c);
|
void xclaimCommand(client *c);
|
||||||
|
void xinfoCommand(client *c);
|
||||||
|
|
||||||
#if defined(__GNUC__)
|
#if defined(__GNUC__)
|
||||||
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
||||||
|
@ -1858,3 +1858,54 @@ void xclaimCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */
|
/* XINFO <key> [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */
|
||||||
|
void xinfoCommand(client *c) {
|
||||||
|
const char *help[] = {
|
||||||
|
"<key> CONSUMERS <groupname> -- Show consumer groups of group <groupname>.",
|
||||||
|
"<key> GROUPS -- Show the stream consumer groups.",
|
||||||
|
"<key> STREAM -- Show information about the stream.",
|
||||||
|
"<key> HELP -- Prints this help.",
|
||||||
|
NULL
|
||||||
|
};
|
||||||
|
stream *s = NULL;
|
||||||
|
char *opt = c->argc > 2 ? c->argv[2]->ptr : "STREAM"; /* Subcommand. */
|
||||||
|
|
||||||
|
/* Lookup the key now, this is common for all the subcommands but HELP. */
|
||||||
|
if (c->argc >= 3 && strcasecmp(opt,"HELP")) {
|
||||||
|
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
|
||||||
|
if (o == NULL) return;
|
||||||
|
s = o->ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Dispatch the different subcommands. */
|
||||||
|
if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
|
||||||
|
streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
|
||||||
|
if (cg == NULL) {
|
||||||
|
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
|
||||||
|
"for key name '%s'",
|
||||||
|
c->argv[3]->ptr, c->argv[1]->ptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
addReplyMultiBulkLen(c,raxSize(cg->consumers));
|
||||||
|
raxIterator ri;
|
||||||
|
raxStart(&ri,cg->consumers);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
mstime_t now = mstime();
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
streamConsumer *consumer = ri.data;
|
||||||
|
mstime_t idle = now - consumer->seen_time;
|
||||||
|
if (idle < 0) idle = 0;
|
||||||
|
|
||||||
|
addReplyMultiBulkLen(c,3);
|
||||||
|
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
|
||||||
|
addReplyLongLong(c,raxSize(consumer->pel));
|
||||||
|
addReplyLongLong(c,idle);
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
} else if (!strcasecmp(opt,"HELP")) {
|
||||||
|
addReplyHelp(c, help);
|
||||||
|
} else {
|
||||||
|
addReply(c,shared.syntaxerr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user