From edd70c1993b79d85bfc2812b0bf4bf4771ff40ed Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 5 Sep 2017 16:24:11 +0200 Subject: [PATCH] Streams: RDB loading. RDB saving modified. After a few attempts it looked quite saner to just add the last item ID at the end of the serialized listpacks, instead of scanning the last listpack loaded from head to tail just to fetch it. It's a disk space VS CPU-and-simplicity tradeoff basically. --- src/rdb.c | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/rdb.c b/src/rdb.c index c79bfa8d..acc6ca87 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -785,6 +785,12 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; } raxStop(&ri); + + /* Save the last entry ID. */ + if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1; + nwritten += n; } else if (o->type == OBJ_MODULE) { /* Save a module-specific value. */ RedisModuleIO io; @@ -1431,6 +1437,40 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } + } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) { + o = createStreamObject(); + stream *s = o->ptr; + uint64_t listpacks = rdbLoadLen(rdb,NULL); + + while(listpacks--) { + unsigned char *lp = + rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); + if (lp == NULL) return NULL; + unsigned char *first = lpFirst(lp); + if (first == NULL) { + /* Serialized listpacks should never be free, since on + * deletion we should remove the radix tree key if the + * resulting listpack is emtpy. */ + rdbExitReportCorruptRDB("Empty listpack inside stream"); + } + + /* Get the ID of the first entry: we'll use it as key to add the + * listpack into the radix tree. */ + int64_t e_len; + unsigned char buf[LP_INTBUF_SIZE]; + unsigned char *e = lpGet(first,&e_len,buf); + if (e_len != sizeof(streamID)) { + rdbExitReportCorruptRDB("Listpack first entry is not the " + "size of a stream ID"); + } + int retval = raxInsert(s->rax,e,sizeof(streamID),lp,NULL); + if (!retval) + rdbExitReportCorruptRDB("Listpack re-added with existing key"); + } + + /* Load the last entry ID. */ + s->last_id.ms = rdbLoadLen(rdb,NULL); + s->last_id.seq = rdbLoadLen(rdb,NULL); } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { uint64_t moduleid = rdbLoadLen(rdb,NULL); moduleType *mt = moduleTypeLookupModuleByID(moduleid);