From b65fe09bb8d00d9edf07f29274e0405a9d802fe3 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 26 Jan 2018 11:57:19 +0100 Subject: [PATCH] CG: Now XREADGROUP + blocking operations work. --- src/blocked.c | 30 ++++++++++++++++++++++++++---- src/stream.h | 2 ++ src/t_stream.c | 18 ++++++++++++++---- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 0bbbe6c6..8f472f2b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -313,9 +313,31 @@ void handleClientsBlockedOnKeys(void) { { streamID start = *gt; start.seq++; /* Can't overflow, it's an uint64_t */ + + /* If we blocked in the context of a consumer + * group, we need to resolve the group and + * consumer here. */ + streamCG *group = NULL; + streamConsumer *consumer = NULL; + if (receiver->bpop.xread_group) { + group = streamLookupCG(s, + receiver->bpop.xread_group->ptr); + /* In theory if the group is not found we + * just perform the read without the group, + * but actually when the group, or the key + * itself is deleted (triggering the removal + * of the group), we check for blocked clients + * and send them an error. */ + } + if (group) { + consumer = streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr); + } + /* Note that after we unblock the client, 'gt' - * is no longer valid, so we must do it after - * we copied the ID into the 'start' variable. */ + * and other receiver->bpop stuff are no longer + * valid, so we must do the setup above before + * this call. */ unblockClient(receiver); /* Emit the two elements sub-array consisting of @@ -326,8 +348,8 @@ void handleClientsBlockedOnKeys(void) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,rl->key); streamReplyWithRange(receiver,s,&start,NULL, - receiver->bpop.xread_count,0, - NULL,NULL,0); + receiver->bpop.xread_count, + 0, group, consumer, 0); } } } diff --git a/src/stream.h b/src/stream.h index 91739207..908e9ff7 100644 --- a/src/stream.h +++ b/src/stream.h @@ -94,5 +94,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); +streamCG *streamLookupCG(stream *s, sds groupname); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name); #endif diff --git a/src/t_stream.c b/src/t_stream.c index 3f87b576..f4babd3f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -41,8 +41,6 @@ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ void streamFreeCG(streamCG *cg); -streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamNACK *streamCreateNACK(streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer); @@ -1242,8 +1240,20 @@ void xreadCommand(client *c) { * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; - c->bpop.xread_group = groupname; - c->bpop.xread_consumer = consumername; + + /* If this is a XREADGROUP + GROUP we need to remember for which + * group and consumer name we are blocking, so later when one of the + * keys receive more data, we can call streamReplyWithRange() passing + * the right arguments. */ + if (groupname) { + incrRefCount(groupname); + incrRefCount(consumername); + c->bpop.xread_group = groupname; + c->bpop.xread_consumer = consumername; + } else { + c->bpop.xread_group = NULL; + c->bpop.xread_consumer = NULL; + } goto cleanup; }