diff --git a/src/t_stream.c b/src/t_stream.c index 0820a743..92c62077 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -395,8 +395,8 @@ void xlenCommand(client *c) { } /* XREAD [BLOCK ] [COUNT ] [GROUP ] - * [RETRY ] STREAMS key_1 ID_1 key_2 ID_2 ... - * key_N ID_N */ + * [RETRY ] STREAMS key_1 key_2 ... key_N + * ID_1 ID_2 ... ID_N */ void xreadCommand(client *c) { long long timeout = 0; long long count = 0; @@ -453,12 +453,13 @@ void xreadCommand(client *c) { * starting from now. */ if (strcmp(c->argv[i]->ptr,"$") == 0) { robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]); + int id_idx = i - streams_arg - streams_count; if (o) { stream *s = o->ptr; - ids[i] = s->last_id; + ids[id_idx] = s->last_id; } else { - ids[i].ms = 0; - ids[i].seq = 0; + ids[id_idx].ms = 0; + ids[id_idx].seq = 0; } continue; } @@ -466,24 +467,38 @@ void xreadCommand(client *c) { } /* Try to serve the client synchronously. */ + size_t arraylen = 0; + void *arraylen_ptr = NULL; for (int i = 0; i < streams_count; i++) { - robj *o = lookupKeyRead(c->db,c->argv[i+streams_arg]); + robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); if (o == NULL) continue; stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) { + arraylen++; + if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); /* streamReplyWithRange() handles the 'start' ID as inclusive, * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; start.seq++; /* Can't overflow, it's an uint64_t */ + + /* Emit the two elements sub-array consisting of the name + * of the stream and the data we extracted from it. */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,c->argv[i+streams_arg]); streamReplyWithRange(c,s,&start,NULL,count); - goto cleanup; } } + /* We replied synchronously! Set the top array len and return to caller. */ + if (arraylen) { + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + goto cleanup; + } + /* Block if needed. */ if (timeout) { /* If we are inside a MULTI/EXEC and the list is empty the only thing