Streams: synchronous xread fixes and improvements.

This commit is contained in:
antirez 2017-09-08 12:09:02 +02:00
parent a7d898334a
commit 6a1c92d52d

View File

@ -395,8 +395,8 @@ void xlenCommand(client *c) {
} }
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ... * [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
* key_N ID_N */ * ID_1 ID_2 ... ID_N */
void xreadCommand(client *c) { void xreadCommand(client *c) {
long long timeout = 0; long long timeout = 0;
long long count = 0; long long count = 0;
@ -453,12 +453,13 @@ void xreadCommand(client *c) {
* starting from now. */ * starting from now. */
if (strcmp(c->argv[i]->ptr,"$") == 0) { if (strcmp(c->argv[i]->ptr,"$") == 0) {
robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]); robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]);
int id_idx = i - streams_arg - streams_count;
if (o) { if (o) {
stream *s = o->ptr; stream *s = o->ptr;
ids[i] = s->last_id; ids[id_idx] = s->last_id;
} else { } else {
ids[i].ms = 0; ids[id_idx].ms = 0;
ids[i].seq = 0; ids[id_idx].seq = 0;
} }
continue; continue;
} }
@ -466,24 +467,38 @@ void xreadCommand(client *c) {
} }
/* Try to serve the client synchronously. */ /* Try to serve the client synchronously. */
size_t arraylen = 0;
void *arraylen_ptr = NULL;
for (int i = 0; i < streams_count; i++) { for (int i = 0; i < streams_count; i++) {
robj *o = lookupKeyRead(c->db,c->argv[i+streams_arg]); robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
if (o == NULL) continue; if (o == NULL) continue;
stream *s = o->ptr; stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */ streamID *gt = ids+i; /* ID must be greater than this. */
if (s->last_id.ms > gt->ms || if (s->last_id.ms > gt->ms ||
(s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
{ {
arraylen++;
if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
/* streamReplyWithRange() handles the 'start' ID as inclusive, /* streamReplyWithRange() handles the 'start' ID as inclusive,
* so start from the next ID, since we want only messages with * so start from the next ID, since we want only messages with
* IDs greater than start. */ * IDs greater than start. */
streamID start = *gt; streamID start = *gt;
start.seq++; /* Can't overflow, it's an uint64_t */ start.seq++; /* Can't overflow, it's an uint64_t */
/* Emit the two elements sub-array consisting of the name
* of the stream and the data we extracted from it. */
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[i+streams_arg]);
streamReplyWithRange(c,s,&start,NULL,count); streamReplyWithRange(c,s,&start,NULL,count);
goto cleanup;
} }
} }
/* We replied synchronously! Set the top array len and return to caller. */
if (arraylen) {
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
goto cleanup;
}
/* Block if needed. */ /* Block if needed. */
if (timeout) { if (timeout) {
/* If we are inside a MULTI/EXEC and the list is empty the only thing /* If we are inside a MULTI/EXEC and the list is empty the only thing