mirror of
https://github.com/fluencelabs/redis
synced 2025-04-21 16:42:13 +00:00
prevent diskless replica from terminating on short read
now that replica can read rdb directly from the socket, it should avoid exiting on short read and instead try to re-sync. this commit tries to have minimal effects on non-diskless rdb reading. and includes a test that tries to trigger this scenario on various read cases.
This commit is contained in:
parent
241d18d954
commit
c56b4ddc6f
188
src/rdb.c
188
src/rdb.c
@ -42,31 +42,35 @@
|
|||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/param.h>
|
#include <sys/param.h>
|
||||||
|
|
||||||
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
|
/* This macro is called when the internal RDB stracture is corrupt */
|
||||||
|
#define rdbExitReportCorruptRDB(...) rdbReportReadError(0, __LINE__,__VA_ARGS__)
|
||||||
|
/* This macro is called when RDB read failed (possibly a short read) */
|
||||||
|
#define rdbReportReadError(...) rdbReportError(1, __LINE__,__VA_ARGS__)
|
||||||
|
|
||||||
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
|
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
|
||||||
extern int rdbCheckMode;
|
extern int rdbCheckMode;
|
||||||
void rdbCheckError(const char *fmt, ...);
|
void rdbCheckError(const char *fmt, ...);
|
||||||
void rdbCheckSetError(const char *fmt, ...);
|
void rdbCheckSetError(const char *fmt, ...);
|
||||||
|
|
||||||
void rdbCheckThenExit(int linenum, char *reason, ...) {
|
void rdbReportError(int read_error, int linenum, char *reason, ...) {
|
||||||
va_list ap;
|
va_list ap;
|
||||||
char msg[1024];
|
char msg[1024];
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
len = snprintf(msg,sizeof(msg),
|
len = snprintf(msg,sizeof(msg),
|
||||||
"Internal error in RDB reading function at rdb.c:%d -> ", linenum);
|
"Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
|
||||||
|
(unsigned long long)server.loading_loaded_bytes, linenum);
|
||||||
va_start(ap,reason);
|
va_start(ap,reason);
|
||||||
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
|
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
|
||||||
va_end(ap);
|
va_end(ap);
|
||||||
|
|
||||||
if (!rdbCheckMode) {
|
if (!rdbCheckMode) {
|
||||||
|
if (rdbFileBeingLoaded || !read_error) {
|
||||||
serverLog(LL_WARNING, "%s", msg);
|
serverLog(LL_WARNING, "%s", msg);
|
||||||
if (rdbFileBeingLoaded) {
|
|
||||||
char *argv[2] = {"",rdbFileBeingLoaded};
|
char *argv[2] = {"",rdbFileBeingLoaded};
|
||||||
redis_check_rdb_main(2,argv,NULL);
|
redis_check_rdb_main(2,argv,NULL);
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
|
serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
|
|||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is just a wrapper for the low level function rioRead() that will
|
|
||||||
* automatically abort if it is not possible to read the specified amount
|
|
||||||
* of bytes. */
|
|
||||||
void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
|
|
||||||
if (rioRead(rdb,buf,len) == 0) {
|
|
||||||
rdbExitReportCorruptRDB(
|
|
||||||
"Impossible to read %llu bytes in rdbLoadRaw()",
|
|
||||||
(unsigned long long) len);
|
|
||||||
return; /* Not reached. */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int rdbSaveType(rio *rdb, unsigned char type) {
|
int rdbSaveType(rio *rdb, unsigned char type) {
|
||||||
return rdbWriteRaw(rdb,&type,1);
|
return rdbWriteRaw(rdb,&type,1);
|
||||||
}
|
}
|
||||||
@ -110,10 +102,11 @@ int rdbLoadType(rio *rdb) {
|
|||||||
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
|
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
|
||||||
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
|
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
|
||||||
* opcode. */
|
* opcode. */
|
||||||
time_t rdbLoadTime(rio *rdb) {
|
int rdbLoadTime(rio *rdb, time_t *t) {
|
||||||
int32_t t32;
|
int32_t t32;
|
||||||
rdbLoadRaw(rdb,&t32,4);
|
if (rioRead(rdb,&t32,4) == 0) return C_ERR;
|
||||||
return (time_t)t32;
|
*t = (time_t)t32;
|
||||||
|
return C_OK;;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rdbSaveMillisecondTime(rio *rdb, long long t) {
|
int rdbSaveMillisecondTime(rio *rdb, long long t) {
|
||||||
@ -133,12 +126,13 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
|
|||||||
* own old RDB files. Because of that, we instead fix the function only for new
|
* own old RDB files. Because of that, we instead fix the function only for new
|
||||||
* RDB versions, and load older RDB versions as we used to do in the past,
|
* RDB versions, and load older RDB versions as we used to do in the past,
|
||||||
* allowing big endian systems to load their own old RDB files. */
|
* allowing big endian systems to load their own old RDB files. */
|
||||||
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
|
int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver) {
|
||||||
int64_t t64;
|
int64_t t64;
|
||||||
rdbLoadRaw(rdb,&t64,8);
|
if (rioRead(rdb,&t64,8) == 0) return C_ERR;
|
||||||
if (rdbver >= 9) /* Check the top comment of this function. */
|
if (rdbver >= 9) /* Check the top comment of this function. */
|
||||||
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
|
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
|
||||||
return (long long)t64;
|
*t = (long long)t64;
|
||||||
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Saves an encoded length. The first two bits in the first byte are used to
|
/* Saves an encoded length. The first two bits in the first byte are used to
|
||||||
@ -216,9 +210,8 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
|
|||||||
if (rioRead(rdb,&len,8) == 0) return -1;
|
if (rioRead(rdb,&len,8) == 0) return -1;
|
||||||
*lenptr = ntohu64(len);
|
*lenptr = ntohu64(len);
|
||||||
} else {
|
} else {
|
||||||
rdbExitReportCorruptRDB(
|
serverLog(LL_WARNING, "Unknown length encoding %d in rdbLoadLen()",type);
|
||||||
"Unknown length encoding %d in rdbLoadLen()",type);
|
return -1;
|
||||||
return -1; /* Never reached. */
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -284,8 +277,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
|
|||||||
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
|
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
|
||||||
val = (int32_t)v;
|
val = (int32_t)v;
|
||||||
} else {
|
} else {
|
||||||
val = 0; /* anti-warning */
|
serverLog(LL_WARNING, "Unknown RDB integer encoding type %d", enctype);
|
||||||
rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
|
return NULL;
|
||||||
}
|
}
|
||||||
if (plain || sds) {
|
if (plain || sds) {
|
||||||
char buf[LONG_STR_SIZE], *p;
|
char buf[LONG_STR_SIZE], *p;
|
||||||
@ -502,7 +495,8 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
|
|||||||
case RDB_ENC_LZF:
|
case RDB_ENC_LZF:
|
||||||
return rdbLoadLzfStringObject(rdb,flags,lenptr);
|
return rdbLoadLzfStringObject(rdb,flags,lenptr);
|
||||||
default:
|
default:
|
||||||
rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
|
serverLog(LL_WARNING, "Unknown RDB encoding type %llu", (unsigned long long)len);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1644,6 +1638,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
hashTypeConvert(o, OBJ_ENCODING_HT);
|
hashTypeConvert(o, OBJ_ENCODING_HT);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
/* totally unreachable */
|
||||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1651,6 +1646,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
o = createStreamObject();
|
o = createStreamObject();
|
||||||
stream *s = o->ptr;
|
stream *s = o->ptr;
|
||||||
uint64_t listpacks = rdbLoadLen(rdb,NULL);
|
uint64_t listpacks = rdbLoadLen(rdb,NULL);
|
||||||
|
if (listpacks == RDB_LENERR) {
|
||||||
|
rdbReportReadError("Stream listpacks len loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
while(listpacks--) {
|
while(listpacks--) {
|
||||||
/* Get the master ID, the one we'll use as key of the radix tree
|
/* Get the master ID, the one we'll use as key of the radix tree
|
||||||
@ -1658,7 +1658,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
* relatively to this ID. */
|
* relatively to this ID. */
|
||||||
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||||
if (nodekey == NULL) {
|
if (nodekey == NULL) {
|
||||||
rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
|
rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
if (sdslen(nodekey) != sizeof(streamID)) {
|
if (sdslen(nodekey) != sizeof(streamID)) {
|
||||||
rdbExitReportCorruptRDB("Stream node key entry is not the "
|
rdbExitReportCorruptRDB("Stream node key entry is not the "
|
||||||
@ -1668,7 +1670,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
/* Load the listpack. */
|
/* Load the listpack. */
|
||||||
unsigned char *lp =
|
unsigned char *lp =
|
||||||
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
|
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
|
||||||
if (lp == NULL) return NULL;
|
if (lp == NULL) {
|
||||||
|
rdbReportReadError("Stream listpacks loading failed.");
|
||||||
|
sdsfree(nodekey);
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
unsigned char *first = lpFirst(lp);
|
unsigned char *first = lpFirst(lp);
|
||||||
if (first == NULL) {
|
if (first == NULL) {
|
||||||
/* Serialized listpacks should never be empty, since on
|
/* Serialized listpacks should never be empty, since on
|
||||||
@ -1685,13 +1692,26 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
rdbExitReportCorruptRDB("Listpack re-added with existing key");
|
rdbExitReportCorruptRDB("Listpack re-added with existing key");
|
||||||
}
|
}
|
||||||
/* Load total number of items inside the stream. */
|
/* Load total number of items inside the stream. */
|
||||||
s->length = rdbLoadLen(rdb,NULL);
|
if (rdbLoadLenByRef(rdb,NULL,&s->length)) {
|
||||||
|
rdbReportReadError("Stream item count loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
/* Load the last entry ID. */
|
/* Load the last entry ID. */
|
||||||
s->last_id.ms = rdbLoadLen(rdb,NULL);
|
if (rdbLoadLenByRef(rdb,NULL,&s->last_id.ms) ||
|
||||||
s->last_id.seq = rdbLoadLen(rdb,NULL);
|
rdbLoadLenByRef(rdb,NULL,&s->last_id.seq)) {
|
||||||
|
rdbReportReadError("Stream last entry ID loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Consumer groups loading */
|
/* Consumer groups loading */
|
||||||
size_t cgroups_count = rdbLoadLen(rdb,NULL);
|
uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
|
||||||
|
if (cgroups_count == RDB_LENERR) {
|
||||||
|
rdbReportReadError("Stream cgroup count loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
while(cgroups_count--) {
|
while(cgroups_count--) {
|
||||||
/* Get the consumer group name and ID. We can then create the
|
/* Get the consumer group name and ID. We can then create the
|
||||||
* consumer group ASAP and populate its structure as
|
* consumer group ASAP and populate its structure as
|
||||||
@ -1699,11 +1719,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
streamID cg_id;
|
streamID cg_id;
|
||||||
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||||
if (cgname == NULL) {
|
if (cgname == NULL) {
|
||||||
rdbExitReportCorruptRDB(
|
rdbReportReadError(
|
||||||
"Error reading the consumer group name from Stream");
|
"Error reading the consumer group name from Stream");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (rdbLoadLenByRef(rdb,NULL,&cg_id.ms) ||
|
||||||
|
rdbLoadLenByRef(rdb,NULL,&cg_id.seq)) {
|
||||||
|
rdbReportReadError("Stream cgroup ID loading failed.");
|
||||||
|
sdsfree(cgname);
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
cg_id.ms = rdbLoadLen(rdb,NULL);
|
|
||||||
cg_id.seq = rdbLoadLen(rdb,NULL);
|
|
||||||
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
|
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
|
||||||
if (cgroup == NULL)
|
if (cgroup == NULL)
|
||||||
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
|
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
|
||||||
@ -1715,13 +1742,32 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
* owner, since consumers for this group and their messages will
|
* owner, since consumers for this group and their messages will
|
||||||
* be read as a next step. So for now leave them not resolved
|
* be read as a next step. So for now leave them not resolved
|
||||||
* and later populate it. */
|
* and later populate it. */
|
||||||
size_t pel_size = rdbLoadLen(rdb,NULL);
|
uint64_t pel_size = rdbLoadLen(rdb,NULL);
|
||||||
|
if (pel_size == RDB_LENERR) {
|
||||||
|
rdbReportReadError("Stream PEL size loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
while(pel_size--) {
|
while(pel_size--) {
|
||||||
unsigned char rawid[sizeof(streamID)];
|
unsigned char rawid[sizeof(streamID)];
|
||||||
rdbLoadRaw(rdb,rawid,sizeof(rawid));
|
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
|
||||||
|
rdbReportReadError("Stream PEL ID loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
streamNACK *nack = streamCreateNACK(NULL);
|
streamNACK *nack = streamCreateNACK(NULL);
|
||||||
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
if (rdbLoadMillisecondTime(rdb, &nack->delivery_time,RDB_VERSION) == C_ERR) {
|
||||||
nack->delivery_count = rdbLoadLen(rdb,NULL);
|
rdbReportReadError("Stream PEL nack loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
streamFreeNACK(nack);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if ((nack->delivery_count = rdbLoadLen(rdb,NULL)) == RDB_LENERR) {
|
||||||
|
rdbReportReadError("Stream nack deliveries loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
streamFreeNACK(nack);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
|
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
|
||||||
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
|
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
|
||||||
"loading stream consumer group");
|
"loading stream consumer group");
|
||||||
@ -1729,24 +1775,44 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
|
|
||||||
/* Now that we loaded our global PEL, we need to load the
|
/* Now that we loaded our global PEL, we need to load the
|
||||||
* consumers and their local PELs. */
|
* consumers and their local PELs. */
|
||||||
size_t consumers_num = rdbLoadLen(rdb,NULL);
|
uint64_t consumers_num = rdbLoadLen(rdb,NULL);
|
||||||
|
if (consumers_num == RDB_LENERR) {
|
||||||
|
rdbReportReadError("Stream consumers num loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
while(consumers_num--) {
|
while(consumers_num--) {
|
||||||
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||||
if (cname == NULL) {
|
if (cname == NULL) {
|
||||||
rdbExitReportCorruptRDB(
|
rdbReportReadError(
|
||||||
"Error reading the consumer name from Stream group");
|
"Error reading the consumer name from Stream group.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
|
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
|
||||||
1);
|
1);
|
||||||
sdsfree(cname);
|
sdsfree(cname);
|
||||||
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
if (rdbLoadMillisecondTime(rdb, &consumer->seen_time,RDB_VERSION) == C_ERR) {
|
||||||
|
rdbReportReadError("Stream short read reading seen time.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Load the PEL about entries owned by this specific
|
/* Load the PEL about entries owned by this specific
|
||||||
* consumer. */
|
* consumer. */
|
||||||
pel_size = rdbLoadLen(rdb,NULL);
|
pel_size = rdbLoadLen(rdb,NULL);
|
||||||
|
if (pel_size == RDB_LENERR) {
|
||||||
|
rdbReportReadError("Stream consumer PEL num loading failed.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
while(pel_size--) {
|
while(pel_size--) {
|
||||||
unsigned char rawid[sizeof(streamID)];
|
unsigned char rawid[sizeof(streamID)];
|
||||||
rdbLoadRaw(rdb,rawid,sizeof(rawid));
|
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
|
||||||
|
rdbReportReadError("Stream short read reading PEL streamID.");
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
|
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
|
||||||
if (nack == raxNotFound)
|
if (nack == raxNotFound)
|
||||||
rdbExitReportCorruptRDB("Consumer entry not found in "
|
rdbExitReportCorruptRDB("Consumer entry not found in "
|
||||||
@ -1764,7 +1830,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
||||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
uint64_t moduleid;
|
||||||
|
if (rdbLoadLenByRef(rdb,NULL, &moduleid)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||||
char name[10];
|
char name[10];
|
||||||
|
|
||||||
@ -1792,6 +1861,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
/* Module v2 serialization has an EOF mark at the end. */
|
/* Module v2 serialization has an EOF mark at the end. */
|
||||||
if (io.ver == 2) {
|
if (io.ver == 2) {
|
||||||
uint64_t eof = rdbLoadLen(rdb,NULL);
|
uint64_t eof = rdbLoadLen(rdb,NULL);
|
||||||
|
if (eof == RDB_LENERR) {
|
||||||
|
o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
|
||||||
|
decrRefCount(o);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
if (eof != RDB_MODULE_OPCODE_EOF) {
|
if (eof != RDB_MODULE_OPCODE_EOF) {
|
||||||
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
|
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
|
||||||
exit(1);
|
exit(1);
|
||||||
@ -1805,7 +1879,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
|||||||
}
|
}
|
||||||
o = createModuleObject(mt,ptr);
|
o = createModuleObject(mt,ptr);
|
||||||
} else {
|
} else {
|
||||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
@ -1902,13 +1977,14 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
/* EXPIRETIME: load an expire associated with the next key
|
/* EXPIRETIME: load an expire associated with the next key
|
||||||
* to load. Note that after loading an expire we need to
|
* to load. Note that after loading an expire we need to
|
||||||
* load the actual type, and continue. */
|
* load the actual type, and continue. */
|
||||||
expiretime = rdbLoadTime(rdb);
|
time_t t;
|
||||||
expiretime *= 1000;
|
if (rdbLoadTime(rdb, &t) == C_ERR) goto eoferr;
|
||||||
|
expiretime = t * 1000;
|
||||||
continue; /* Read next opcode. */
|
continue; /* Read next opcode. */
|
||||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||||
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
||||||
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
||||||
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
|
if (rdbLoadMillisecondTime(rdb, &expiretime, rdbver) == C_ERR) goto eoferr;
|
||||||
continue; /* Read next opcode. */
|
continue; /* Read next opcode. */
|
||||||
} else if (type == RDB_OPCODE_FREQ) {
|
} else if (type == RDB_OPCODE_FREQ) {
|
||||||
/* FREQ: LFU frequency. */
|
/* FREQ: LFU frequency. */
|
||||||
@ -2017,7 +2093,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
* we have the ability to read a MODULE_AUX opcode followed by an
|
* we have the ability to read a MODULE_AUX opcode followed by an
|
||||||
* identifier of the module, and a serialized value in "MODULE V2"
|
* identifier of the module, and a serialized value in "MODULE V2"
|
||||||
* format. */
|
* format. */
|
||||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
uint64_t moduleid;
|
||||||
|
if (rdbLoadLenByRef(rdb,NULL,&moduleid)) goto eoferr;
|
||||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||||
char name[10];
|
char name[10];
|
||||||
moduleTypeNameByID(name,moduleid);
|
moduleTypeNameByID(name,moduleid);
|
||||||
@ -2090,8 +2167,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
|
|
||||||
eoferr: /* unexpected end of file is handled here with a fatal exit */
|
eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||||
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
||||||
rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
|
rdbReportReadError("Unexpected EOF reading RDB file");
|
||||||
return C_ERR; /* Just to avoid warning */
|
err:
|
||||||
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
||||||
|
@ -127,10 +127,10 @@
|
|||||||
int rdbSaveType(rio *rdb, unsigned char type);
|
int rdbSaveType(rio *rdb, unsigned char type);
|
||||||
int rdbLoadType(rio *rdb);
|
int rdbLoadType(rio *rdb);
|
||||||
int rdbSaveTime(rio *rdb, time_t t);
|
int rdbSaveTime(rio *rdb, time_t t);
|
||||||
time_t rdbLoadTime(rio *rdb);
|
int rdbLoadTime(rio *rdb, time_t *t);
|
||||||
int rdbSaveLen(rio *rdb, uint64_t len);
|
int rdbSaveLen(rio *rdb, uint64_t len);
|
||||||
int rdbSaveMillisecondTime(rio *rdb, long long t);
|
int rdbSaveMillisecondTime(rio *rdb, long long t);
|
||||||
long long rdbLoadMillisecondTime(rio *rdb, int rdbver);
|
int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver);
|
||||||
uint64_t rdbLoadLen(rio *rdb, int *isencoded);
|
uint64_t rdbLoadLen(rio *rdb, int *isencoded);
|
||||||
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
|
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
|
||||||
int rdbSaveObjectType(rio *rdb, robj *o);
|
int rdbSaveObjectType(rio *rdb, robj *o);
|
||||||
|
@ -212,18 +212,19 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
|
|||||||
|
|
||||||
/* Handle special types. */
|
/* Handle special types. */
|
||||||
if (type == RDB_OPCODE_EXPIRETIME) {
|
if (type == RDB_OPCODE_EXPIRETIME) {
|
||||||
|
time_t t;
|
||||||
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
|
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
|
||||||
/* EXPIRETIME: load an expire associated with the next key
|
/* EXPIRETIME: load an expire associated with the next key
|
||||||
* to load. Note that after loading an expire we need to
|
* to load. Note that after loading an expire we need to
|
||||||
* load the actual type, and continue. */
|
* load the actual type, and continue. */
|
||||||
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
|
if (rdbLoadTime(&rdb, &t) == C_ERR) goto eoferr;
|
||||||
expiretime *= 1000;
|
expiretime = t * 1000;
|
||||||
continue; /* Read next opcode. */
|
continue; /* Read next opcode. */
|
||||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||||
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
||||||
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
||||||
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
|
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
|
||||||
if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr;
|
if (rdbLoadMillisecondTime(&rdb, &expiretime, rdbver) == C_ERR) goto eoferr;
|
||||||
continue; /* Read next opcode. */
|
continue; /* Read next opcode. */
|
||||||
} else if (type == RDB_OPCODE_FREQ) {
|
} else if (type == RDB_OPCODE_FREQ) {
|
||||||
/* FREQ: LFU frequency. */
|
/* FREQ: LFU frequency. */
|
||||||
|
@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
|
|||||||
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
||||||
void streamDecodeID(void *buf, streamID *id);
|
void streamDecodeID(void *buf, streamID *id);
|
||||||
int streamCompareID(streamID *a, streamID *b);
|
int streamCompareID(streamID *a, streamID *b);
|
||||||
|
void streamFreeNACK(streamNACK *na);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -192,12 +192,6 @@ foreach mdl {no yes} {
|
|||||||
set master_host [srv 0 host]
|
set master_host [srv 0 host]
|
||||||
set master_port [srv 0 port]
|
set master_port [srv 0 port]
|
||||||
set slaves {}
|
set slaves {}
|
||||||
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
|
|
||||||
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
|
|
||||||
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
|
|
||||||
set load_handle3 [start_write_load $master_host $master_port 8]
|
|
||||||
set load_handle4 [start_write_load $master_host $master_port 4]
|
|
||||||
after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
|
|
||||||
start_server {} {
|
start_server {} {
|
||||||
lappend slaves [srv 0 client]
|
lappend slaves [srv 0 client]
|
||||||
start_server {} {
|
start_server {} {
|
||||||
@ -205,6 +199,14 @@ foreach mdl {no yes} {
|
|||||||
start_server {} {
|
start_server {} {
|
||||||
lappend slaves [srv 0 client]
|
lappend slaves [srv 0 client]
|
||||||
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
|
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
|
||||||
|
# start load handles only inside the test, so that the test can be skipped
|
||||||
|
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
|
||||||
|
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
|
||||||
|
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
|
||||||
|
set load_handle3 [start_write_load $master_host $master_port 8]
|
||||||
|
set load_handle4 [start_write_load $master_host $master_port 4]
|
||||||
|
after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
|
||||||
|
|
||||||
# Send SLAVEOF commands to slaves
|
# Send SLAVEOF commands to slaves
|
||||||
[lindex $slaves 0] config set repl-diskless-load $sdl
|
[lindex $slaves 0] config set repl-diskless-load $sdl
|
||||||
[lindex $slaves 1] config set repl-diskless-load $sdl
|
[lindex $slaves 1] config set repl-diskless-load $sdl
|
||||||
@ -278,9 +280,9 @@ start_server {tags {"repl"}} {
|
|||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
set master_host [srv 0 host]
|
set master_host [srv 0 host]
|
||||||
set master_port [srv 0 port]
|
set master_port [srv 0 port]
|
||||||
set load_handle0 [start_write_load $master_host $master_port 3]
|
|
||||||
start_server {} {
|
start_server {} {
|
||||||
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
|
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
|
||||||
|
set load_handle0 [start_write_load $master_host $master_port 3]
|
||||||
set slave [srv 0 client]
|
set slave [srv 0 client]
|
||||||
$slave config set lua-time-limit 500
|
$slave config set lua-time-limit 500
|
||||||
$slave slaveof $master_host $master_port
|
$slave slaveof $master_host $master_port
|
||||||
@ -383,3 +385,84 @@ test {slave fails full sync and diskless load swapdb recoveres it} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {diskless loading short read} {
|
||||||
|
start_server {tags {"repl"}} {
|
||||||
|
set replica [srv 0 client]
|
||||||
|
set replica_host [srv 0 host]
|
||||||
|
set replica_port [srv 0 port]
|
||||||
|
start_server {} {
|
||||||
|
set master [srv 0 client]
|
||||||
|
set master_host [srv 0 host]
|
||||||
|
set master_port [srv 0 port]
|
||||||
|
|
||||||
|
# Set master and replica to use diskless replication
|
||||||
|
$master config set repl-diskless-sync yes
|
||||||
|
$master config set rdbcompression no
|
||||||
|
$replica config set repl-diskless-load swapdb
|
||||||
|
# Try to fill the master with all types of data types / encodings
|
||||||
|
for {set k 0} {$k < 3} {incr k} {
|
||||||
|
for {set i 0} {$i < 10} {incr i} {
|
||||||
|
r set "$k int_$i" [expr {int(rand()*10000)}]
|
||||||
|
r expire "$k int_$i" [expr {int(rand()*10000)}]
|
||||||
|
r set "$k string_$i" [string repeat A [expr {int(rand()*1000000)}]]
|
||||||
|
r hset "$k hash_small" [string repeat A [expr {int(rand()*10)}]] 0[string repeat A [expr {int(rand()*10)}]]
|
||||||
|
r hset "$k hash_large" [string repeat A [expr {int(rand()*10000)}]] [string repeat A [expr {int(rand()*1000000)}]]
|
||||||
|
r sadd "$k set_small" [string repeat A [expr {int(rand()*10)}]]
|
||||||
|
r sadd "$k set_large" [string repeat A [expr {int(rand()*1000000)}]]
|
||||||
|
r zadd "$k zset_small" [expr {rand()}] [string repeat A [expr {int(rand()*10)}]]
|
||||||
|
r zadd "$k zset_large" [expr {rand()}] [string repeat A [expr {int(rand()*1000000)}]]
|
||||||
|
r lpush "$k list_small" [string repeat A [expr {int(rand()*10)}]]
|
||||||
|
r lpush "$k list_large" [string repeat A [expr {int(rand()*1000000)}]]
|
||||||
|
for {set j 0} {$j < 10} {incr j} {
|
||||||
|
r xadd "$k stream" * foo "asdf" bar "1234"
|
||||||
|
}
|
||||||
|
r xgroup create "$k stream" "mygroup_$i" 0
|
||||||
|
r xreadgroup GROUP "mygroup_$i" Alice COUNT 1 STREAMS "$k stream" >
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Start the replication process...
|
||||||
|
$master config set repl-diskless-sync-delay 0
|
||||||
|
$replica replicaof $master_host $master_port
|
||||||
|
|
||||||
|
# kill the replication at various points
|
||||||
|
set attempts 3
|
||||||
|
if {$::accurate} { set attempts 10 }
|
||||||
|
for {set i 0} {$i < $attempts} {incr i} {
|
||||||
|
# wait for the replica to start reading the rdb
|
||||||
|
# using the log file since the replica only responds to INFO once in 2mb
|
||||||
|
wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1
|
||||||
|
|
||||||
|
# add some additional random sleep so that we kill the master on a different place each time
|
||||||
|
after [expr {int(rand()*100)}]
|
||||||
|
|
||||||
|
# kill the replica connection on the master
|
||||||
|
set killed [$master client kill type replica]
|
||||||
|
|
||||||
|
if {[catch {
|
||||||
|
set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10]
|
||||||
|
if {$::verbose} {
|
||||||
|
puts $res
|
||||||
|
}
|
||||||
|
}]} {
|
||||||
|
puts "failed triggering short read"
|
||||||
|
# force the replica to try another full sync
|
||||||
|
$master client kill type replica
|
||||||
|
$master set asdf asdf
|
||||||
|
# the side effect of resizing the backlog is that it is flushed (16k is the min size)
|
||||||
|
$master config set repl-backlog-size [expr {16384 + $i}]
|
||||||
|
}
|
||||||
|
# wait for loading to stop (fail)
|
||||||
|
wait_for_condition 100 10 {
|
||||||
|
[s -1 loading] eq 0
|
||||||
|
} else {
|
||||||
|
fail "Replica didn't disconnect"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
# enable fast shutdown
|
||||||
|
$master config set rdb-key-save-delay 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -99,6 +99,25 @@ proc wait_for_ofs_sync {r1 r2} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
proc wait_for_log_message {srv_idx pattern last_lines maxtries delay} {
|
||||||
|
set retry $maxtries
|
||||||
|
set stdout [srv $srv_idx stdout]
|
||||||
|
while {$retry} {
|
||||||
|
set result [exec tail -$last_lines < $stdout]
|
||||||
|
set result [split $result "\n"]
|
||||||
|
foreach line $result {
|
||||||
|
if {[string match $pattern $line]} {
|
||||||
|
return $line
|
||||||
|
}
|
||||||
|
}
|
||||||
|
incr retry -1
|
||||||
|
after $delay
|
||||||
|
}
|
||||||
|
if {$retry == 0} {
|
||||||
|
fail "log message of '$pattern' not found"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
# Random integer between 0 and max (excluded).
|
# Random integer between 0 and max (excluded).
|
||||||
proc randomInt {max} {
|
proc randomInt {max} {
|
||||||
expr {int(rand()*$max)}
|
expr {int(rand()*$max)}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user