Streams: make blocking for > a truly special case.

To simplify the semantics of blocking for a group, this commit changes
the implementation to better match the description we provide of
conusmer groups: blocking for > will make the consumer waiting for new
elements in the group. However blocking for any other ID will always
serve the local history of the consumer.

However it must be noted that the > ID is actually an alias for the
special ID ms/seq of UINT64_MAX,UINT64_MAX.
This commit is contained in:
antirez 2018-07-10 11:34:17 +02:00
parent a71e814853
commit 1a02b5f6ee

View File

@ -1355,7 +1355,11 @@ void xreadCommand(client *c) {
"<consumer> option."); "<consumer> option.");
goto cleanup; goto cleanup;
} }
ids[id_idx] = group->last_id; /* We use just the maximum ID to signal this is a ">" ID, anyway
* the code handling the blocking clients will have to update the
* ID later in order to match the changing consumer group last ID. */
ids[id_idx].ms = UINT64_MAX;
ids[id_idx].seq = UINT64_MAX;
continue; continue;
} }
if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
@ -1370,9 +1374,30 @@ void xreadCommand(client *c) {
if (o == NULL) continue; if (o == NULL) continue;
stream *s = o->ptr; stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */ streamID *gt = ids+i; /* ID must be greater than this. */
if (s->last_id.ms > gt->ms || int serve_synchronously = 0;
(s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
{ /* Check if there are the conditions to serve the client synchronously. */
if (groups) {
/* If the consumer is blocked on a group, we always serve it
* synchronously (serving its local history) if the ID specified
* was not the special ">" ID. */
if (gt->ms != UINT64_MAX ||
gt->seq != UINT64_MAX)
{
serve_synchronously = 1;
}
} else {
/* For clients without a consumer group specified, we serve the
* client synchronously only if we can actually provide at least
* one item from the stream. */
if (s->last_id.ms > gt->ms ||
(s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
{
serve_synchronously = 1;
}
}
if (serve_synchronously) {
arraylen++; arraylen++;
if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
/* streamReplyWithRange() handles the 'start' ID as inclusive, /* streamReplyWithRange() handles the 'start' ID as inclusive,