Merge branch 'aofrdb' into unstable

This commit is contained in:
antirez 2016-09-09 15:03:21 +02:00
commit 3793afa0ba
8 changed files with 240 additions and 141 deletions

View File

@ -755,6 +755,20 @@ auto-aof-rewrite-min-size 64mb
# will be found. # will be found.
aof-load-truncated yes aof-load-truncated yes
# When rewriting the AOF file, Redis is able to use an RDB preamble in the
# AOF file for faster rewrites and recoveries. When this option is turned
# on the rewritten AOF file is composed of two different stanzas:
#
# [RDB file][AOF tail]
#
# When loading, Redis recognizes that the AOF file starts with the "REDIS"
# string, loads the prefixed RDB file, and then continues loading the AOF
# tail.
#
# This is currently turned off by default in order to avoid the surprise
# of a format change, but will at some point be used as the default.
aof-use-rdb-preamble no
################################ LUA SCRIPTING ############################### ################################ LUA SCRIPTING ###############################
# Max execution time of a Lua script in milliseconds. # Max execution time of a Lua script in milliseconds.

151
src/aof.c
View File

@ -616,19 +616,23 @@ int loadAppendOnlyFile(char *filename) {
struct redis_stat sb; struct redis_stat sb;
int old_aof_state = server.aof_state; int old_aof_state = server.aof_state;
long loops = 0; long loops = 0;
off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */ off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return C_ERR;
}
if (fp == NULL) { if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1); exit(1);
} }
/* Handle a zero-length AOF file as a special case. An empty AOF file
* is a valid AOF because an empty server with AOF enabled will create
* a zero length file at startup, that will remain like that if no write
* operation is received. */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return C_ERR;
}
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
* to the same file we're about to read. */ * to the same file we're about to read. */
server.aof_state = AOF_OFF; server.aof_state = AOF_OFF;
@ -636,6 +640,28 @@ int loadAppendOnlyFile(char *filename) {
fakeClient = createFakeClient(); fakeClient = createFakeClient();
startLoading(fp); startLoading(fp);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
/* No RDB preamble, seek back at 0 offset. */
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
/* RDB preamble. Pass loading to the RDB functions. */
rio rdb;
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
}
}
/* Read the actual AOF file, in REPL format, command by command. */
while(1) { while(1) {
int argc, j; int argc, j;
unsigned long len; unsigned long len;
@ -989,7 +1015,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
} }
/* Call the module type callback in order to rewrite a data type /* Call the module type callback in order to rewrite a data type
* taht is exported by a module and is not handled by Redis itself. * that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */ * The function returns 0 on error, 1 on success. */
int rewriteModuleObject(rio *r, robj *key, robj *o) { int rewriteModuleObject(rio *r, robj *key, robj *o) {
RedisModuleIO io; RedisModuleIO io;
@ -1015,51 +1041,23 @@ ssize_t aofReadDiffFromParent(void) {
return total; return total;
} }
/* Write a sequence of commands able to fully rebuild the dataset into int rewriteAppendOnlyFileRio(rio *aof) {
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL; dictIterator *di = NULL;
dictEntry *de; dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
char byte;
size_t processed = 0; size_t processed = 0;
long long now = mstime();
int j;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j; redisDb *db = server.db+j;
dict *d = db->dict; dict *d = db->dict;
if (dictSize(d) == 0) continue; if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d); di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return C_ERR;
}
/* SELECT the new DB */ /* SELECT the new DB */
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */ /* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
@ -1080,39 +1078,83 @@ int rewriteAppendOnlyFile(char *filename) {
if (o->type == OBJ_STRING) { if (o->type == OBJ_STRING) {
/* Emit a SET command */ /* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n"; char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */ /* Key and value */
if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr; if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) { } else if (o->type == OBJ_LIST) {
if (rewriteListObject(&aof,&key,o) == 0) goto werr; if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) { } else if (o->type == OBJ_SET) {
if (rewriteSetObject(&aof,&key,o) == 0) goto werr; if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) { } else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) { } else if (o->type == OBJ_HASH) {
if (rewriteHashObject(&aof,&key,o) == 0) goto werr; if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) { } else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(&aof,&key,o) == 0) goto werr; if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else { } else {
serverPanic("Unknown object type"); serverPanic("Unknown object type");
} }
/* Save the expire time */ /* Save the expire time */
if (expiretime != -1) { if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
} }
/* Read some diff from the parent process from time to time. */ /* Read some diff from the parent process from time to time. */
if (aof.processed_bytes > processed+1024*10) { if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof.processed_bytes; processed = aof->processed_bytes;
aofReadDiffFromParent(); aofReadDiffFromParent();
} }
} }
dictReleaseIterator(di); dictReleaseIterator(di);
di = NULL; di = NULL;
} }
return C_OK;
werr:
if (di) dictReleaseIterator(di);
return C_ERR;
}
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Do an initial slow fsync here while the parent is still sending /* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */ * data, in order to make the next final fsync faster. */
@ -1178,7 +1220,6 @@ werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp); fclose(fp);
unlink(tmpfile); unlink(tmpfile);
if (di) dictReleaseIterator(di);
return C_ERR; return C_ERR;
} }

View File

@ -475,6 +475,10 @@ void loadServerConfigFromString(char *config) {
if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) { if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr; err = "argument must be 'yes' or 'no'"; goto loaderr;
} }
} else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) {
if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
@ -953,6 +957,8 @@ void configSetCommand(client *c) {
"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) { "aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) {
} config_set_bool_field( } config_set_bool_field(
"aof-load-truncated",server.aof_load_truncated) { "aof-load-truncated",server.aof_load_truncated) {
} config_set_bool_field(
"aof-use-rdb-preamble",server.aof_use_rdb_preamble) {
} config_set_bool_field( } config_set_bool_field(
"slave-serve-stale-data",server.repl_serve_stale_data) { "slave-serve-stale-data",server.repl_serve_stale_data) {
} config_set_bool_field( } config_set_bool_field(
@ -1227,6 +1233,8 @@ void configGetCommand(client *c) {
server.aof_rewrite_incremental_fsync); server.aof_rewrite_incremental_fsync);
config_get_bool_field("aof-load-truncated", config_get_bool_field("aof-load-truncated",
server.aof_load_truncated); server.aof_load_truncated);
config_get_bool_field("aof-use-rdb-preamble",
server.aof_use_rdb_preamble);
config_get_bool_field("lazyfree-lazy-eviction", config_get_bool_field("lazyfree-lazy-eviction",
server.lazyfree_lazy_eviction); server.lazyfree_lazy_eviction);
config_get_bool_field("lazyfree-lazy-expire", config_get_bool_field("lazyfree-lazy-expire",
@ -1947,6 +1955,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ); rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ);
rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC);
rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED); rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED);
rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION); rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION);
rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE); rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE);

View File

@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
} }
/* Save a few default AUX fields with information about the RDB generated. */ /* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb) { int rdbSaveInfoAuxFields(rio *rdb, int flags) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32; int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */ /* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1; return 1;
} }
@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) {
* When the function returns C_ERR and if 'error' is not NULL, the * When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O * integer pointed by 'error' is set to the value of errno just after the I/O
* error. */ * error. */
int rdbSaveRio(rio *rdb, int *error) { int rdbSaveRio(rio *rdb, int *error, int flags) {
dictIterator *di = NULL; dictIterator *di = NULL;
dictEntry *de; dictEntry *de;
char magic[10]; char magic[10];
int j; int j;
long long now = mstime(); long long now = mstime();
uint64_t cksum; uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum) if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum; rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j; redisDb *db = server.db+j;
@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) {
initStaticStringObject(key,keystr); initStaticStringObject(key,keystr);
expire = getExpire(db,&key); expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr; if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
} }
dictReleaseIterator(di); dictReleaseIterator(di);
} }
@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error) == C_ERR) goto werr; if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK; return C_OK;
@ -955,7 +968,7 @@ int rdbSave(char *filename) {
} }
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fp);
if (rdbSaveRio(&rdb,&error) == C_ERR) { if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
errno = error; errno = error;
goto werr; goto werr;
} }
@ -1373,67 +1386,61 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
} }
} }
int rdbLoad(char *filename) { /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb) {
uint64_t dbid; uint64_t dbid;
int type, rdbver; int type, rdbver;
redisDb *db = server.db+0; redisDb *db = server.db+0;
char buf[1024]; char buf[1024];
long long expiretime, now = mstime(); long long expiretime, now = mstime();
FILE *fp;
rio rdb;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR; rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
rioInitWithFile(&rdb,fp); if (rioRead(rdb,buf,9) == 0) goto eoferr;
rdb.update_cksum = rdbLoadProgressCallback;
rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0'; buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) { if (memcmp(buf,"REDIS",5) != 0) {
fclose(fp);
serverLog(LL_WARNING,"Wrong signature trying to load DB from file"); serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
errno = EINVAL; errno = EINVAL;
return C_ERR; return C_ERR;
} }
rdbver = atoi(buf+5); rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > RDB_VERSION) { if (rdbver < 1 || rdbver > RDB_VERSION) {
fclose(fp);
serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver); serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
errno = EINVAL; errno = EINVAL;
return C_ERR; return C_ERR;
} }
startLoading(fp);
while(1) { while(1) {
robj *key, *val; robj *key, *val;
expiretime = -1; expiretime = -1;
/* Read type. */ /* Read type. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
/* Handle special types. */ /* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
/* EXPIRETIME: load an expire associated with the next key /* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to * to load. Note that after loading an expire we need to
* load the actual type, and continue. */ * load the actual type, and continue. */
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; if ((expiretime = rdbLoadTime(rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again. */ /* We read the time so we need to read the object type again. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
/* the EXPIRETIME opcode specifies time in seconds, so convert /* the EXPIRETIME opcode specifies time in seconds, so convert
* into milliseconds. */ * into milliseconds. */
expiretime *= 1000; expiretime *= 1000;
} else if (type == RDB_OPCODE_EXPIRETIME_MS) { } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced /* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */ * with RDB v3. Like EXPIRETIME but no with more precision. */
if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; if ((expiretime = rdbLoadMillisecondTime(rdb)) == -1) goto eoferr;
/* We read the time so we need to read the object type again. */ /* We read the time so we need to read the object type again. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
} else if (type == RDB_OPCODE_EOF) { } else if (type == RDB_OPCODE_EOF) {
/* EOF: End of file, exit the main loop. */ /* EOF: End of file, exit the main loop. */
break; break;
} else if (type == RDB_OPCODE_SELECTDB) { } else if (type == RDB_OPCODE_SELECTDB) {
/* SELECTDB: Select the specified database. */ /* SELECTDB: Select the specified database. */
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr; goto eoferr;
if (dbid >= (unsigned)server.dbnum) { if (dbid >= (unsigned)server.dbnum) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
@ -1448,9 +1455,9 @@ int rdbLoad(char *filename) {
/* RESIZEDB: Hint about the size of the keys in the currently /* RESIZEDB: Hint about the size of the keys in the currently
* selected data base, in order to avoid useless rehashing. */ * selected data base, in order to avoid useless rehashing. */
uint64_t db_size, expires_size; uint64_t db_size, expires_size;
if ((db_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr; goto eoferr;
if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr; goto eoferr;
dictExpand(db->dict,db_size); dictExpand(db->dict,db_size);
dictExpand(db->expires,expires_size); dictExpand(db->expires,expires_size);
@ -1462,8 +1469,8 @@ int rdbLoad(char *filename) {
* *
* An AUX field is composed of two strings: key and value. */ * An AUX field is composed of two strings: key and value. */
robj *auxkey, *auxval; robj *auxkey, *auxval;
if ((auxkey = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if (((char*)auxkey->ptr)[0] == '%') { if (((char*)auxkey->ptr)[0] == '%') {
/* All the fields with a name staring with '%' are considered /* All the fields with a name staring with '%' are considered
@ -1485,9 +1492,9 @@ int rdbLoad(char *filename) {
} }
/* Read key */ /* Read key */
if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */ /* Read value */
if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr; if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;
/* Check if the key already expired. This function is used when loading /* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was * an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is * received from the master. In the latter case, the master is
@ -1508,9 +1515,9 @@ int rdbLoad(char *filename) {
} }
/* Verify the checksum if RDB version is >= 5 */ /* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5 && server.rdb_checksum) { if (rdbver >= 5 && server.rdb_checksum) {
uint64_t cksum, expected = rdb.cksum; uint64_t cksum, expected = rdb->cksum;
if (rioRead(&rdb,&cksum,8) == 0) goto eoferr; if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
memrev64ifbe(&cksum); memrev64ifbe(&cksum);
if (cksum == 0) { if (cksum == 0) {
serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed."); serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
@ -1519,9 +1526,6 @@ int rdbLoad(char *filename) {
rdbExitReportCorruptRDB("RDB CRC error"); rdbExitReportCorruptRDB("RDB CRC error");
} }
} }
fclose(fp);
stopLoading();
return C_OK; return C_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */ eoferr: /* unexpected end of file is handled here with a fatal exit */
@ -1530,6 +1534,24 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
return C_ERR; /* Just to avoid warning */ return C_ERR; /* Just to avoid warning */
} }
/* File-based front end for rdbLoadRio(): 'filename' is opened for reading,
 * wrapped into a rio stream object, and handed to rdbLoadRio() for the
 * actual loading. The ETA displayed in the INFO output is initialized
 * before the load and finalized after it.
 *
 * Returns C_ERR if the file cannot be opened, otherwise the value returned
 * by rdbLoadRio(). */
int rdbLoad(char *filename) {
    rio rdb;
    int retval;
    FILE *fp = fopen(filename,"r");

    if (fp == NULL) return C_ERR;

    startLoading(fp);
    rioInitWithFile(&rdb,fp);
    retval = rdbLoadRio(&rdb);

    /* The stream is closed before the loading state is finalized. */
    fclose(fp);
    stopLoading();
    return retval;
}
/* A background saving child (BGSAVE) terminated its work. Handle this. /* A background saving child (BGSAVE) terminated its work. Handle this.
* This function covers the case of actual BGSAVEs. */ * This function covers the case of actual BGSAVEs. */
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {

View File

@ -106,6 +106,9 @@
#define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2) #define RDB_LOAD_SDS (1<<2)
#define RDB_SAVE_NONE 0
#define RDB_SAVE_AOF_PREAMBLE (1<<0)
int rdbSaveType(rio *rdb, unsigned char type); int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb); int rdbLoadType(rio *rdb);
int rdbSaveTime(rio *rdb, time_t t); int rdbSaveTime(rio *rdb, time_t t);
@ -131,5 +134,6 @@ ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr); void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr);
int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbLoadRio(rio *rdb);
#endif #endif

View File

@ -1345,6 +1345,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0; server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL; server.pidfile = NULL;
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);

View File

@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_REWRITE_PERC 100 #define AOF_REWRITE_PERC 100
#define AOF_REWRITE_MIN_SIZE (64*1024*1024) #define AOF_REWRITE_MIN_SIZE (64*1024*1024)
#define AOF_REWRITE_ITEMS_PER_CMD 64 #define AOF_REWRITE_ITEMS_PER_CMD 64
#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
#define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000 #define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000
#define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128 #define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128
#define CONFIG_DEFAULT_MAX_CLIENTS 10000 #define CONFIG_DEFAULT_MAX_CLIENTS 10000
@ -136,6 +137,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof" #define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof"
#define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0 #define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
#define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1 #define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1
#define CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE 0
#define CONFIG_DEFAULT_ACTIVE_REHASHING 1 #define CONFIG_DEFAULT_ACTIVE_REHASHING 1
#define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1 #define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
#define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0 #define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0
@ -900,6 +902,7 @@ struct redisServer {
int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_status; /* C_OK or C_ERR */
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
/* AOF pipes used to communicate between parent and child during rewrite. */ /* AOF pipes used to communicate between parent and child during rewrite. */
int aof_pipe_write_data_to_child; int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent; int aof_pipe_read_data_from_parent;
@ -1365,6 +1368,7 @@ void stopLoading(void);
/* RDB persistence */ /* RDB persistence */
#include "rdb.h" #include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags);
/* AOF persistence */ /* AOF persistence */
void flushAppendOnlyFile(int force); void flushAppendOnlyFile(int force);
@ -1377,6 +1381,7 @@ int startAppendOnly(void);
void backgroundRewriteDoneHandler(int exitcode, int bysignal); void backgroundRewriteDoneHandler(int exitcode, int bysignal);
void aofRewriteBufferReset(void); void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void); unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
/* Sorted sets data type */ /* Sorted sets data type */

View File

@ -4,7 +4,9 @@ start_server {tags {"aofrw"}} {
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite. r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
waitForBgrewriteaof r waitForBgrewriteaof r
test {AOF rewrite during write load} { foreach rdbpre {yes no} {
r config set aof-use-rdb-preamble $rdbpre
test "AOF rewrite during write load: RDB preamble=$rdbpre" {
# Start a write load for 10 seconds # Start a write load for 10 seconds
set master [srv 0 client] set master [srv 0 client]
set master_host [srv 0 host] set master_host [srv 0 host]
@ -59,6 +61,7 @@ start_server {tags {"aofrw"}} {
# Make sure they are the same # Make sure they are the same
assert {$d1 eq $d2} assert {$d1 eq $d2}
} }
}
} }
start_server {tags {"aofrw"}} { start_server {tags {"aofrw"}} {