From 1a02b5f6eea523d8020b189ef6e96a515bd3f23d Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 10 Jul 2018 11:34:17 +0200 Subject: [PATCH] Streams: make blocking for > a truly special case. To simplify the semantics of blocking for a group, this commit changes the implementation to better match the description we provide of conusmer groups: blocking for > will make the consumer waiting for new elements in the group. However blocking for any other ID will always serve the local history of the consumer. However it must be noted that the > ID is actually an alias for the special ID ms/seq of UINT64_MAX,UINT64_MAX. --- src/t_stream.c | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 5c03c972..a4702b0c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1355,7 +1355,11 @@ void xreadCommand(client *c) { " option."); goto cleanup; } - ids[id_idx] = group->last_id; + /* We use just the maximum ID to signal this is a ">" ID, anyway + * the code handling the blocking clients will have to update the + * ID later in order to match the changing consumer group last ID. */ + ids[id_idx].ms = UINT64_MAX; + ids[id_idx].seq = UINT64_MAX; continue; } if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) @@ -1370,9 +1374,30 @@ void xreadCommand(client *c) { if (o == NULL) continue; stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ - if (s->last_id.ms > gt->ms || - (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) - { + int serve_synchronously = 0; + + /* Check if there are the conditions to serve the client synchronously. */ + if (groups) { + /* If the consumer is blocked on a group, we always serve it + * synchronously (serving its local history) if the ID specified + * was not the special ">" ID. */ + if (gt->ms != UINT64_MAX || + gt->seq != UINT64_MAX) + { + serve_synchronously = 1; + } + } else { + /* For clients without a consumer group specified, we serve the + * client synchronously only if we can actually provide at least + * one item from the stream. */ + if (s->last_id.ms > gt->ms || + (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) + { + serve_synchronously = 1; + } + } + + if (serve_synchronously) { arraylen++; if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); /* streamReplyWithRange() handles the 'start' ID as inclusive,