CG: AOF rewriting implemented.

This commit is contained in:
antirez 2018-03-23 17:21:31 +01:00
parent 6c4cb1670a
commit 1392c83fb8
2 changed files with 86 additions and 4 deletions

View File

@ -1074,14 +1074,52 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
return 1; return 1;
} }
/* Helper for rewriteStreamObject() that generates a bulk string into the
* AOF representing the ID 'id'. */
int rioWriteBulkStreamID(rio *r,streamID *id) {
int retval;
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
if ((retval = rioWriteBulkString(r,replyid,sdslen(replyid))) == 0) return 0;
sdsfree(replyid);
return retval;
}
/* Helper for rewriteStreamObject(): emit the XCLAIM needed in order to
* add the message described by 'nack' having the id 'rawid', into the pending
* list of the specified consumer. All this in the context of the specified
* key and group. */
int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
/* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
RETRYCOUNT <count> JUSTID FORCE. */
streamID id;
streamDecodeID(rawid,&id);
if (rioWriteBulkCount(r,'*',12) == 0) return 0;
if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
if (rioWriteBulkString(r,"TIME",4) == 0) return 0;
if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0;
if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0;
if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0;
if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0;
if (rioWriteBulkString(r,"FORCE",5) == 0) return 0;
return 1;
}
/* Emit the commands needed to rebuild a stream object. /* Emit the commands needed to rebuild a stream object.
* The function returns 0 on error, 1 on success. */ * The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) { int rewriteStreamObject(rio *r, robj *key, robj *o) {
stream *s = o->ptr;
streamIterator si; streamIterator si;
streamIteratorStart(&si,o->ptr,NULL,NULL,0); streamIteratorStart(&si,s,NULL,NULL,0);
streamID id; streamID id;
int64_t numfields; int64_t numfields;
/* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) { while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is /* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */ * the ID, the second is an array of field-value pairs. */
@ -1090,9 +1128,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0;
sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); if (rioWriteBulkStreamID(r,&id) == 0) return 0;
if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0;
sdsfree(replyid);
while(numfields--) { while(numfields--) {
unsigned char *field, *value; unsigned char *field, *value;
int64_t field_len, value_len; int64_t field_len, value_len;
@ -1101,6 +1137,51 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
} }
} }
/* Create all the stream consumer groups. */
if (s->cgroups) {
raxIterator ri;
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *group = ri.data;
/* Emit the XGROUP CREATE in order to create the group. */
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;
/* Generate XCLAIMs for each consumer that happens to
* have pending entries. Empty consumers have no semantical
* value so they are discarded. */
raxIterator ri_cons;
raxStart(&ri_cons,group->consumers);
raxSeek(&ri_cons,"^",NULL,0);
while(raxNext(&ri_cons)) {
streamConsumer *consumer = ri_cons.data;
/* For the current consumer, iterate all the PEL entries
* to emit the XCLAIM protocol. */
raxIterator ri_pel;
raxStart(&ri_pel,consumer->pel);
raxSeek(&ri_pel,"^",NULL,0);
while(raxNext(&ri_pel)) {
streamNACK *nack = ri_pel.data;
if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
ri.key_len,consumer,
ri_pel.key,nack) == 0)
{
return 0;
}
}
raxStop(&ri_pel);
}
raxStop(&ri_cons);
}
raxStop(&ri);
}
streamIteratorStop(&si); streamIteratorStop(&si);
return 1; return 1;
} }

View File

@ -105,5 +105,6 @@ streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer); streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
#endif #endif