CG: XPENDING should not create consumers and obey to count.

This commit is contained in:
antirez 2018-01-29 18:32:38 +01:00
parent f3708af7f9
commit e76fb4ab25
3 changed files with 15 additions and 6 deletions

View File

@ -331,7 +331,8 @@ void handleClientsBlockedOnKeys(void) {
} }
if (group) { if (group) {
consumer = streamLookupConsumer(group, consumer = streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr); receiver->bpop.xread_consumer->ptr,
1);
} }
/* Note that after we unblock the client, 'gt' /* Note that after we unblock the client, 'gt'

View File

@ -95,6 +95,6 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
#endif #endif

View File

@ -1213,7 +1213,7 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[i+streams_arg]); addReplyBulk(c,c->argv[i+streams_arg]);
streamConsumer *consumer = NULL; streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i], if (groups) consumer = streamLookupConsumer(groups[i],
consumername->ptr); consumername->ptr,1);
streamReplyWithRange(c,s,&start,NULL,count,0, streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL, groups ? groups[i] : NULL,
consumer, noack); consumer, noack);
@ -1337,10 +1337,11 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* consumer does not exist it is automatically created as a side effect * consumer does not exist it is automatically created as a side effect
* of calling this function, otherwise its last seen time is updated and * of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */ * the existing consumer reference returned. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name)); sdslen(name));
if (consumer == raxNotFound) { if (consumer == raxNotFound) {
if (!create) return NULL;
consumer = zmalloc(sizeof(*consumer)); consumer = zmalloc(sizeof(*consumer));
consumer->name = sdsdup(name); consumer->name = sdsdup(name);
consumer->pel = raxNew(); consumer->pel = raxNew();
@ -1542,8 +1543,14 @@ void xpendingCommand(client *c) {
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */ /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else { else {
streamConsumer *consumer = consumername ? streamConsumer *consumer = consumername ?
streamLookupConsumer(group,consumername->ptr): streamLookupConsumer(group,consumername->ptr,0):
NULL; NULL;
/* If a consumer name was mentioned but it does not exist, we can
* just return an empty array. */
if (consumername && consumer == NULL)
addReplyMultiBulkLen(c,0);
rax *pel = consumer ? consumer->pel : group->pel; rax *pel = consumer ? consumer->pel : group->pel;
unsigned char startkey[sizeof(streamID)]; unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)]; unsigned char endkey[sizeof(streamID)];
@ -1557,10 +1564,11 @@ void xpendingCommand(client *c) {
void *arraylen_ptr = addDeferredMultiBulkLength(c); void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0; size_t arraylen = 0;
while(raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
streamNACK *nack = ri.data; streamNACK *nack = ri.data;
arraylen++; arraylen++;
count--;
addReplyMultiBulkLen(c,4); addReplyMultiBulkLen(c,4);
/* Entry ID. */ /* Entry ID. */