diff --git a/src/t_stream.c b/src/t_stream.c index 66c6cb89..485ea29a 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -371,6 +371,7 @@ void xrangeCommand(client *c) { if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) { if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK) return; + if (count < 0) count = 0; } else { addReply(c,shared.syntaxerr); return; @@ -397,6 +398,77 @@ void xlenCommand(client *c) { * [RETRY ] STREAMS key_1 ID_1 key_2 ID_2 ... * key_N ID_N */ void xreadCommand(client *c) { + long long block = 0; + long long count = 0; + int streams_count = 0; + int streams_argc = 0; + #define STREAMID_STATIC_VECTOR_LEN 8 + streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; + streamID *ids = static_ids; + + /* Parse arguments. */ + for (int i = 1; i < c->argc; i++) { + int moreargs = i != c->argc-1; + char *o = c->argv[i]->ptr; + if (!strcasecmp(o,"BLOCK") && moreargs) { + i++; + if (getLongLongFromObjectOrReply(c,c->argv[i],&block,NULL) != C_OK) + return; + if (block < 0) block = 0; + } else if (!strcasecmp(o,"COUNT") && moreargs) { + i++; + if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) + return; + if (count < 0) count = 0; + } else if (!strcasecmp(o,"STREAMS") && moreargs) { + streams_argc = i+1; + streams_count = (c->argc-streams_argc); + if ((streams_count % 2) != 0) { + addReplyError(c,"Unbalanced XREAD list of streams: " + "for each stream key an ID or '$' must be " + "specified."); + return; + } + streams_count /= 2; /* We have two arguments for each stream. */ + break; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + + /* STREAMS option is mandatory. */ + if (streams_argc == 0) { + addReply(c,shared.syntaxerr); + return; + } + + /* Parse the IDs. */ + if (streams_count > STREAMID_STATIC_VECTOR_LEN) + ids = zmalloc(sizeof(streamID)*streams_count); + + /* Try to serve the client synchronously. */ + for (int i = streams_argc + streams_count; i < c->argc; i++) { + /* Specifying "$" as last-known-id means that the client wants to be + * served with just the messages that will arrive into the stream + * starting from now. */ + if (strcmp(c->argv[i]->ptr,"$") == 0) { + robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]); + if (o) { + stream *s = o->ptr; + ids[i] = s->last_id; + } else { + ids[i].ms = 0; + ids[i].seq = 0; + } + continue; + } + if (streamParseIDOrReply(c,c->argv[i],ids+i,0) != C_OK) goto cleanup; + } + +cleanup: + /* Cleanup. */ + if (ids != static_ids) zfree(ids); }