Merge pull request #6193 from oranagra/diskless_slave_refresh3

diskless replication on slave side (don't store rdb to file), plus some other related fixes
This commit is contained in:
Salvatore Sanfilippo 2019-07-08 18:10:16 +02:00 committed by GitHub
commit d984732b35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 648 additions and 252 deletions

View File

@ -377,6 +377,22 @@ repl-diskless-sync no
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
# Replica can load the rdb it reads from the replication link directly from the
# socket, or store the rdb to a file and read that file after it was completely
# recived from the master.
# In many cases the disk is slower than the network, and storing and loading
# the rdb file may increase replication time (and even increase the master's
# Copy on Write memory and salve buffers).
# However, parsing the rdb file directly from the socket may mean that we have
# to flush the contents of the current database before the full rdb was received.
# for this reason we have the following options:
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
# "on-empty-db" - Use diskless load only when it is completely safe.
# "swapdb" - Keep a copy of the current db contents in RAM while parsing
# the data directly from the socket. note that this requires
# sufficient memory, if you don't have it, you risk an OOM kill.
repl-diskless-load disabled
# Replicas send PINGs to server in a predefined interval. It's possible to change
# this interval with the repl_ping_replica_period option. The default value is 10
# seconds.

View File

@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}
/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified
* number of milliseconds, or disable it if the 'ms' argument is zero. */
int anetRecvTimeout(char *err, int fd, long long ms) {
struct timeval tv;
tv.tv_sec = ms/1000;
tv.tv_usec = (ms%1000)*1000;
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".

View File

@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
int anetRecvTimeout(char *err, int fd, long long ms);
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);

View File

@ -729,7 +729,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createFakeClient();
startLoading(fp);
startLoadingFile(fp, filename);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */

View File

@ -91,6 +91,13 @@ configEnum aof_fsync_enum[] = {
{NULL, 0}
};
configEnum repl_diskless_load_enum[] = {
{"disabled", REPL_DISKLESS_LOAD_DISABLED},
{"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY},
{"swapdb", REPL_DISKLESS_LOAD_SWAPDB},
{NULL, 0}
};
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@ -427,6 +434,11 @@ void loadServerConfigFromString(char *config) {
err = "repl-timeout must be 1 or greater";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) {
server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]);
if (server.repl_diskless_load == INT_MIN) {
err = "argument must be 'disabled', 'on-empty-db', 'swapdb' or 'flushdb'";
}
} else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
server.repl_diskless_sync_delay = atoi(argv[1]);
if (server.repl_diskless_sync_delay < 0) {
@ -466,12 +478,10 @@ void loadServerConfigFromString(char *config) {
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
} else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
int yes;
if ((yes = yesnotoi(argv[1])) == -1) {
if ((server.aof_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
server.aof_state = yes ? AOF_ON : AOF_OFF;
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
} else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
if (!pathIsBaseName(argv[1])) {
err = "appendfilename can't be a path, just a filename";
@ -497,6 +507,12 @@ void loadServerConfigFromString(char *config) {
argc == 2)
{
server.aof_rewrite_min_size = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) {
server.rdb_key_save_delay = atoi(argv[1]);
if (server.rdb_key_save_delay < 0) {
err = "rdb-key-save-delay can't be negative";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
@ -942,6 +958,7 @@ void configSetCommand(client *c) {
int enable = yesnotoi(o->ptr);
if (enable == -1) goto badfmt;
server.aof_enabled = enable;
if (enable == 0 && server.aof_state != AOF_OFF) {
stopAppendOnly();
} else if (enable && server.aof_state == AOF_OFF) {
@ -1132,6 +1149,8 @@ void configSetCommand(client *c) {
"slave-priority",server.slave_priority,0,INT_MAX) {
} config_set_numerical_field(
"replica-priority",server.slave_priority,0,INT_MAX) {
} config_set_numerical_field(
"rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) {
} config_set_numerical_field(
"slave-announce-port",server.slave_announce_port,0,65535) {
} config_set_numerical_field(
@ -1199,6 +1218,8 @@ void configSetCommand(client *c) {
"maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) {
} config_set_enum_field(
"appendfsync",server.aof_fsync,aof_fsync_enum) {
} config_set_enum_field(
"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) {
/* Everyhing else is an error... */
} config_set_else {
@ -1346,6 +1367,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay);
config_get_numerical_field("tcp-keepalive",server.tcpkeepalive);
/* Bool (yes/no) values */
@ -1370,12 +1392,14 @@ void configGetCommand(client *c) {
server.aof_fsync,aof_fsync_enum);
config_get_enum_field("syslog-facility",
server.syslog_facility,syslog_facility_enum);
config_get_enum_field("repl-diskless-load",
server.repl_diskless_load,repl_diskless_load_enum);
/* Everything we can't handle with macros follows. */
if (stringmatch(pattern,"appendonly",1)) {
addReplyBulkCString(c,"appendonly");
addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes");
addReplyBulkCString(c,server.aof_enabled ? "yes" : "no");
matches++;
}
if (stringmatch(pattern,"dir",1)) {
@ -2109,6 +2133,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"repl-timeout",server.repl_timeout,CONFIG_DEFAULT_REPL_TIMEOUT);
rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,CONFIG_DEFAULT_REPL_BACKLOG_SIZE);
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD);
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
rewriteConfigNumericalOption(state,"replica-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY);
rewriteConfigNumericalOption(state,"min-replicas-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE);
@ -2128,7 +2153,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS);
rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0);
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
rewriteConfigNumericalOption(state,"auto-aof-rewrite-percentage",server.aof_rewrite_perc,AOF_REWRITE_PERC);
@ -2157,6 +2182,7 @@ int rewriteConfig(char *path) {
rewriteConfigClientoutputbufferlimitOption(state);
rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY);
/* Rewrite Sentinel config if in Sentinel mode. */
if (server.sentinel_mode) rewriteConfigSentinelOption(state);

View File

@ -344,7 +344,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* On success the fuction returns the number of keys removed from the
* database(s). Otherwise -1 is returned in the specific case the
* DB number is out of range, and errno is set to EINVAL. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
@ -362,12 +362,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
}
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(server.db[j].dict);
removed += dictSize(dbarray[j].dict);
if (async) {
emptyDbAsync(&server.db[j]);
emptyDbAsync(&dbarray[j]);
} else {
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
dictEmpty(dbarray[j].dict,callback);
dictEmpty(dbarray[j].expires,callback);
}
}
if (server.cluster_enabled) {
@ -381,6 +381,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
return removed;
}
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
return emptyDbGeneric(server.db, dbnum, flags, callback);
}
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
@ -388,6 +392,15 @@ int selectDb(client *c, int id) {
return C_OK;
}
long long dbTotalServerKeyCount() {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += dictSize(server.db[j].dict);
}
return total;
}
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*

View File

@ -44,6 +44,7 @@
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) {
if (!rdbCheckMode) {
serverLog(LL_WARNING, "%s", msg);
char *argv[2] = {"",server.rdb_filename};
if (rdbFileBeingLoaded) {
char *argv[2] = {"",rdbFileBeingLoaded};
redis_check_rdb_main(2,argv,NULL);
} else {
serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
return;
}
} else {
rdbCheckError("%s",msg);
}
serverLog(LL_WARNING, "Terminating server after rdb file reading failure.");
exit(1);
}
@ -1039,6 +1046,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
usleep(server.rdb_key_save_delay);
return 1;
}
@ -1800,18 +1812,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
void startLoading(FILE *fp) {
struct stat sb;
void startLoading(size_t size) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
if (fstat(fileno(fp), &sb) == -1) {
server.loading_total_bytes = 0;
} else {
server.loading_total_bytes = sb.st_size;
}
server.loading_total_bytes = size;
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
void startLoadingFile(FILE *fp, char* filename) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
startLoading(sb.st_size);
}
/* Refresh the loading progress info */
@ -1824,6 +1841,7 @@ void loadingProgress(off_t pos) {
/* Loading finished */
void stopLoading(void) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
}
/* Track loading progress in order to serve client's from time to time
@ -2089,7 +2107,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) {
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
startLoadingFile(fp, filename);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
fclose(fp);

View File

@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
startLoading(fp);
startLoadingFile(fp, rdbfilename);
while(1) {
robj *key, *val;
@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
stopLoading();
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
stopLoading();
return 1;
}

View File

@ -1113,11 +1113,22 @@ void restartAOFAfterSYNC() {
}
}
static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
}
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
int use_diskless_load;
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
int i;
off_t left;
UNUSED(el);
UNUSED(privdata);
@ -1173,18 +1184,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master");
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
useDisklessLoad()? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
useDisklessLoad()? "to parser":"to disk");
}
return;
}
/* Read bulk data */
use_diskless_load = useDisklessLoad();
if (!use_diskless_load) {
/* read the data from the socket, store it to a file and search for the EOF */
if (usemark) {
readlen = sizeof(buf);
} else {
@ -1253,10 +1269,92 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
if (!eof_reached)
return;
}
if (eof_reached) {
int aof_is_enabled = server.aof_state != AOF_OFF;
/* We reach here when the slave is using diskless replication,
* or when we are done reading from the socket to the rdb file. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
signalFlushedDb(-1);
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* create a backup of the current db */
diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum);
for (i=0; i<server.dbnum; i++) {
diskless_load_backup[i] = server.db[i];
server.db[i].dict = dictCreate(&dbDictType,NULL);
server.db[i].expires = dictCreate(&keyptrDictType,NULL);
}
} else {
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
rioInitWithFd(&rdb,fd,server.repl_transfer_size);
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
anetBlock(NULL,fd);
anetRecvTimeout(NULL,fd,server.repl_timeout*1000);
startLoading(server.repl_transfer_size);
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
/* rdbloading failed */
stopLoading();
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* restore the backed up db */
emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(server.db[i].dict);
dictRelease(server.db[i].expires);
server.db[i] = diskless_load_backup[i];
}
zfree(diskless_load_backup);
} else {
/* Remove the half-loaded data */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or slave promoted. */
return;
}
stopLoading();
/* rdbloading succeeded */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* delete the backup db that we created before starting to load the new rdb */
emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(diskless_load_backup[i].dict);
dictRelease(diskless_load_backup[i].expires);
}
zfree(diskless_load_backup);
}
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) {
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
return;
}
}
/* get the unread command stream from the rio buffer */
rioFreeFd(&rdb, NULL);
/* Restore the socket as non-blocking. */
anetNonBlock(NULL,fd);
anetRecvTimeout(NULL,fd,0);
} else {
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
@ -1274,33 +1372,19 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1);
emptyDb(
-1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
/* Re-enable the AOF if we disabled it earlier, in order to restore
* the original configuration. */
if (aof_is_enabled) restartAOFAfterSYNC();
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or slave promoted. */
return;
}
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}
/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
@ -1320,8 +1404,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (aof_is_enabled) restartAOFAfterSYNC();
}
if (server.aof_enabled) restartAOFAfterSYNC();
return;
error:
@ -1845,6 +1928,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Prepare a suitable temp file for bulk transfer */
if (!useDisklessLoad()) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
@ -1856,6 +1940,9 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
}
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
@ -1871,15 +1958,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
if (dfd != -1) close(dfd);
close(fd);
if (server.repl_transfer_fd != -1)
close(server.repl_transfer_fd);
if (server.repl_transfer_tmpfile)
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = -1;
server.repl_state = REPL_STATE_CONNECT;
return;
@ -1933,9 +2024,13 @@ void undoConnectWithMaster(void) {
void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
if (server.repl_transfer_fd!=-1) {
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
}
}
/* This function aborts a non blocking replication attempt if there is one
@ -2045,6 +2140,9 @@ void replicaofCommand(client *c) {
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
/* Restart the AOF subsystem in case we shut it down during a sync when
* we were still a slave. */
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
} else {
long port;

109
src/rio.c
View File

@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
/* ------------------- File descriptor implementation ------------------- */
static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
UNUSED(r);
UNUSED(buf);
UNUSED(len);
return 0; /* Error, this target does not yet support writing. */
}
/* Returns 1 or 0 for success/failure. */
static size_t rioFdRead(rio *r, void *buf, size_t len) {
size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos;
/* if the buffer is too small for the entire request: realloc */
if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
/* if the remaining unused buffer is not large enough: memmove so that we can read the rest */
if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
r->io.fd.pos = 0;
}
/* if we don't already have all the data in the sds, read more */
while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
size_t toread = len - buffered;
/* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */
if (toread < PROTO_IOBUF_LEN)
toread = PROTO_IOBUF_LEN;
if (toread > sdsavail(r->io.fd.buf))
toread = sdsavail(r->io.fd.buf);
if (r->io.fd.read_limit != 0 &&
r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) {
if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered)
toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered;
else {
errno = EOVERFLOW;
return 0;
}
}
int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread);
if (retval <= 0) {
if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
return 0;
}
sdsIncrLen(r->io.fd.buf, retval);
}
memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len);
r->io.fd.read_so_far += len;
r->io.fd.pos += len;
return len;
}
/* Returns read/write position in file. */
static off_t rioFdTell(rio *r) {
return r->io.fd.read_so_far;
}
/* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */
static int rioFdFlush(rio *r) {
/* Our flush is implemented by the write method, that recognizes a
* buffer set to NULL with a count of zero as a flush request. */
return rioFdWrite(r,NULL,0);
}
static const rio rioFdIO = {
rioFdRead,
rioFdWrite,
rioFdTell,
rioFdFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
/* create an rio that implements a buffered read from an fd
* read_limit argument stops buffering when the reaching the limit */
void rioInitWithFd(rio *r, int fd, size_t read_limit) {
*r = rioFdIO;
r->io.fd.fd = fd;
r->io.fd.pos = 0;
r->io.fd.read_limit = read_limit;
r->io.fd.read_so_far = 0;
r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(r->io.fd.buf);
}
/* release the rio stream.
* optionally returns the unread buffered data. */
void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
if (r->io.fd.pos > 0)
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
*out_remainingBufferedData = r->io.fd.buf;
} else {
sdsfree(r->io.fd.buf);
if (out_remainingBufferedData)
*out_remainingBufferedData = NULL;
}
r->io.fd.buf = NULL;
}
/* ------------------- File descriptors set implementation ------------------- */
/* Returns 1 or 0 for success/failure.
@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
* disk I/O concentrated in very little time. When we fsync in an explicit
* way instead the I/O pressure is more distributed across time. */
void rioSetAutoSync(rio *r, off_t bytes) {
serverAssert(r->read == rioFileIO.read);
if(r->write != rioFileIO.write) return;
r->io.file.autosync = bytes;
}

View File

@ -73,6 +73,14 @@ struct _rio {
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
/* file descriptor */
struct {
int fd; /* File descriptor. */
off_t pos; /* pos in buf that was returned */
sds buf; /* buffered data */
size_t read_limit; /* don't allow to buffer/read more than that */
size_t read_so_far; /* amount of data read from the rio (not buffered) */
} fd;
/* Multiple FDs target (used to write to N sockets). */
struct {
int *fds; /* File descriptors. */
@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) {
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithFd(rio *r, int fd, size_t read_limit);
void rioInitWithFdset(rio *r, int *fds, int numfds);
void rioFreeFdset(rio *r);
void rioFreeFd(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, long count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);

View File

@ -2265,6 +2265,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL;
@ -2334,6 +2335,9 @@ void initServerConfig(void) {
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = -1;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
@ -2342,6 +2346,7 @@ void initServerConfig(void) {
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD;
server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
@ -4053,7 +4058,7 @@ sds genRedisInfoString(char *section) {
(server.aof_last_write_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes);
if (server.aof_state != AOF_OFF) {
if (server.aof_enabled) {
info = sdscatprintf(info,
"aof_current_size:%lld\r\n"
"aof_base_size:%lld\r\n"

View File

@ -132,6 +132,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb"
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
#define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1
@ -394,6 +395,12 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_FSYNC_EVERYSEC 2
#define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC
/* Replication diskless load defines */
#define REPL_DISKLESS_LOAD_DISABLED 0
#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1
#define REPL_DISKLESS_LOAD_SWAPDB 2
#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED
/* Zipped structures related defaults */
#define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64
@ -1158,6 +1165,7 @@ struct redisServer {
int daemonize; /* True if running as a daemon */
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
/* AOF persistence */
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
@ -1214,6 +1222,8 @@ struct redisServer {
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
int rdb_key_save_delay; /* Delay in microseconds between keys while
* writing the RDB. (for testings) */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
@ -1249,7 +1259,9 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
@ -1739,7 +1751,8 @@ void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
/* Generic persistence functions */
void startLoading(FILE *fp);
void startLoadingFile(FILE* fp, char* filename);
void startLoading(size_t size);
void loadingProgress(off_t pos);
void stopLoading(void);
@ -1996,6 +2009,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);
void signalModifiedKey(redisDb *db, robj *key);

View File

@ -1,12 +1,3 @@
proc start_bg_complex_data {host port db ops} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
}
proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}
start_server {tags {"repl"}} {
start_server {} {

View File

@ -1,12 +1,3 @@
proc start_bg_complex_data {host port db ops} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
}
proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}
# Creates a master-slave pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} {
# If reconnect is > 0, the test actually try to break the connection and
# reconnect with the master, otherwise just the initial synchronization is
# checked for consistency.
proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} {
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server {tags {"repl"}} {
start_server {} {
@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
$master config set repl-backlog-size $backlog_size
$master config set repl-backlog-ttl $backlog_ttl
$master config set repl-diskless-sync $diskless
$master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
$slave config set repl-diskless-load $sdl
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" {
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
# Now while the clients are writing data, break the maste-slave
# link multiple times.
if ($reconnect) {
@ -132,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
foreach diskless {no yes} {
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $diskless 0
} $mdl $sdl 0
test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
} $diskless 1
} $mdl $sdl 1
test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
} $diskless 1
} $mdl $sdl 1
test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
} $diskless 1
} $mdl $sdl 1
test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
} $diskless 1
} $mdl $sdl 1
}
}

View File

@ -183,26 +183,32 @@ start_server {tags {"repl"}} {
}
}
foreach dl {no yes} {
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
start_server {tags {"repl"}} {
set master [srv 0 client]
$master config set repl-diskless-sync $dl
$master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
set master_host [srv 0 host]
set master_port [srv 0 port]
set slaves {}
set load_handle0 [start_write_load $master_host $master_port 3]
set load_handle1 [start_write_load $master_host $master_port 5]
set load_handle2 [start_write_load $master_host $master_port 20]
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
set load_handle3 [start_write_load $master_host $master_port 8]
set load_handle4 [start_write_load $master_host $master_port 4]
after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
lappend slaves [srv 0 client]
test "Connect multiple replicas at the same time (issue #141), diskless=$dl" {
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
# Send SLAVEOF commands to slaves
[lindex $slaves 0] config set repl-diskless-load $sdl
[lindex $slaves 1] config set repl-diskless-load $sdl
[lindex $slaves 2] config set repl-diskless-load $sdl
[lindex $slaves 0] slaveof $master_host $master_port
[lindex $slaves 1] slaveof $master_host $master_port
[lindex $slaves 2] slaveof $master_host $master_port
@ -220,7 +226,7 @@ foreach dl {no yes} {
}
}
if {$retry == 0} {
error "assertion:Replicas not correctly synchronized"
error "assertion:Slaves not correctly synchronized"
}
# Wait that slaves acknowledge they are online so
@ -231,13 +237,13 @@ foreach dl {no yes} {
[lindex [[lindex $slaves 1] role] 3] eq {connected} &&
[lindex [[lindex $slaves 2] role] 3] eq {connected}
} else {
fail "Replicas still not connected after some time"
fail "Slaves still not connected after some time"
}
# Stop the write load
stop_write_load $load_handle0
stop_write_load $load_handle1
stop_write_load $load_handle2
stop_bg_complex_data $load_handle0
stop_bg_complex_data $load_handle1
stop_bg_complex_data $load_handle2
stop_write_load $load_handle3
stop_write_load $load_handle4
@ -248,7 +254,7 @@ foreach dl {no yes} {
[$master dbsize] == [[lindex $slaves 1] dbsize] &&
[$master dbsize] == [[lindex $slaves 2] dbsize]
} else {
fail "Different number of keys between masted and replica after too long time."
fail "Different number of keys between master and replica after too long time."
}
# Check digests
@ -265,6 +271,7 @@ foreach dl {no yes} {
}
}
}
}
}
start_server {tags {"repl"}} {
@ -309,3 +316,70 @@ start_server {tags {"repl"}} {
}
}
}
test {slave fails full sync and diskless load swapdb recoveres it} {
start_server {tags {"repl"}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
set slave_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
# Put different data sets on the master and slave
# we need to put large keys on the master since the slave replies to info only once in 2mb
$slave debug populate 2000 slave 10
$master debug populate 200 master 100000
$master config set rdbcompression no
# Set master and slave to use diskless replication
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$slave config set repl-diskless-load swapdb
# Set master with a slow rdb generation, so that we can easily disconnect it mid sync
# 10ms per key, with 200 keys is 2 seconds
$master config set rdb-key-save-delay 10000
# Start the replication process...
$slave slaveof $master_host $master_port
# wait for the slave to start reading the rdb
wait_for_condition 50 100 {
[s -1 loading] eq 1
} else {
fail "Replica didn't get into loading mode"
}
# make sure that next sync will not start immediately so that we can catch the slave in betweeen syncs
$master config set repl-diskless-sync-delay 5
# for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one)
$master config set rdb-key-save-delay 0
# waiting slave to do flushdb (key count drop)
wait_for_condition 50 100 {
2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d]
} else {
fail "Replica didn't flush"
}
# make sure we're still loading
assert_equal [s -1 loading] 1
# kill the slave connection on the master
set killed [$master client kill type slave]
# wait for loading to stop (fail)
wait_for_condition 50 100 {
[s -1 loading] eq 0
} else {
fail "Replica didn't disconnect"
}
# make sure the original keys were restored
assert_equal [$slave dbsize] 2000
}
}
}

View File

@ -399,3 +399,15 @@ proc lshuffle {list} {
}
return $slist
}
# Execute a background process writing complex data for the specified number
# of ops to the specified Redis instance.
proc start_bg_complex_data {host port db ops} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
}
# Stop a process generating write load executed with start_bg_complex_data.
proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}