diff --git a/src/blocked.c b/src/blocked.c index e0dd5672..e62d7548 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -370,6 +370,29 @@ void handleClientsBlockedOnKeys(void) { if (receiver->btype != BLOCKED_STREAM) continue; streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key); + + /* If we blocked in the context of a consumer + * group, we need to resolve the group and update the + * last ID the client is blocked for: this is needed + * because serving other clients in the same consumer + * group will alter the "last ID" of the consumer + * group, and clients blocked in a consumer group are + * always blocked for the ">" ID: we need to deliver + * only new messages and avoid unblocking the client + * otherwise. */ + streamCG *group = NULL; + if (receiver->bpop.xread_group) { + group = streamLookupCG(s, + receiver->bpop.xread_group->ptr); + /* If the group was not found, send an error + * to the consumer. */ + if (!group) { + /* XXX: Fixme, send the error. */ + } else { + *gt = group->last_id; + } + } + if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) @@ -377,21 +400,8 @@ void handleClientsBlockedOnKeys(void) { streamID start = *gt; start.seq++; /* Can't overflow, it's an uint64_t */ - /* If we blocked in the context of a consumer - * group, we need to resolve the group and - * consumer here. */ - streamCG *group = NULL; + /* Lookup the consuemr for the group, if any. */ streamConsumer *consumer = NULL; - if (receiver->bpop.xread_group) { - group = streamLookupCG(s, - receiver->bpop.xread_group->ptr); - /* In theory if the group is not found we - * just perform the read without the group, - * but actually when the group, or the key - * itself is deleted (triggering the removal - * of the group), we check for blocked clients - * and send them an error. */ - } if (group) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr,