From 26d4f8e3ec74811076e8a71cd384ea89b10e0c13 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Sep 2017 12:37:04 +0200 Subject: [PATCH] Streams: AOF rewriting + minor iterator improvements. --- src/aof.c | 33 +++++++++++++++++++++++++++++++++ src/t_stream.c | 14 +++++++++----- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/aof.c b/src/aof.c index 0593b270..5fbfdd69 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1031,6 +1031,37 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { return 1; } +/* Emit the commands needed to rebuild a stream object. + * The function returns 0 on error, 1 on success. */ +int rewriteStreamObject(rio *r, robj *key, robj *o) { + streamIterator si; + streamIteratorStart(&si,o->ptr,NULL,NULL); + streamID id; + int64_t numfields; + + while(streamIteratorGetID(&si,&id,&numfields)) { + /* Emit a two elements array for each item. The first is + * the ID, the second is an array of field-value pairs. */ + + /* Emit the XADD ...fields... command. */ + if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; + if (rioWriteBulkString(r,"XADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq); + if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0; + sdsfree(replyid); + while(numfields--) { + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(&si,&field,&value,&field_len,&value_len); + if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0; + if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; + } + } + streamIteratorStop(&si); + return 1; +} + /* Call the module type callback in order to rewrite a data type * that is exported by a module and is not handled by Redis itself. * The function returns 0 on error, 1 on success. */ @@ -1111,6 +1142,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_HASH) { if (rewriteHashObject(aof,&key,o) == 0) goto werr; + } else if (o->type == OBJ_STREAM) { + if (rewriteStreamObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_MODULE) { if (rewriteModuleObject(aof,&key,o) == 0) goto werr; } else { diff --git a/src/t_stream.c b/src/t_stream.c index 3144adc7..76005008 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -218,19 +218,23 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) { /* Intialize the iterator and translates the iteration start/stop * elements into a 128 big big-endian number. */ - streamEncodeID(si->start_key,start); + if (start) { + streamEncodeID(si->start_key,start); + } else { + si->start_key[0] = 0; + si->start_key[0] = 0; + } + if (end) { streamEncodeID(si->end_key,end); } else { - /* We assume that UINT64_MAX is the same in little and big - * endian, that is, all bits set. */ si->end_key[0] = UINT64_MAX; si->end_key[0] = UINT64_MAX; } - raxStart(&si->ri,s->rax); /* Seek the correct node in the radix tree. */ - if (start->ms || start->seq) { + raxStart(&si->ri,s->rax); + if (start && (start->ms || start->seq)) { raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, sizeof(si->start_key)); if (raxEOF(&si->ri))