From 1392c83fb829d8586af8cdd1ef778b211be40e4a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 23 Mar 2018 17:21:31 +0100 Subject: [PATCH] CG: AOF rewriting implemented. --- src/aof.c | 89 +++++++++++++++++++++++++++++++++++++++++++++++++--- src/stream.h | 1 + 2 files changed, 86 insertions(+), 4 deletions(-) diff --git a/src/aof.c b/src/aof.c index 4a7d749d..8b735e24 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1074,14 +1074,52 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { return 1; } +/* Helper for rewriteStreamObject() that generates a bulk string into the + * AOF representing the ID 'id'. */ +int rioWriteBulkStreamID(rio *r,streamID *id) { + int retval; + + sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); + if ((retval = rioWriteBulkString(r,replyid,sdslen(replyid))) == 0) return 0; + sdsfree(replyid); + return retval; +} + +/* Helper for rewriteStreamObject(): emit the XCLAIM needed in order to + * add the message described by 'nack' having the id 'rawid', into the pending + * list of the specified consumer. All this in the context of the specified + * key and group. */ +int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) { + /* XCLAIM 0 TIME + RETRYCOUNT JUSTID FORCE. */ + streamID id; + streamDecodeID(rawid,&id); + if (rioWriteBulkCount(r,'*',12) == 0) return 0; + if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0; + if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0; + if (rioWriteBulkString(r,"0",1) == 0) return 0; + if (rioWriteBulkStreamID(r,&id) == 0) return 0; + if (rioWriteBulkString(r,"TIME",4) == 0) return 0; + if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0; + if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0; + if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0; + if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0; + if (rioWriteBulkString(r,"FORCE",5) == 0) return 0; + return 1; +} + /* Emit the commands needed to rebuild a stream object. * The function returns 0 on error, 1 on success. */ int rewriteStreamObject(rio *r, robj *key, robj *o) { + stream *s = o->ptr; streamIterator si; - streamIteratorStart(&si,o->ptr,NULL,NULL,0); + streamIteratorStart(&si,s,NULL,NULL,0); streamID id; int64_t numfields; + /* Reconstruct the stream data using XADD commands. */ while(streamIteratorGetID(&si,&id,&numfields)) { /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ @@ -1090,9 +1128,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0; - sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); - if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0; - sdsfree(replyid); + if (rioWriteBulkStreamID(r,&id) == 0) return 0; while(numfields--) { unsigned char *field, *value; int64_t field_len, value_len; @@ -1101,6 +1137,51 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; } } + + /* Create all the stream consumer groups. */ + if (s->cgroups) { + raxIterator ri; + raxStart(&ri,s->cgroups); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamCG *group = ri.data; + /* Emit the XGROUP CREATE in order to create the group. */ + if (rioWriteBulkCount(r,'*',5) == 0) return 0; + if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0; + if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0; + if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0; + + /* Generate XCLAIMs for each consumer that happens to + * have pending entries. Empty consumers have no semantical + * value so they are discarded. */ + raxIterator ri_cons; + raxStart(&ri_cons,group->consumers); + raxSeek(&ri_cons,"^",NULL,0); + while(raxNext(&ri_cons)) { + streamConsumer *consumer = ri_cons.data; + /* For the current consumer, iterate all the PEL entries + * to emit the XCLAIM protocol. */ + raxIterator ri_pel; + raxStart(&ri_pel,consumer->pel); + raxSeek(&ri_pel,"^",NULL,0); + while(raxNext(&ri_pel)) { + streamNACK *nack = ri_pel.data; + if (rioWriteStreamPendingEntry(r,key,(char*)ri.key, + ri.key_len,consumer, + ri_pel.key,nack) == 0) + { + return 0; + } + } + raxStop(&ri_pel); + } + raxStop(&ri_cons); + } + raxStop(&ri); + } + streamIteratorStop(&si); return 1; } diff --git a/src/stream.h b/src/stream.h index 7cc44ae7..8a019e93 100644 --- a/src/stream.h +++ b/src/stream.h @@ -105,5 +105,6 @@ streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); +void streamDecodeID(void *buf, streamID *id); #endif