diff --git a/src/stream.h b/src/stream.h index 75c9c57e..4b9e6888 100644 --- a/src/stream.h +++ b/src/stream.h @@ -47,7 +47,7 @@ typedef struct streamIterator { /* Consumer group. */ typedef struct streamCG { - streamID lastid; /* Last delivered (not acknowledged) ID for this + streamID last_id; /* Last delivered (not acknowledged) ID for this group. Consumers that will just ask for more messages will served with IDs > than this. */ rax *pel; /* Pending entries list. This is a radix tree that diff --git a/src/t_stream.c b/src/t_stream.c index ffa5130d..38b59a06 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -41,6 +41,7 @@ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ void streamFreeCG(streamCG *cg); +streamCG *streamLookupCG(stream *s, sds groupname); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -924,9 +925,13 @@ void xlenCommand(client *c) { addReplyLongLong(c,s->length); } -/* XREAD [BLOCK ] [COUNT ] [GROUP ] - * [RETRY ] STREAMS key_1 key_2 ... key_N - * ID_1 ID_2 ... ID_N */ +/* XREAD [BLOCK ] [COUNT ] STREAMS key_1 key_2 ... key_N + * ID_1 ID_2 ... ID_N + * + * This function also implements the XREAD-GROUP command, which is like XREAD + * but accepting the [GROUP group-name consumer-name] additional option. + * This is useful because while XREAD is a read command and can be called + * on slaves, XREAD-GROUP is not. */ #define XREAD_BLOCKED_DEFAULT_COUNT 1000 void xreadCommand(client *c) { long long timeout = -1; /* -1 means, no BLOCK argument given. */ @@ -936,10 +941,14 @@ void xreadCommand(client *c) { #define STREAMID_STATIC_VECTOR_LEN 8 streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID *ids = static_ids; + streamCG **groups = NULL; + int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ + sds groupname = NULL; + sds consumername = NULL; /* Parse arguments. */ for (int i = 1; i < c->argc; i++) { - int moreargs = i != c->argc-1; + int moreargs = c->argc-i-1; char *o = c->argv[i]->ptr; if (!strcasecmp(o,"BLOCK") && moreargs) { i++; @@ -961,6 +970,15 @@ void xreadCommand(client *c) { } streams_count /= 2; /* We have two arguments for each stream. */ break; + } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) { + if (!xreadgroup) { + addReplyError(c,"The GROUP option is only supported by " + "XREADGROUP. You called XREAD instead."); + return; + } + groupname = c->argv[i]->ptr; + consumername = c->argv[i+1]->ptr; + i += 2; } else { addReply(c,shared.syntaxerr); return; @@ -973,17 +991,40 @@ void xreadCommand(client *c) { return; } - /* Parse the IDs. */ + /* Parse the IDs and resolve the group name. */ if (streams_count > STREAMID_STATIC_VECTOR_LEN) ids = zmalloc(sizeof(streamID)*streams_count); + if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); for (int i = streams_arg + streams_count; i < c->argc; i++) { /* Specifying "$" as last-known-id means that the client wants to be * served with just the messages that will arrive into the stream * starting from now. */ int id_idx = i - streams_arg - streams_count; + robj *key = c->argv[i-streams_count]; + robj *o; + streamCG *group; + + /* If a group was specified, than we need to be sure that the + * key and group actually exist. */ + if (groupname) { + o = lookupKeyRead(c->db,key); + if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; + if (o == NULL || + (group = streamLookupCG(o->ptr,groupname)) == NULL) + { + addReplyErrorFormat(c, "No such key '%s' or consumer " + "group '%s' in XREADGROUP with GROUP " + "option", + key->ptr,groupname); + goto cleanup; + } + groups[id_idx] = group; + } + if (strcmp(c->argv[i]->ptr,"$") == 0) { - robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]); + o = lookupKeyRead(c->db,key); + if (checkType(c,o,OBJ_STREAM)) goto cleanup; if (o) { stream *s = o->ptr; ids[id_idx] = s->last_id; @@ -992,6 +1033,14 @@ void xreadCommand(client *c) { ids[id_idx].seq = 0; } continue; + } else if (strcmp(c->argv[i]->ptr,">") == 0) { + if (!xreadgroup || groupname == NULL) { + addReplyError(c,"The > ID can be specified only when calling " + "XREADGROUP using the GROUP " + " option."); + goto cleanup; + } + ids[id_idx] = group->last_id; } if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) goto cleanup; @@ -1056,6 +1105,7 @@ void xreadCommand(client *c) { cleanup: /* Cleanup. */ if (ids != static_ids) zfree(ids); + zfree(groups); } /* ----------------------------------------------------------------------- @@ -1082,7 +1132,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) { streamCG *cg = zmalloc(sizeof(*cg)); cg->pel = raxNew(); cg->consumers = raxNew(); - cg->lastid = id; + cg->last_id = id; raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); return cg; } @@ -1094,6 +1144,14 @@ void streamFreeCG(streamCG *cg) { zfree(cg); } +/* Lookup the consumer group in the specified stream and returns its + * pointer, otherwise if there is no such group, NULL is returned. */ +streamCG *streamLookupCG(stream *s, sds groupname) { + streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, + sdslen(groupname)); + return (cg == raxNotFound) ? NULL : cg; +} + /* ----------------------------------------------------------------------- * Consumer groups commands * ----------------------------------------------------------------------- */