From b3e80d2f654a66358c53addffd34945363cce2bb Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Mon, 8 Oct 2018 21:23:38 +0800 Subject: [PATCH] Stream & AOF: rewrite stream in correct way --- src/aof.c | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/src/aof.c b/src/aof.c index f8f26bdf..3f914b77 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1121,23 +1121,39 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { streamID id; int64_t numfields; - /* Reconstruct the stream data using XADD commands. */ - while(streamIteratorGetID(&si,&id,&numfields)) { - /* Emit a two elements array for each item. The first is - * the ID, the second is an array of field-value pairs. */ + if (s->length) { + /* Reconstruct the stream data using XADD commands. */ + while(streamIteratorGetID(&si,&id,&numfields)) { + /* Emit a two elements array for each item. The first is + * the ID, the second is an array of field-value pairs. */ - /* Emit the XADD ...fields... command. */ - if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; - if (rioWriteBulkString(r,"XADD",4) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkStreamID(r,&id) == 0) return 0; - while(numfields--) { - unsigned char *field, *value; - int64_t field_len, value_len; - streamIteratorGetField(&si,&field,&value,&field_len,&value_len); - if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0; - if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; + /* Emit the XADD ...fields... command. */ + if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0; + if (rioWriteBulkString(r,"XADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&id) == 0) return 0; + while(numfields--) { + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(&si,&field,&value,&field_len,&value_len); + if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0; + if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; + } } + /* Append XSTREAM SETID after XADD, make sure lastid is correct, + * in case of XDEL lastid. */ + if (rioWriteBulkCount(r,'*',4) == 0) return 0; + if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0; + if (rioWriteBulkString(r,"SETID",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + } else { + /* Using XSTREAM CREATE if the stream is empty. */ + if (rioWriteBulkCount(r,'*',4) == 0) return 0; + if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0; + if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; } /* Create all the stream consumer groups. */