diff --git a/src/t_stream.c b/src/t_stream.c index 5c03c972..a4702b0c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1355,7 +1355,11 @@ void xreadCommand(client *c) { " option."); goto cleanup; } - ids[id_idx] = group->last_id; + /* We use just the maximum ID to signal this is a ">" ID, anyway + * the code handling the blocking clients will have to update the + * ID later in order to match the changing consumer group last ID. */ + ids[id_idx].ms = UINT64_MAX; + ids[id_idx].seq = UINT64_MAX; continue; } if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) @@ -1370,9 +1374,30 @@ void xreadCommand(client *c) { if (o == NULL) continue; stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ - if (s->last_id.ms > gt->ms || - (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) - { + int serve_synchronously = 0; + + /* Check if there are the conditions to serve the client synchronously. */ + if (groups) { + /* If the consumer is blocked on a group, we always serve it + * synchronously (serving its local history) if the ID specified + * was not the special ">" ID. */ + if (gt->ms != UINT64_MAX || + gt->seq != UINT64_MAX) + { + serve_synchronously = 1; + } + } else { + /* For clients without a consumer group specified, we serve the + * client synchronously only if we can actually provide at least + * one item from the stream. */ + if (s->last_id.ms > gt->ms || + (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) + { + serve_synchronously = 1; + } + } + + if (serve_synchronously) { arraylen++; if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); /* streamReplyWithRange() handles the 'start' ID as inclusive,