CG: XGROUPREAD group option parsing and groups lookup.

This commit is contained in:
antirez 2018-01-19 10:40:08 +01:00
parent 1fafe7def1
commit 2bbb2bf427
2 changed files with 66 additions and 8 deletions

View File

@ -47,7 +47,7 @@ typedef struct streamIterator {
/* Consumer group. */ /* Consumer group. */
typedef struct streamCG { typedef struct streamCG {
streamID lastid; /* Last delivered (not acknowledged) ID for this streamID last_id; /* Last delivered (not acknowledged) ID for this
group. Consumers that will just ask for more group. Consumers that will just ask for more
messages will served with IDs > than this. */ messages will served with IDs > than this. */
rax *pel; /* Pending entries list. This is a radix tree that rax *pel; /* Pending entries list. This is a radix tree that

View File

@ -41,6 +41,7 @@
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
streamCG *streamLookupCG(stream *s, sds groupname);
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks. * Low level stream encoding: a radix tree of listpacks.
@ -924,9 +925,13 @@ void xlenCommand(client *c) {
addReplyLongLong(c,s->length); addReplyLongLong(c,s->length);
} }
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] /* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
* [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N * ID_1 ID_2 ... ID_N
* ID_1 ID_2 ... ID_N */ *
* This function also implements the XREAD-GROUP command, which is like XREAD
* but accepting the [GROUP group-name consumer-name] additional option.
* This is useful because while XREAD is a read command and can be called
* on slaves, XREAD-GROUP is not. */
#define XREAD_BLOCKED_DEFAULT_COUNT 1000 #define XREAD_BLOCKED_DEFAULT_COUNT 1000
void xreadCommand(client *c) { void xreadCommand(client *c) {
long long timeout = -1; /* -1 means, no BLOCK argument given. */ long long timeout = -1; /* -1 means, no BLOCK argument given. */
@ -936,10 +941,14 @@ void xreadCommand(client *c) {
#define STREAMID_STATIC_VECTOR_LEN 8 #define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids; streamID *ids = static_ids;
streamCG **groups = NULL;
int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
sds groupname = NULL;
sds consumername = NULL;
/* Parse arguments. */ /* Parse arguments. */
for (int i = 1; i < c->argc; i++) { for (int i = 1; i < c->argc; i++) {
int moreargs = i != c->argc-1; int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr; char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) { if (!strcasecmp(o,"BLOCK") && moreargs) {
i++; i++;
@ -961,6 +970,15 @@ void xreadCommand(client *c) {
} }
streams_count /= 2; /* We have two arguments for each stream. */ streams_count /= 2; /* We have two arguments for each stream. */
break; break;
} else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
if (!xreadgroup) {
addReplyError(c,"The GROUP option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
groupname = c->argv[i]->ptr;
consumername = c->argv[i+1]->ptr;
i += 2;
} else { } else {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
return; return;
@ -973,17 +991,40 @@ void xreadCommand(client *c) {
return; return;
} }
/* Parse the IDs. */ /* Parse the IDs and resolve the group name. */
if (streams_count > STREAMID_STATIC_VECTOR_LEN) if (streams_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*streams_count); ids = zmalloc(sizeof(streamID)*streams_count);
if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
for (int i = streams_arg + streams_count; i < c->argc; i++) { for (int i = streams_arg + streams_count; i < c->argc; i++) {
/* Specifying "$" as last-known-id means that the client wants to be /* Specifying "$" as last-known-id means that the client wants to be
* served with just the messages that will arrive into the stream * served with just the messages that will arrive into the stream
* starting from now. */ * starting from now. */
int id_idx = i - streams_arg - streams_count; int id_idx = i - streams_arg - streams_count;
robj *key = c->argv[i-streams_count];
robj *o;
streamCG *group;
/* If a group was specified, than we need to be sure that the
* key and group actually exist. */
if (groupname) {
o = lookupKeyRead(c->db,key);
if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname)) == NULL)
{
addReplyErrorFormat(c, "No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP "
"option",
key->ptr,groupname);
goto cleanup;
}
groups[id_idx] = group;
}
if (strcmp(c->argv[i]->ptr,"$") == 0) { if (strcmp(c->argv[i]->ptr,"$") == 0) {
robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]); o = lookupKeyRead(c->db,key);
if (checkType(c,o,OBJ_STREAM)) goto cleanup;
if (o) { if (o) {
stream *s = o->ptr; stream *s = o->ptr;
ids[id_idx] = s->last_id; ids[id_idx] = s->last_id;
@ -992,6 +1033,14 @@ void xreadCommand(client *c) {
ids[id_idx].seq = 0; ids[id_idx].seq = 0;
} }
continue; continue;
} else if (strcmp(c->argv[i]->ptr,">") == 0) {
if (!xreadgroup || groupname == NULL) {
addReplyError(c,"The > ID can be specified only when calling "
"XREADGROUP using the GROUP <group> "
"<consumer> option.");
goto cleanup;
}
ids[id_idx] = group->last_id;
} }
if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
goto cleanup; goto cleanup;
@ -1056,6 +1105,7 @@ void xreadCommand(client *c) {
cleanup: cleanup:
/* Cleanup. */ /* Cleanup. */
if (ids != static_ids) zfree(ids); if (ids != static_ids) zfree(ids);
zfree(groups);
} }
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
@ -1082,7 +1132,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) {
streamCG *cg = zmalloc(sizeof(*cg)); streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew(); cg->pel = raxNew();
cg->consumers = raxNew(); cg->consumers = raxNew();
cg->lastid = id; cg->last_id = id;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg; return cg;
} }
@ -1094,6 +1144,14 @@ void streamFreeCG(streamCG *cg) {
zfree(cg); zfree(cg);
} }
/* Lookup the consumer group in the specified stream and returns its
* pointer, otherwise if there is no such group, NULL is returned. */
streamCG *streamLookupCG(stream *s, sds groupname) {
streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
sdslen(groupname));
return (cg == raxNotFound) ? NULL : cg;
}
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
* Consumer groups commands * Consumer groups commands
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */