From a8c1bb310e13ae273cf47e09f42953bf38a26b1e Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 10 Jul 2018 12:01:52 +0200 Subject: [PATCH] Streams: fix new XREADGROUP sync logic. --- src/t_stream.c | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index a4702b0c..52687778 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1385,11 +1385,21 @@ void xreadCommand(client *c) { gt->seq != UINT64_MAX) { serve_synchronously = 1; + } else { + /* We also want to serve a consumer in a consumer group + * synchronously in case the group top item delivered is smaller + * than what the stream has inside. */ + streamID *last = &groups[i]->last_id; + if (s->last_id.ms > last->ms || + (s->last_id.ms == last->ms && s->last_id.seq > last->seq)) + { + serve_synchronously = 1; + *gt = *last; + } } } else { - /* For clients without a consumer group specified, we serve the - * client synchronously only if we can actually provide at least - * one item from the stream. */ + /* For consumers without a group, we serve synchronously if we can + * actually provide at least one item from the stream. */ if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) {