Streams: fix unblocking logic into a consumer group.

When a client blocks for a consumer group, we don't know the actual ID
we want to be served: other clients blocked in the same consumer group
may be served first, so the consumer group latest delivered ID changes.
This was not handled correctly, all the clients in the consumer group
were unblocked without data but the first.
This commit is contained in:
antirez 2018-07-10 11:11:39 +02:00
parent b67f027699
commit 09327f11dd

View File

@ -370,6 +370,29 @@ void handleClientsBlockedOnKeys(void) {
if (receiver->btype != BLOCKED_STREAM) continue;
streamID *gt = dictFetchValue(receiver->bpop.keys,
rl->key);
/* If we blocked in the context of a consumer
* group, we need to resolve the group and update the
* last ID the client is blocked for: this is needed
* because serving other clients in the same consumer
* group will alter the "last ID" of the consumer
* group, and clients blocked in a consumer group are
* always blocked for the ">" ID: we need to deliver
* only new messages and avoid unblocking the client
* otherwise. */
streamCG *group = NULL;
if (receiver->bpop.xread_group) {
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* If the group was not found, send an error
* to the consumer. */
if (!group) {
/* XXX: Fixme, send the error. */
} else {
*gt = group->last_id;
}
}
if (s->last_id.ms > gt->ms ||
(s->last_id.ms == gt->ms &&
s->last_id.seq > gt->seq))
@ -377,21 +400,8 @@ void handleClientsBlockedOnKeys(void) {
streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */
/* If we blocked in the context of a consumer
* group, we need to resolve the group and
* consumer here. */
streamCG *group = NULL;
/* Lookup the consuemr for the group, if any. */
streamConsumer *consumer = NULL;
if (receiver->bpop.xread_group) {
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
/* In theory if the group is not found we
* just perform the read without the group,
* but actually when the group, or the key
* itself is deleted (triggering the removal
* of the group), we check for blocked clients
* and send them an error. */
}
if (group) {
consumer = streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr,