diff --git a/src/t_stream.c b/src/t_stream.c index 0277f72a..4cc1bb02 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -751,8 +751,6 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end addReplyBulkCBuffer(c,key,key_len); addReplyBulkCBuffer(c,value,value_len); } - arraylen++; - if (count && count == arraylen) break; /* If a group is passed, we need to create an entry in the * PEL (pending entries list) of this group *and* this consumer. @@ -769,6 +767,9 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); serverAssert(retval == 2); /* Make sure entry was inserted. */ } + + arraylen++; + if (count && count == arraylen) break; } streamIteratorStop(&si); if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); @@ -793,15 +794,17 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start unsigned char startkey[sizeof(streamID)]; unsigned char endkey[sizeof(streamID)]; streamEncodeID(startkey,start); - if (end) streamEncodeID(endkey,start); + if (end) streamEncodeID(endkey,end); size_t arraylen = 0; void *arraylen_ptr = addDeferredMultiBulkLength(c); raxStart(&ri,consumer->pel); raxSeek(&ri,">=",startkey,sizeof(startkey)); - while(raxNext(&ri)) { - if (end && memcmp(end,ri.key,ri.key_len) > 0) break; - if (streamReplyWithRange(c,s,start,end,1,0,NULL,NULL, + while(raxNext(&ri) && (!count || arraylen < count)) { + if (end && memcmp(ri.key,end,ri.key_len) > 0) break; + streamID thisid; + streamDecodeID(ri.key,&thisid); + if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL, STREAM_RWR_RAWENTRIES) == 0) { /* Note that we may have a not acknowledged entry in the PEL @@ -813,10 +816,16 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start streamDecodeID(ri.key,&id); addReplyStreamID(c,&id); addReply(c,shared.nullmultibulk); + } else { + streamNACK *nack = ri.data; + nack->delivery_time = mstime(); + nack->delivery_count++; } + arraylen++; } raxStop(&ri); setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + return arraylen; } /* ----------------------------------------------------------------------- @@ -1319,15 +1328,17 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { - streamConsumer *c = raxFind(cg->consumers,(unsigned char*)name, - sdslen(name)); - if (c == raxNotFound) { - c = zmalloc(sizeof(*c)); - c->name = sdsdup(name); - c->pel = raxNew(); + streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, + sdslen(name)); + if (consumer == raxNotFound) { + consumer = zmalloc(sizeof(*consumer)); + consumer->name = sdsdup(name); + consumer->pel = raxNew(); + raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), + consumer,NULL); } - c->seen_time = mstime(); - return c; + consumer->seen_time = mstime(); + return consumer; } /* -----------------------------------------------------------------------