mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
CG: XPENDING with start/stop/count variant implemented.
This commit is contained in:
parent
1bc31666da
commit
f3708af7f9
@ -1447,7 +1447,7 @@ void xackCommand(client *c) {
|
|||||||
addReplyLongLong(c,acknowledged);
|
addReplyLongLong(c,acknowledged);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* XPENDING <key> <group> [<start> <stop> <max>] [<consumer>]
|
/* XPENDING <key> <group> [<start> <stop> <count>] [<consumer>]
|
||||||
*
|
*
|
||||||
* If start and stop are omitted, the command just outputs information about
|
* If start and stop are omitted, the command just outputs information about
|
||||||
* the amount of pending messages for the key/group pair, together with
|
* the amount of pending messages for the key/group pair, together with
|
||||||
@ -1462,6 +1462,8 @@ void xpendingCommand(client *c) {
|
|||||||
robj *key = c->argv[1];
|
robj *key = c->argv[1];
|
||||||
robj *groupname = c->argv[2];
|
robj *groupname = c->argv[2];
|
||||||
robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;
|
robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;
|
||||||
|
streamID startid, endid;
|
||||||
|
long long count;
|
||||||
|
|
||||||
/* Start and stop, and the consumer, can be omitted. */
|
/* Start and stop, and the consumer, can be omitted. */
|
||||||
if (c->argc != 3 && c->argc != 6 && c->argc != 7) {
|
if (c->argc != 3 && c->argc != 6 && c->argc != 7) {
|
||||||
@ -1469,6 +1471,17 @@ void xpendingCommand(client *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Parse start/end/count arguments ASAP if needed, in order to report
|
||||||
|
* syntax errors before any other error. */
|
||||||
|
if (c->argc >= 6) {
|
||||||
|
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
|
||||||
|
return;
|
||||||
|
if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
|
||||||
|
return;
|
||||||
|
if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* Lookup the key and the group inside the stream. */
|
/* Lookup the key and the group inside the stream. */
|
||||||
robj *o = lookupKeyRead(c->db,c->argv[1]);
|
robj *o = lookupKeyRead(c->db,c->argv[1]);
|
||||||
streamCG *group;
|
streamCG *group;
|
||||||
@ -1494,8 +1507,6 @@ void xpendingCommand(client *c) {
|
|||||||
addReply(c,shared.nullbulk); /* End. */
|
addReply(c,shared.nullbulk); /* End. */
|
||||||
addReply(c,shared.nullmultibulk); /* Clients. */
|
addReply(c,shared.nullmultibulk); /* Clients. */
|
||||||
} else {
|
} else {
|
||||||
streamID startid,endid;
|
|
||||||
|
|
||||||
/* Start. */
|
/* Start. */
|
||||||
raxIterator ri;
|
raxIterator ri;
|
||||||
raxStart(&ri,group->pel);
|
raxStart(&ri,group->pel);
|
||||||
@ -1530,6 +1541,47 @@ void xpendingCommand(client *c) {
|
|||||||
}
|
}
|
||||||
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
|
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
|
||||||
else {
|
else {
|
||||||
|
streamConsumer *consumer = consumername ?
|
||||||
|
streamLookupConsumer(group,consumername->ptr):
|
||||||
|
NULL;
|
||||||
|
rax *pel = consumer ? consumer->pel : group->pel;
|
||||||
|
unsigned char startkey[sizeof(streamID)];
|
||||||
|
unsigned char endkey[sizeof(streamID)];
|
||||||
|
raxIterator ri;
|
||||||
|
mstime_t now = mstime();
|
||||||
|
|
||||||
|
streamEncodeID(startkey,&startid);
|
||||||
|
streamEncodeID(endkey,&endid);
|
||||||
|
raxStart(&ri,pel);
|
||||||
|
raxSeek(&ri,">=",startkey,sizeof(startkey));
|
||||||
|
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||||
|
size_t arraylen = 0;
|
||||||
|
|
||||||
|
while(raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
|
||||||
|
streamNACK *nack = ri.data;
|
||||||
|
|
||||||
|
arraylen++;
|
||||||
|
addReplyMultiBulkLen(c,4);
|
||||||
|
|
||||||
|
/* Entry ID. */
|
||||||
|
streamID id;
|
||||||
|
streamDecodeID(ri.key,&id);
|
||||||
|
addReplyStreamID(c,&id);
|
||||||
|
|
||||||
|
/* Consumer name. */
|
||||||
|
addReplyBulkCBuffer(c,nack->consumer->name,
|
||||||
|
sdslen(nack->consumer->name));
|
||||||
|
|
||||||
|
/* Milliseconds elapsed since last delivery. */
|
||||||
|
mstime_t elapsed = now - nack->delivery_time;
|
||||||
|
if (elapsed < 0) elapsed = 0;
|
||||||
|
addReplyLongLong(c,elapsed);
|
||||||
|
|
||||||
|
/* Number of deliveries. */
|
||||||
|
addReplyLongLong(c,nack->delivery_count);
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user