CG: data structures design + XGROUP CREATE implementation.

This commit is contained in:
antirez 2018-01-16 15:38:22 +01:00
parent d8207d09ee
commit 58f0c000a5
4 changed files with 143 additions and 2 deletions

View File

@ -276,7 +276,7 @@ struct redisCommand redisCommandTable[] = {
{"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
{"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
{"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
{"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
{"object",objectCommand,-2,"r",0,NULL,2,2,1,0,0},
{"memory",memoryCommand,-2,"r",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
{"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
@ -307,6 +307,7 @@ struct redisCommand redisCommandTable[] = {
{"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

View File

@ -2024,6 +2024,7 @@ void xrangeCommand(client *c);
void xrevrangeCommand(client *c);
void xlenCommand(client *c);
void xreadCommand(client *c);
void xgroupCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));

View File

@ -17,6 +17,7 @@ typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
@ -44,8 +45,46 @@ typedef struct streamIterator {
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
/* Prototypes of exported APIs. */
/* Consumer group. */
typedef struct streamCG {
streamID lastid; /* Last delivered (not acknowledged) ID for this
group. Consumers that will just ask for more
messages will served with IDs > than this. */
rax *pel; /* Pending entries list. This is a radix tree that
has every message delivered to consumers (without
the NOACK option) that was yet not acknowledged
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
associated value is a streamNotAcked structure.*/
rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form
of streamConsumer structures. */
} streamCG;
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
sds *name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
rax *pel; /* Consumer specific pending entries list: all
the pending messages delivered to this
consumer not yet acknowledged. Keys are
big endian message IDs, while values are
the same streamNotAcked structure referenced
in the "pel" of the conumser group structure
itself, so the value is shared. */
} streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNotAcked {
mstime_t delivery_time; /* Last time this message was delivered. */
uint64_t delivery_count; /* Number of times this message was delivered.*/
streamConsumer *consumer; /* The consumer this message was delivered to
in the last delivery. */
} streamNotAcked;
/* Prototypes of exported APIs. */
struct client;
stream *streamNew(void);

View File

@ -40,6 +40,8 @@
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
* ----------------------------------------------------------------------- */
@ -51,12 +53,15 @@ stream *streamNew(void) {
s->length = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
/* Free a stream, including the listpacks stored inside the radix tree. */
void freeStream(stream *s) {
raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
if (s->cgroups)
raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
zfree(s);
}
@ -1053,4 +1058,99 @@ cleanup:
if (ids != static_ids) zfree(ids);
}
/* -----------------------------------------------------------------------
* Low level implementation of consumer groups
* ----------------------------------------------------------------------- */
void streamNotAckedFree(streamNotAcked *na) {
zfree(na);
}
void streamConsumerFree(streamConsumer *sc) {
zfree(sc);
}
/* Create a new consumer group in the context of the stream 's', having the
* specified name and last server ID. If a consumer group with the same name
* already existed NULL is returned, otherwise the pointer to the consumer
* group is returned. */
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) {
if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen)) return NULL;
streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew();
cg->consumers = raxNew();
cg->lastid = id;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
/* Free a consumer group and all its associated data. */
void streamFreeCG(streamCG *cg) {
raxFreeWithCallback(cg->pel,(void(*)(void*))streamNotAckedFree);
raxFreeWithCallback(cg->consumers,(void(*)(void*))streamConsumerFree);
zfree(cg);
}
/* -----------------------------------------------------------------------
* Consumer groups commands
* ----------------------------------------------------------------------- */
/* XGROUP CREATE <key> <groupname> <id or $>
* XGROUP SETID <key> <id or $>
* XGROUP DELGROUP <key> <groupname>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
void xgroupCommand(client *c) {
const char *help[] = {
"CREATE <key> <groupname> <id or $> -- Create a new consumer group.",
"SETID <key> <groupname> <id or $> -- Set the current group ID.",
"DELGROUP <key> <groupname> -- Remove the specified group.",
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified conusmer.",
"HELP -- Prints this help.",
NULL
};
stream *s = NULL;
sds grpname = NULL;
/* Lookup the key now, this is common for all the subcommands but HELP. */
if (c->argc >= 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL) return;
s = o->ptr;
grpname = c->argv[3]->ptr;
}
char *opt = c->argv[1]->ptr;
if (!strcasecmp(opt,"CREATE") && c->argc == 5) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
} else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return;
}
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),id);
if (cg) {
addReply(c,shared.ok);
} else {
addReplyError(c,"Consumer Group name already exists");
}
} else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
} else if (!strcasecmp(opt,"DELGROUP") && c->argc == 4) {
} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {
addReply(c,shared.syntaxerr);
}
}
/* XPENDING <key> [<start> <stop>]. */
/* XCLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ...*/
/* XACK <stream-key> */
/* XREAD-GROUP will be implemented by xreadGenericCommand() */
/* XINFO <key> [CONSUMERS|GROUPS|STREAM]. STREAM is the default */