CG: XREADGROUP can fetch data from the consumer PEL.

This commit is contained in:
antirez 2018-01-25 11:30:28 +01:00
parent aa808394f6
commit 5bbd117c29

View File

@ -751,8 +751,6 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
addReplyBulkCBuffer(c,key,key_len);
addReplyBulkCBuffer(c,value,value_len);
}
arraylen++;
if (count && count == arraylen) break;
/* If a group is passed, we need to create an entry in the
* PEL (pending entries list) of this group *and* this consumer.
@ -769,6 +767,9 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
serverAssert(retval == 2); /* Make sure entry was inserted. */
}
arraylen++;
if (count && count == arraylen) break;
}
streamIteratorStop(&si);
if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
@ -793,15 +794,17 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
streamEncodeID(startkey,start);
if (end) streamEncodeID(endkey,start);
if (end) streamEncodeID(endkey,end);
size_t arraylen = 0;
void *arraylen_ptr = addDeferredMultiBulkLength(c);
raxStart(&ri,consumer->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
while(raxNext(&ri)) {
if (end && memcmp(end,ri.key,ri.key_len) > 0) break;
if (streamReplyWithRange(c,s,start,end,1,0,NULL,NULL,
while(raxNext(&ri) && (!count || arraylen < count)) {
if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
@ -813,10 +816,16 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
streamDecodeID(ri.key,&id);
addReplyStreamID(c,&id);
addReply(c,shared.nullmultibulk);
} else {
streamNACK *nack = ri.data;
nack->delivery_time = mstime();
nack->delivery_count++;
}
arraylen++;
}
raxStop(&ri);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
return arraylen;
}
/* -----------------------------------------------------------------------
@ -1319,15 +1328,17 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
streamConsumer *c = raxFind(cg->consumers,(unsigned char*)name,
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (c == raxNotFound) {
c = zmalloc(sizeof(*c));
c->name = sdsdup(name);
c->pel = raxNew();
if (consumer == raxNotFound) {
consumer = zmalloc(sizeof(*consumer));
consumer->name = sdsdup(name);
consumer->pel = raxNew();
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL);
}
c->seen_time = mstime();
return c;
consumer->seen_time = mstime();
return consumer;
}
/* -----------------------------------------------------------------------