From 09327f11dd718461b3a1d577729c1e9969f37080 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 10 Jul 2018 11:11:39 +0200 Subject: [PATCH] Streams: fix unblocking logic into a consumer group. When a client blocks for a consumer group, we don't know the actual ID we want to be served: other clients blocked in the same consumer group may be served first, so the consumer group latest delivered ID changes. This was not handled correctly, all the clients in the consumer group were unblocked without data but the first. --- src/blocked.c | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index e0dd5672..e62d7548 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -370,6 +370,29 @@ void handleClientsBlockedOnKeys(void) { if (receiver->btype != BLOCKED_STREAM) continue; streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key); + + /* If we blocked in the context of a consumer + * group, we need to resolve the group and update the + * last ID the client is blocked for: this is needed + * because serving other clients in the same consumer + * group will alter the "last ID" of the consumer + * group, and clients blocked in a consumer group are + * always blocked for the ">" ID: we need to deliver + * only new messages and avoid unblocking the client + * otherwise. */ + streamCG *group = NULL; + if (receiver->bpop.xread_group) { + group = streamLookupCG(s, + receiver->bpop.xread_group->ptr); + /* If the group was not found, send an error + * to the consumer. */ + if (!group) { + /* XXX: Fixme, send the error. */ + } else { + *gt = group->last_id; + } + } + if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) @@ -377,21 +400,8 @@ void handleClientsBlockedOnKeys(void) { streamID start = *gt; start.seq++; /* Can't overflow, it's an uint64_t */ - /* If we blocked in the context of a consumer - * group, we need to resolve the group and - * consumer here. */ - streamCG *group = NULL; + /* Lookup the consuemr for the group, if any. */ streamConsumer *consumer = NULL; - if (receiver->bpop.xread_group) { - group = streamLookupCG(s, - receiver->bpop.xread_group->ptr); - /* In theory if the group is not found we - * just perform the read without the group, - * but actually when the group, or the key - * itself is deleted (triggering the removal - * of the group), we check for blocked clients - * and send them an error. */ - } if (group) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr,