Streams: XREAD arguments parsing.

This commit is contained in:
antirez 2017-09-07 16:48:20 +02:00
parent 4086dff477
commit e65b4825f0

View File

@ -371,6 +371,7 @@ void xrangeCommand(client *c) {
if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) { if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) {
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK) if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK)
return; return;
if (count < 0) count = 0;
} else { } else {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
return; return;
@ -397,6 +398,77 @@ void xlenCommand(client *c) {
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ... * [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ...
* key_N ID_N */ * key_N ID_N */
void xreadCommand(client *c) { void xreadCommand(client *c) {
long long block = 0;
long long count = 0;
int streams_count = 0;
int streams_argc = 0;
#define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
/* Parse arguments. */
for (int i = 1; i < c->argc; i++) {
int moreargs = i != c->argc-1;
char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) {
i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&block,NULL) != C_OK)
return;
if (block < 0) block = 0;
} else if (!strcasecmp(o,"COUNT") && moreargs) {
i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
return;
if (count < 0) count = 0;
} else if (!strcasecmp(o,"STREAMS") && moreargs) {
streams_argc = i+1;
streams_count = (c->argc-streams_argc);
if ((streams_count % 2) != 0) {
addReplyError(c,"Unbalanced XREAD list of streams: "
"for each stream key an ID or '$' must be "
"specified.");
return;
}
streams_count /= 2; /* We have two arguments for each stream. */
break;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
/* STREAMS option is mandatory. */
if (streams_argc == 0) {
addReply(c,shared.syntaxerr);
return;
}
/* Parse the IDs. */
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*streams_count);
/* Try to serve the client synchronously. */
for (int i = streams_argc + streams_count; i < c->argc; i++) {
/* Specifying "$" as last-known-id means that the client wants to be
* served with just the messages that will arrive into the stream
* starting from now. */
if (strcmp(c->argv[i]->ptr,"$") == 0) {
robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]);
if (o) {
stream *s = o->ptr;
ids[i] = s->last_id;
} else {
ids[i].ms = 0;
ids[i].seq = 0;
}
continue;
}
if (streamParseIDOrReply(c,c->argv[i],ids+i,0) != C_OK) goto cleanup;
}
cleanup:
/* Cleanup. */
if (ids != static_ids) zfree(ids);
} }