From c1689166b71f298aec40db877e125c76c178b0ba Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 16 Oct 2018 16:48:31 +0200 Subject: [PATCH] Streams: rewrite empty streams with XADD MAXLEN 0. Use XSETID. Related to #5426. --- src/aof.c | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/aof.c b/src/aof.c index 4877172a..7d31183e 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1158,22 +1158,28 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0; } } - /* Append XSTREAM SETID after XADD, make sure lastid is correct, - * in case of XDEL lastid. */ - if (rioWriteBulkCount(r,'*',4) == 0) return 0; - if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0; - if (rioWriteBulkString(r,"SETID",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; } else { - /* Using XSTREAM CREATE if the stream is empty. */ - if (rioWriteBulkCount(r,'*',4) == 0) return 0; - if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0; - if (rioWriteBulkString(r,"CREATE",6) == 0) return 0; + /* Use the XADD MAXLEN 0 trick to generate an empty stream if + * the key we are serializing is an empty string, which is possible + * for the Stream type. */ + if (rioWriteBulkCount(r,'*',7) == 0) return 0; + if (rioWriteBulkString(r,"XADD",4) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0; - if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; + if (rioWriteBulkString(r,"0",1) == 0) return 0; + if (rioWriteBulkString(r,"*",1) == 0) return 0; + if (rioWriteBulkString(r,"x",1) == 0) return 0; + if (rioWriteBulkString(r,"y",1) == 0) return 0; } + /* Append XSETID after XADD, make sure lastid is correct, + * in case of XDEL lastid. */ + if (rioWriteBulkCount(r,'*',3) == 0) return 0; + if (rioWriteBulkString(r,"XSETID",6) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + + /* Create all the stream consumer groups. */ if (s->cgroups) { raxIterator ri;