From 58f0c000a5630d22c221bc4291b46f2fc1654ead Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 16 Jan 2018 15:38:22 +0100 Subject: [PATCH] CG: data structures design + XGROUP CREATE implementation. --- src/server.c | 3 +- src/server.h | 1 + src/stream.h | 41 +++++++++++++++++++- src/t_stream.c | 100 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 2 deletions(-) diff --git a/src/server.c b/src/server.c index 85f05f1f..f718dfdd 100644 --- a/src/server.c +++ b/src/server.c @@ -276,7 +276,7 @@ struct redisCommand redisCommandTable[] = { {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0}, {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0}, {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0}, - {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0}, + {"object",objectCommand,-2,"r",0,NULL,2,2,1,0,0}, {"memory",memoryCommand,-2,"r",0,NULL,0,0,0,0,0}, {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0}, {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, @@ -307,6 +307,7 @@ struct redisCommand redisCommandTable[] = { {"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, {"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0}, + {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index 0668c375..72a6563a 100644 --- a/src/server.h +++ b/src/server.h @@ -2024,6 +2024,7 @@ void xrangeCommand(client *c); void xrevrangeCommand(client *c); void xlenCommand(client *c); void xreadCommand(client *c); +void xgroupCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/stream.h b/src/stream.h index 214b6d9a..75c9c57e 100644 --- a/src/stream.h +++ b/src/stream.h @@ -17,6 +17,7 @@ typedef struct stream { rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Number of elements inside this stream. */ streamID last_id; /* Zero if there are yet no items. */ + rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream; /* We define an iterator to iterate stream items in an abstract way, without @@ -44,8 +45,46 @@ typedef struct streamIterator { unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator; -/* Prototypes of exported APIs. */ +/* Consumer group. */ +typedef struct streamCG { + streamID lastid; /* Last delivered (not acknowledged) ID for this + group. Consumers that will just ask for more + messages will served with IDs > than this. */ + rax *pel; /* Pending entries list. This is a radix tree that + has every message delivered to consumers (without + the NOACK option) that was yet not acknowledged + as processed. The key of the radix tree is the + ID as a 64 bit big endian number, while the + associated value is a streamNotAcked structure.*/ + rax *consumers; /* A radix tree representing the consumers by name + and their associated representation in the form + of streamConsumer structures. */ +} streamCG; +/* A specific consumer in a consumer group. */ +typedef struct streamConsumer { + mstime_t seen_time; /* Last time this consumer was active. */ + sds *name; /* Consumer name. This is how the consumer + will be identified in the consumer group + protocol. Case sensitive. */ + rax *pel; /* Consumer specific pending entries list: all + the pending messages delivered to this + consumer not yet acknowledged. Keys are + big endian message IDs, while values are + the same streamNotAcked structure referenced + in the "pel" of the conumser group structure + itself, so the value is shared. */ +} streamConsumer; + +/* Pending (yet not acknowledged) message in a consumer group. */ +typedef struct streamNotAcked { + mstime_t delivery_time; /* Last time this message was delivered. */ + uint64_t delivery_count; /* Number of times this message was delivered.*/ + streamConsumer *consumer; /* The consumer this message was delivered to + in the last delivery. */ +} streamNotAcked; + +/* Prototypes of exported APIs. */ struct client; stream *streamNew(void); diff --git a/src/t_stream.c b/src/t_stream.c index 1f2e2094..b9c0c4bd 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -40,6 +40,8 @@ #define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ +void streamFreeCG(streamCG *cg); + /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. * ----------------------------------------------------------------------- */ @@ -51,12 +53,15 @@ stream *streamNew(void) { s->length = 0; s->last_id.ms = 0; s->last_id.seq = 0; + s->cgroups = NULL; /* Created on demand to save memory when not used. */ return s; } /* Free a stream, including the listpacks stored inside the radix tree. */ void freeStream(stream *s) { raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); + if (s->cgroups) + raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); zfree(s); } @@ -1053,4 +1058,99 @@ cleanup: if (ids != static_ids) zfree(ids); } +/* ----------------------------------------------------------------------- + * Low level implementation of consumer groups + * ----------------------------------------------------------------------- */ +void streamNotAckedFree(streamNotAcked *na) { + zfree(na); +} + +void streamConsumerFree(streamConsumer *sc) { + zfree(sc); +} + +/* Create a new consumer group in the context of the stream 's', having the + * specified name and last server ID. If a consumer group with the same name + * already existed NULL is returned, otherwise the pointer to the consumer + * group is returned. */ +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) { + if (s->cgroups == NULL) s->cgroups = raxNew(); + if (raxFind(s->cgroups,(unsigned char*)name,namelen)) return NULL; + + streamCG *cg = zmalloc(sizeof(*cg)); + cg->pel = raxNew(); + cg->consumers = raxNew(); + cg->lastid = id; + raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); + return cg; +} + +/* Free a consumer group and all its associated data. */ +void streamFreeCG(streamCG *cg) { + raxFreeWithCallback(cg->pel,(void(*)(void*))streamNotAckedFree); + raxFreeWithCallback(cg->consumers,(void(*)(void*))streamConsumerFree); + zfree(cg); +} + +/* ----------------------------------------------------------------------- + * Consumer groups commands + * ----------------------------------------------------------------------- */ + +/* XGROUP CREATE + * XGROUP SETID + * XGROUP DELGROUP + * XGROUP DELCONSUMER */ +void xgroupCommand(client *c) { + const char *help[] = { +"CREATE -- Create a new consumer group.", +"SETID -- Set the current group ID.", +"DELGROUP -- Remove the specified group.", +"DELCONSUMER -- Remove the specified conusmer.", +"HELP -- Prints this help.", +NULL + }; + stream *s = NULL; + sds grpname = NULL; + + /* Lookup the key now, this is common for all the subcommands but HELP. */ + if (c->argc >= 4) { + robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr); + if (o == NULL) return; + s = o->ptr; + grpname = c->argv[3]->ptr; + } + + char *opt = c->argv[1]->ptr; + if (!strcasecmp(opt,"CREATE") && c->argc == 5) { + streamID id; + if (!strcmp(c->argv[4]->ptr,"$")) { + id = s->last_id; + } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { + return; + } + streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),id); + if (cg) { + addReply(c,shared.ok); + } else { + addReplyError(c,"Consumer Group name already exists"); + } + } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { + } else if (!strcasecmp(opt,"DELGROUP") && c->argc == 4) { + } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { + } else if (!strcasecmp(opt,"HELP")) { + addReplyHelp(c, help); + } else { + addReply(c,shared.syntaxerr); + } +} + +/* XPENDING [ ]. */ + +/* XCLAIM ...*/ + +/* XACK */ + +/* XREAD-GROUP will be implemented by xreadGenericCommand() */ + +/* XINFO [CONSUMERS|GROUPS|STREAM]. STREAM is the default */