mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 16:40:50 +00:00
Streams: RDB loading. RDB saving modified.
After a few attempts it looked quite saner to just add the last item ID at the end of the serialized listpacks, instead of scanning the last listpack loaded from head to tail just to fetch it. It's a disk space VS CPU-and-simplicity tradeoff basically.
This commit is contained in:
parent
485014cc74
commit
edd70c1993
40
src/rdb.c
40
src/rdb.c
@ -785,6 +785,12 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
|
||||
nwritten += n;
|
||||
}
|
||||
raxStop(&ri);
|
||||
|
||||
/* Save the last entry ID. */
|
||||
if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
|
||||
nwritten += n;
|
||||
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
|
||||
nwritten += n;
|
||||
} else if (o->type == OBJ_MODULE) {
|
||||
/* Save a module-specific value. */
|
||||
RedisModuleIO io;
|
||||
@ -1431,6 +1437,40 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
||||
break;
|
||||
}
|
||||
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
|
||||
o = createStreamObject();
|
||||
stream *s = o->ptr;
|
||||
uint64_t listpacks = rdbLoadLen(rdb,NULL);
|
||||
|
||||
while(listpacks--) {
|
||||
unsigned char *lp =
|
||||
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
|
||||
if (lp == NULL) return NULL;
|
||||
unsigned char *first = lpFirst(lp);
|
||||
if (first == NULL) {
|
||||
/* Serialized listpacks should never be free, since on
|
||||
* deletion we should remove the radix tree key if the
|
||||
* resulting listpack is emtpy. */
|
||||
rdbExitReportCorruptRDB("Empty listpack inside stream");
|
||||
}
|
||||
|
||||
/* Get the ID of the first entry: we'll use it as key to add the
|
||||
* listpack into the radix tree. */
|
||||
int64_t e_len;
|
||||
unsigned char buf[LP_INTBUF_SIZE];
|
||||
unsigned char *e = lpGet(first,&e_len,buf);
|
||||
if (e_len != sizeof(streamID)) {
|
||||
rdbExitReportCorruptRDB("Listpack first entry is not the "
|
||||
"size of a stream ID");
|
||||
}
|
||||
int retval = raxInsert(s->rax,e,sizeof(streamID),lp,NULL);
|
||||
if (!retval)
|
||||
rdbExitReportCorruptRDB("Listpack re-added with existing key");
|
||||
}
|
||||
|
||||
/* Load the last entry ID. */
|
||||
s->last_id.ms = rdbLoadLen(rdb,NULL);
|
||||
s->last_id.seq = rdbLoadLen(rdb,NULL);
|
||||
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||
|
Loading…
x
Reference in New Issue
Block a user