diff --git a/src/blocked.c b/src/blocked.c index 8f472f2b..2de79837 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -331,7 +331,8 @@ void handleClientsBlockedOnKeys(void) { } if (group) { consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr); + receiver->bpop.xread_consumer->ptr, + 1); } /* Note that after we unblock the client, 'gt' diff --git a/src/stream.h b/src/stream.h index 908e9ff7..bd999d77 100644 --- a/src/stream.h +++ b/src/stream.h @@ -95,6 +95,6 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); #endif diff --git a/src/t_stream.c b/src/t_stream.c index c659a701..95691157 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1213,7 +1213,7 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[i+streams_arg]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], - consumername->ptr); + consumername->ptr,1); streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, consumer, noack); @@ -1337,10 +1337,11 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * consumer does not exist it is automatically created as a side effect * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { + if (!create) return NULL; consumer = zmalloc(sizeof(*consumer)); consumer->name = sdsdup(name); consumer->pel = raxNew(); @@ -1542,8 +1543,14 @@ void xpendingCommand(client *c) { /* XPENDING [] variant. */ else { streamConsumer *consumer = consumername ? - streamLookupConsumer(group,consumername->ptr): + streamLookupConsumer(group,consumername->ptr,0): NULL; + + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumername && consumer == NULL) + addReplyMultiBulkLen(c,0); + rax *pel = consumer ? consumer->pel : group->pel; unsigned char startkey[sizeof(streamID)]; unsigned char endkey[sizeof(streamID)]; @@ -1557,10 +1564,11 @@ void xpendingCommand(client *c) { void *arraylen_ptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; - while(raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { + while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { streamNACK *nack = ri.data; arraylen++; + count--; addReplyMultiBulkLen(c,4); /* Entry ID. */