CG: Now XREADGROUP + blocking operations work.

This commit is contained in:
antirez 2018-01-26 11:57:19 +01:00
parent 5ad29325fe
commit b65fe09bb8
3 changed files with 42 additions and 8 deletions

View File

@ -313,9 +313,31 @@ void handleClientsBlockedOnKeys(void) {
{ {
streamID start = *gt; streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */ start.seq++; /* Can't overflow, it's an uint64_t */
/* If we blocked in the context of a consumer
* group, we need to resolve the group and
* consumer here. */
streamCG *group = NULL;
streamConsumer *consumer = NULL;
if (receiver->bpop.xread_group) {
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* In theory if the group is not found we
* just perform the read without the group,
* but actually when the group, or the key
* itself is deleted (triggering the removal
* of the group), we check for blocked clients
* and send them an error. */
}
if (group) {
consumer = streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr);
}
/* Note that after we unblock the client, 'gt' /* Note that after we unblock the client, 'gt'
* is no longer valid, so we must do it after * and other receiver->bpop stuff are no longer
* we copied the ID into the 'start' variable. */ * valid, so we must do the setup above before
* this call. */
unblockClient(receiver); unblockClient(receiver);
/* Emit the two elements sub-array consisting of /* Emit the two elements sub-array consisting of
@ -326,8 +348,8 @@ void handleClientsBlockedOnKeys(void) {
addReplyMultiBulkLen(receiver,2); addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,rl->key); addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL, streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,0, receiver->bpop.xread_count,
NULL,NULL,0); 0, group, consumer, 0);
} }
} }
} }

View File

@ -94,5 +94,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
#endif #endif

View File

@ -41,8 +41,6 @@
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamNACK *streamCreateNACK(streamConsumer *consumer); streamNACK *streamCreateNACK(streamConsumer *consumer);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
@ -1242,8 +1240,20 @@ void xreadCommand(client *c) {
* in case the ID provided is too low, we do not want the server to * in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */ * block just to serve this client a huge stream of messages. */
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
c->bpop.xread_group = groupname;
c->bpop.xread_consumer = consumername; /* If this is a XREADGROUP + GROUP we need to remember for which
* group and consumer name we are blocking, so later when one of the
* keys receive more data, we can call streamReplyWithRange() passing
* the right arguments. */
if (groupname) {
incrRefCount(groupname);
incrRefCount(consumername);
c->bpop.xread_group = groupname;
c->bpop.xread_consumer = consumername;
} else {
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
}
goto cleanup; goto cleanup;
} }