In Redis RDB check: initial POC.

So far we used an external program (later executed within Redis) and
parser in order to check RDB files for correctness. This forces, at each
RDB format update, to have two copies of the same format implementation
that are hard to keep in sync. Morover the former RDB checker only
checked the very high-level format of the file, without actually trying
to load things in memory. Certain corruptions can only be handled by
really loading key-value pairs.

This first commit attempts to unify the Redis RDB loadig code with the
task of checking the RDB file for correctness. More work is needed but
it looks like a sounding direction so far.
This commit is contained in:
antirez 2016-06-30 23:44:44 +02:00
parent 24bd9b19f6
commit e97fadb045
2 changed files with 193 additions and 664 deletions

View File

@ -1,6 +1,5 @@
/* /*
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com> * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -28,696 +27,226 @@
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "server.h" #include "server.h"
#include "rdb.h" #include "rdb.h"
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include "lzf.h"
#include "crc64.h"
#define ERROR(...) { \ #include <stdarg.h>
serverLog(LL_WARNING, __VA_ARGS__); \
exit(1); \ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len);
long long rdbLoadMillisecondTime(rio *rdb);
struct {
rio *rio;
robj *key; /* Current key we are reading. */
unsigned long keys; /* Number of keys processed. */
unsigned long expires; /* Number of keys with an expire. */
unsigned long already_expired; /* Number of keys already expired. */
int doing; /* The state while reading the RDB. */
} rdbstate;
#define RDB_CHECK_DOING_START 0
#define RDB_CHECK_DOING_READ_EXPIRE 1
#define RDB_CHECK_DOING_READ_KEY 2
#define RDB_CHECK_DOING_READ_VALUE 3
/* Called on RDB errors. Provides details about the RDB and the offset
* we were when the error was detected. */
void rdbCheckError(const char *fmt, ...) {
char msg[1024];
va_list ap;
va_start(ap, fmt);
vsnprintf(msg, sizeof(msg), fmt, ap);
va_end(ap);
printf("*** RDB CHECK FAILED: %s ***\n", msg);
printf("AT RDB OFFSET: %llu\n",
(unsigned long long) (rdbstate.rio ?
rdbstate.rio->processed_bytes : 0));
if (rdbstate.key)
printf("READING KEY: %s\n", (char*)rdbstate.key->ptr);
} }
/* data type to hold offset in file and size */ /* Print informations during RDB checking. */
typedef struct { void rdbCheckInfo(const char *fmt, ...) {
void *data; char msg[1024];
size_t size; va_list ap;
size_t offset;
} pos;
static unsigned char level = 0; va_start(ap, fmt);
static pos positions[16]; vsnprintf(msg, sizeof(msg), fmt, ap);
va_end(ap);
#define CURR_OFFSET (positions[level].offset) printf("[offset %llu] %s\n",
(unsigned long long) (rdbstate.rio ?
/* Hold a stack of errors */ rdbstate.rio->processed_bytes : 0), msg);
typedef struct {
char error[16][1024];
size_t offset[16];
size_t level;
} errors_t;
static errors_t errors;
#define SHIFT_ERROR(provided_offset, ...) { \
sprintf(errors.error[errors.level], __VA_ARGS__); \
errors.offset[errors.level] = provided_offset; \
errors.level++; \
} }
/* Data type to hold opcode with optional key name an success status */ /* During RDB check we setup a special signal handler for memory violations
typedef struct { * and similar conditions, so that we can log the offending part of the RDB
char* key; * if the crash is due to broken content. */
int type; void rdbCheckHandleCrash(int sig, siginfo_t *info, void *secret) {
char success; UNUSED(sig);
} entry; UNUSED(info);
UNUSED(secret);
#define MAX_TYPES_NUM 256 rdbCheckError("Server crash checking the specified RDB file!");
#define MAX_TYPE_NAME_LEN 16
/* store string types for output */
static char types[MAX_TYPES_NUM][MAX_TYPE_NAME_LEN];
/* Return true if 't' is a valid object type. */
static int rdbCheckType(unsigned char t) {
/* In case a new object type is added, update the following
* condition as necessary. */
return
(t >= RDB_TYPE_HASH_ZIPMAP && t <= RDB_TYPE_HASH_ZIPLIST) ||
t <= RDB_TYPE_HASH ||
t >= RDB_OPCODE_EXPIRETIME_MS;
}
/* when number of bytes to read is negative, do a peek */
static int readBytes(void *target, long num) {
char peek = (num < 0) ? 1 : 0;
num = (num < 0) ? -num : num;
pos p = positions[level];
if (p.offset + num > p.size) {
return 0;
} else {
memcpy(target, (void*)((size_t)p.data + p.offset), num);
if (!peek) positions[level].offset += num;
}
return 1;
}
int processHeader(void) {
char buf[10] = "_________";
int dump_version;
if (!readBytes(buf, 9)) {
ERROR("Cannot read header");
}
/* expect the first 5 bytes to equal REDIS */
if (memcmp(buf,"REDIS",5) != 0) {
ERROR("Wrong signature in header");
}
dump_version = (int)strtol(buf + 5, NULL, 10);
if (dump_version < 1 || dump_version > 6) {
ERROR("Unknown RDB format version: %d", dump_version);
}
return dump_version;
}
static int loadType(entry *e) {
uint32_t offset = CURR_OFFSET;
/* this byte needs to qualify as type */
unsigned char t;
if (readBytes(&t, 1)) {
if (rdbCheckType(t)) {
e->type = t;
return 1;
} else {
SHIFT_ERROR(offset, "Unknown type (0x%02x)", t);
}
} else {
SHIFT_ERROR(offset, "Could not read type");
}
/* failure */
return 0;
}
static int peekType() {
unsigned char t;
if (readBytes(&t, -1) && (rdbCheckType(t)))
return t;
return -1;
}
/* discard time, just consume the bytes */
static int processTime(int type) {
uint32_t offset = CURR_OFFSET;
unsigned char t[8];
int timelen = (type == RDB_OPCODE_EXPIRETIME_MS) ? 8 : 4;
if (readBytes(t,timelen)) {
return 1;
} else {
SHIFT_ERROR(offset, "Could not read time");
}
/* failure */
return 0;
}
static uint64_t loadLength(int *isencoded) {
unsigned char buf[2];
uint32_t len;
int type;
if (isencoded) *isencoded = 0;
if (!readBytes(buf, 1)) return RDB_LENERR;
type = (buf[0] & 0xC0) >> 6;
if (type == RDB_6BITLEN) {
/* Read a 6 bit len */
return buf[0] & 0x3F;
} else if (type == RDB_ENCVAL) {
/* Read a 6 bit len encoding type */
if (isencoded) *isencoded = 1;
return buf[0] & 0x3F;
} else if (type == RDB_14BITLEN) {
/* Read a 14 bit len */
if (!readBytes(buf+1,1)) return RDB_LENERR;
return ((buf[0] & 0x3F) << 8) | buf[1];
} else if (buf[0] == RDB_32BITLEN) {
/* Read a 32 bit len */
if (!readBytes(&len, 4)) return RDB_LENERR;
return ntohl(len);
} else if (buf[0] == RDB_64BITLEN) {
/* Read a 64 bit len */
if (!readBytes(&len, 8)) return RDB_LENERR;
return ntohu64(len);
} else {
return RDB_LENERR;
}
}
static char *loadIntegerObject(int enctype) {
uint32_t offset = CURR_OFFSET;
unsigned char enc[4];
long long val;
if (enctype == RDB_ENC_INT8) {
uint8_t v;
if (!readBytes(enc, 1)) return NULL;
v = enc[0];
val = (int8_t)v;
} else if (enctype == RDB_ENC_INT16) {
uint16_t v;
if (!readBytes(enc, 2)) return NULL;
v = enc[0]|(enc[1]<<8);
val = (int16_t)v;
} else if (enctype == RDB_ENC_INT32) {
uint32_t v;
if (!readBytes(enc, 4)) return NULL;
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
val = (int32_t)v;
} else {
SHIFT_ERROR(offset, "Unknown integer encoding (0x%02x)", enctype);
return NULL;
}
/* convert val into string */
char *buf;
buf = zmalloc(sizeof(char) * 128);
sprintf(buf, "%lld", val);
return buf;
}
static char* loadLzfStringObject() {
uint64_t slen, clen;
char *c, *s;
if ((clen = loadLength(NULL)) == RDB_LENERR) return NULL;
if ((slen = loadLength(NULL)) == RDB_LENERR) return NULL;
c = zmalloc(clen);
if (!readBytes(c, clen)) {
zfree(c);
return NULL;
}
s = zmalloc(slen+1);
if (lzf_decompress(c,clen,s,slen) == 0) {
zfree(c); zfree(s);
return NULL;
}
zfree(c);
return s;
}
/* returns NULL when not processable, char* when valid */
static char* loadStringObject() {
uint64_t offset = CURR_OFFSET;
uint64_t len;
int isencoded;
len = loadLength(&isencoded);
if (isencoded) {
switch(len) {
case RDB_ENC_INT8:
case RDB_ENC_INT16:
case RDB_ENC_INT32:
return loadIntegerObject(len);
case RDB_ENC_LZF:
return loadLzfStringObject();
default:
/* unknown encoding */
SHIFT_ERROR(offset, "Unknown string encoding (0x%02llx)",
(unsigned long long) len);
return NULL;
}
}
if (len == RDB_LENERR) return NULL;
char *buf = zmalloc(sizeof(char) * (len+1));
if (buf == NULL) return NULL;
buf[len] = '\0';
if (!readBytes(buf, len)) {
zfree(buf);
return NULL;
}
return buf;
}
static int processStringObject(char** store) {
unsigned long offset = CURR_OFFSET;
char *key = loadStringObject();
if (key == NULL) {
SHIFT_ERROR(offset, "Error reading string object");
zfree(key);
return 0;
}
if (store != NULL) {
*store = key;
} else {
zfree(key);
}
return 1;
}
static double* loadDoubleValue() {
char buf[256];
unsigned char len;
double* val;
if (!readBytes(&len,1)) return NULL;
val = zmalloc(sizeof(double));
switch(len) {
case 255: *val = R_NegInf; return val;
case 254: *val = R_PosInf; return val;
case 253: *val = R_Nan; return val;
default:
if (!readBytes(buf, len)) {
zfree(val);
return NULL;
}
buf[len] = '\0';
sscanf(buf, "%lg", val);
return val;
}
}
static int processDoubleValue(double** store) {
unsigned long offset = CURR_OFFSET;
double *val = loadDoubleValue();
if (val == NULL) {
SHIFT_ERROR(offset, "Error reading double value");
zfree(val);
return 0;
}
if (store != NULL) {
*store = val;
} else {
zfree(val);
}
return 1;
}
static int loadPair(entry *e) {
uint64_t offset = CURR_OFFSET;
uint64_t i;
/* read key first */
char *key;
if (processStringObject(&key)) {
e->key = key;
} else {
SHIFT_ERROR(offset, "Error reading entry key");
return 0;
}
uint64_t length = 0;
if (e->type == RDB_TYPE_LIST ||
e->type == RDB_TYPE_SET ||
e->type == RDB_TYPE_ZSET ||
e->type == RDB_TYPE_HASH) {
if ((length = loadLength(NULL)) == RDB_LENERR) {
SHIFT_ERROR(offset, "Error reading %s length", types[e->type]);
return 0;
}
}
switch(e->type) {
case RDB_TYPE_STRING:
case RDB_TYPE_HASH_ZIPMAP:
case RDB_TYPE_LIST_ZIPLIST:
case RDB_TYPE_SET_INTSET:
case RDB_TYPE_ZSET_ZIPLIST:
case RDB_TYPE_HASH_ZIPLIST:
if (!processStringObject(NULL)) {
SHIFT_ERROR(offset, "Error reading entry value");
return 0;
}
break;
case RDB_TYPE_LIST:
case RDB_TYPE_SET:
for (i = 0; i < length; i++) {
offset = CURR_OFFSET;
if (!processStringObject(NULL)) {
SHIFT_ERROR(offset, "Error reading element at index %llu (length: %llu)",
(unsigned long long) i, (unsigned long long) length);
return 0;
}
}
break;
case RDB_TYPE_ZSET:
for (i = 0; i < length; i++) {
offset = CURR_OFFSET;
if (!processStringObject(NULL)) {
SHIFT_ERROR(offset, "Error reading element key at index %llu (length: %llu)",
(unsigned long long) i, (unsigned long long) length);
return 0;
}
offset = CURR_OFFSET;
if (!processDoubleValue(NULL)) {
SHIFT_ERROR(offset, "Error reading element value at index %llu (length: %llu)",
(unsigned long long) i, (unsigned long long) length);
return 0;
}
}
break;
case RDB_TYPE_HASH:
for (i = 0; i < length; i++) {
offset = CURR_OFFSET;
if (!processStringObject(NULL)) {
SHIFT_ERROR(offset, "Error reading element key at index %llu (length: %llu)",
(unsigned long long) i, (unsigned long long) length);
return 0;
}
offset = CURR_OFFSET;
if (!processStringObject(NULL)) {
SHIFT_ERROR(offset, "Error reading element value at index %llu (length: %llu)",
(unsigned long long) i, (unsigned long long) length);
return 0;
}
}
break;
default:
SHIFT_ERROR(offset, "Type not implemented");
return 0;
}
/* because we're done, we assume success */
e->success = 1;
return 1;
}
static entry loadEntry() {
entry e = { NULL, -1, 0 };
uint64_t length, offset[4];
/* reset error container */
errors.level = 0;
offset[0] = CURR_OFFSET;
if (!loadType(&e)) {
return e;
}
offset[1] = CURR_OFFSET;
if (e.type == RDB_OPCODE_SELECTDB) {
if ((length = loadLength(NULL)) == RDB_LENERR) {
SHIFT_ERROR(offset[1], "Error reading database number");
return e;
}
if (length > 63) {
SHIFT_ERROR(offset[1], "Database number out of range (%llu)",
(unsigned long long) length);
return e;
}
} else if (e.type == RDB_OPCODE_EOF) {
if (positions[level].offset < positions[level].size) {
SHIFT_ERROR(offset[0], "Unexpected EOF");
} else {
e.success = 1;
}
return e;
} else {
/* optionally consume expire */
if (e.type == RDB_OPCODE_EXPIRETIME ||
e.type == RDB_OPCODE_EXPIRETIME_MS) {
if (!processTime(e.type)) return e;
if (!loadType(&e)) return e;
}
offset[1] = CURR_OFFSET;
if (!loadPair(&e)) {
SHIFT_ERROR(offset[1], "Error for type %s", types[e.type]);
return e;
}
}
/* all entries are followed by a valid type:
* e.g. a new entry, SELECTDB, EXPIRE, EOF */
offset[2] = CURR_OFFSET;
if (peekType() == -1) {
SHIFT_ERROR(offset[2], "Followed by invalid type");
SHIFT_ERROR(offset[0], "Error for type %s", types[e.type]);
e.success = 0;
} else {
e.success = 1;
}
return e;
}
static void printCentered(int indent, int width, char* body) {
char head[256], tail[256];
memset(head, '\0', 256);
memset(tail, '\0', 256);
memset(head, '=', indent);
memset(tail, '=', width - 2 - indent - strlen(body));
serverLog(LL_WARNING, "%s %s %s", head, body, tail);
}
static void printValid(uint64_t ops, uint64_t bytes) {
char body[80];
sprintf(body, "Processed %llu valid opcodes (in %llu bytes)",
(unsigned long long) ops, (unsigned long long) bytes);
printCentered(4, 80, body);
}
static void printSkipped(uint64_t bytes, uint64_t offset) {
char body[80];
sprintf(body, "Skipped %llu bytes (resuming at 0x%08llx)",
(unsigned long long) bytes, (unsigned long long) offset);
printCentered(4, 80, body);
}
static void printErrorStack(entry *e) {
unsigned int i;
char body[64];
if (e->type == -1) {
sprintf(body, "Error trace");
} else if (e->type >= 253) {
sprintf(body, "Error trace (%s)", types[e->type]);
} else if (!e->key) {
sprintf(body, "Error trace (%s: (unknown))", types[e->type]);
} else {
char tmp[41];
strncpy(tmp, e->key, 40);
/* display truncation at the last 3 chars */
if (strlen(e->key) > 40) {
memset(&tmp[37], '.', 3);
}
/* display unprintable characters as ? */
for (i = 0; i < strlen(tmp); i++) {
if (tmp[i] <= 32) tmp[i] = '?';
}
sprintf(body, "Error trace (%s: %s)", types[e->type], tmp);
}
printCentered(4, 80, body);
/* display error stack */
for (i = 0; i < errors.level; i++) {
serverLog(LL_WARNING, "0x%08lx - %s",
(unsigned long) errors.offset[i], errors.error[i]);
}
}
void process(void) {
uint64_t num_errors = 0, num_valid_ops = 0, num_valid_bytes = 0;
entry entry = { NULL, -1, 0 };
int dump_version = processHeader();
/* Exclude the final checksum for RDB >= 5. Will be checked at the end. */
if (dump_version >= 5) {
if (positions[0].size < 8) {
serverLog(LL_WARNING, "RDB version >= 5 but no room for checksum.");
exit(1); exit(1);
}
positions[0].size -= 8;
}
level = 1;
while(positions[0].offset < positions[0].size) {
positions[1] = positions[0];
entry = loadEntry();
if (!entry.success) {
printValid(num_valid_ops, num_valid_bytes);
printErrorStack(&entry);
num_errors++;
num_valid_ops = 0;
num_valid_bytes = 0;
/* search for next valid entry */
uint64_t offset = positions[0].offset + 1;
int i = 0;
while (!entry.success && offset < positions[0].size) {
positions[1].offset = offset;
/* find 3 consecutive valid entries */
for (i = 0; i < 3; i++) {
entry = loadEntry();
if (!entry.success) break;
}
/* check if we found 3 consecutive valid entries */
if (i < 3) {
offset++;
}
}
/* print how many bytes we have skipped to find a new valid opcode */
if (offset < positions[0].size) {
printSkipped(offset - positions[0].offset, offset);
}
positions[0].offset = offset;
} else {
num_valid_ops++;
num_valid_bytes += positions[1].offset - positions[0].offset;
/* advance position */
positions[0] = positions[1];
}
zfree(entry.key);
}
/* because there is another potential error,
* print how many valid ops we have processed */
printValid(num_valid_ops, num_valid_bytes);
/* expect an eof */
if (entry.type != RDB_OPCODE_EOF) {
/* last byte should be EOF, add error */
errors.level = 0;
SHIFT_ERROR(positions[0].offset, "Expected EOF, got %s", types[entry.type]);
/* this is an EOF error so reset type */
entry.type = -1;
printErrorStack(&entry);
num_errors++;
}
/* Verify checksum */
if (dump_version >= 5) {
uint64_t crc = crc64(0,positions[0].data,positions[0].size);
uint64_t crc2;
unsigned char *p = (unsigned char*)positions[0].data+positions[0].size;
crc2 = ((uint64_t)p[0] << 0) |
((uint64_t)p[1] << 8) |
((uint64_t)p[2] << 16) |
((uint64_t)p[3] << 24) |
((uint64_t)p[4] << 32) |
((uint64_t)p[5] << 40) |
((uint64_t)p[6] << 48) |
((uint64_t)p[7] << 56);
if (crc != crc2) {
SHIFT_ERROR(positions[0].offset, "RDB CRC64 does not match.");
} else {
serverLog(LL_WARNING, "CRC64 checksum is OK");
}
}
/* print summary on errors */
if (num_errors) {
serverLog(LL_WARNING, "Total unprocessable opcodes: %llu",
(unsigned long long) num_errors);
}
} }
void rdbCheckSetupSignals(void) {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
#warning "Uncomment here"
// act.sa_sigaction = rdbCheckHandleCrash;
sigaction(SIGSEGV, &act, NULL);
sigaction(SIGBUS, &act, NULL);
sigaction(SIGFPE, &act, NULL);
sigaction(SIGILL, &act, NULL);
}
/* Check the specified RDB file. */
int redis_check_rdb(char *rdbfilename) { int redis_check_rdb(char *rdbfilename) {
int fd; uint64_t dbid;
off_t size; int type, rdbver;
struct stat stat; char buf[1024];
void *data; long long expiretime, now = mstime();
FILE *fp;
rio rdb;
fd = open(rdbfilename, O_RDONLY); if ((fp = fopen(rdbfilename,"r")) == NULL) return C_ERR;
if (fd < 1) {
ERROR("Cannot open file: %s", rdbfilename); rioInitWithFile(&rdb,fp);
rdbstate.rio = &rdb;
rdb.update_cksum = rdbLoadProgressCallback;
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
rdbCheckError("Wrong signature trying to load DB from file");
return 1;
} }
if (fstat(fd, &stat) == -1) { rdbver = atoi(buf+5);
ERROR("Cannot stat: %s", rdbfilename); if (rdbver < 1 || rdbver > RDB_VERSION) {
} else { rdbCheckError("Can't handle RDB format version %d",rdbver);
size = stat.st_size; return 1;
} }
if (sizeof(size_t) == sizeof(int32_t) && size >= INT_MAX) { startLoading(fp);
ERROR("Cannot check dump files >2GB on a 32-bit platform"); while(1) {
robj *key, *val;
expiretime = -1;
/* Read type. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
/* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to
* load the actual type, and continue. */
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
/* the EXPIRETIME opcode specifies time in seconds, so convert
* into milliseconds. */
expiretime *= 1000;
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */
if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
} else if (type == RDB_OPCODE_EOF) {
/* EOF: End of file, exit the main loop. */
break;
} else if (type == RDB_OPCODE_SELECTDB) {
/* SELECTDB: Select the specified database. */
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
rdbCheckInfo("Selecting DB ID %d", dbid);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
* selected data base, in order to avoid useless rehashing. */
uint64_t db_size, expires_size;
if ((db_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
continue; /* Read type again. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
* which is backward compatible. Implementations of RDB loading
* are requierd to skip AUX fields they don't understand.
*
* An AUX field is composed of two strings: key and value. */
robj *auxkey, *auxval;
if ((auxkey = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
rdbCheckInfo("%s = '%s'", (char*)auxkey->ptr, (char*)auxval->ptr);
decrRefCount(auxkey);
decrRefCount(auxval);
continue; /* Read type again. */
} }
data = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0); /* Read key */
if (data == MAP_FAILED) { if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
ERROR("Cannot mmap: %s", rdbfilename); rdbstate.key = key;
rdbstate.keys++;
/* Read value */
if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
if (server.masterhost == NULL && expiretime != -1 && expiretime < now)
rdbstate.already_expired++;
if (expiretime != -1) rdbstate.expires++;
rdbstate.key = NULL;
decrRefCount(key);
decrRefCount(val);
}
/* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5 && server.rdb_checksum) {
uint64_t cksum, expected = rdb.cksum;
if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
memrev64ifbe(&cksum);
if (cksum == 0) {
rdbCheckInfo("RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
rdbCheckError("RDB CRC error");
}
} }
/* Initialize static vars */ fclose(fp);
positions[0].data = data;
positions[0].size = size;
positions[0].offset = 0;
errors.level = 0;
/* Object types */
sprintf(types[RDB_TYPE_STRING], "STRING");
sprintf(types[RDB_TYPE_LIST], "LIST");
sprintf(types[RDB_TYPE_SET], "SET");
sprintf(types[RDB_TYPE_ZSET], "ZSET");
sprintf(types[RDB_TYPE_HASH], "HASH");
/* Object types only used for dumping to disk */
sprintf(types[RDB_OPCODE_EXPIRETIME], "EXPIRETIME");
sprintf(types[RDB_OPCODE_SELECTDB], "SELECTDB");
sprintf(types[RDB_OPCODE_EOF], "EOF");
process();
munmap(data, size);
close(fd);
return 0; return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
rdbCheckError("Unexpected EOF reading RDB file");
return 1;
} }
/* RDB check main: called form redis.c when Redis is executed with the /* RDB check main: called form redis.c when Redis is executed with the
* redis-check-rdb alias. */ * redis-check-rdb alias.
*
* The function never returns, but exits with the status code according
* to success (RDB is sane) or error (RDB is corrupted). */
int redis_check_rdb_main(char **argv, int argc) { int redis_check_rdb_main(char **argv, int argc) {
if (argc != 2) { if (argc != 2) {
fprintf(stderr, "Usage: %s <rdb-file-name>\n", argv[0]); fprintf(stderr, "Usage: %s <rdb-file-name>\n", argv[0]);
exit(1); exit(1);
} }
serverLog(LL_WARNING, "Checking RDB file %s", argv[1]); createSharedObjects(); /* Needed for loading. */
exit(redis_check_rdb(argv[1])); server.loading_process_events_interval_bytes = 0;
return 0; rdbCheckInfo("Checking RDB file %s", argv[1]);
rdbCheckSetupSignals();
int retval = redis_check_rdb(argv[1]);
if (retval == 0) {
rdbCheckInfo("\\o/ RDB looks OK! \\o/");
}
exit(retval);
} }

View File

@ -4033,7 +4033,7 @@ int main(int argc, char **argv) {
* the program main. However the program is part of the Redis executable * the program main. However the program is part of the Redis executable
* so that we can easily execute an RDB check on loading errors. */ * so that we can easily execute an RDB check on loading errors. */
if (strstr(argv[0],"redis-check-rdb") != NULL) if (strstr(argv[0],"redis-check-rdb") != NULL)
exit(redis_check_rdb_main(argv,argc)); redis_check_rdb_main(argv,argc);
if (argc >= 2) { if (argc >= 2) {
j = 1; /* First option to parse in argv[] */ j = 1; /* First option to parse in argv[] */