diff --git a/src/rdb.c b/src/rdb.c index bd6d1e57..4d45c424 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -689,6 +689,32 @@ werr: return REDIS_ERR; } +/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix + * and a suffix to the generated RDB dump. The prefix is: + * + * $EOF:<40 bytes unguessable hex string>\r\n + * + * While the suffix is the 40 bytes hex string we announced in the prefix. + * This way processes receiving the payload can understand when it ends + * without doing any processing of the content. */ +int rdbSaveRioWithEOFMark(rio *rdb, int *error) { + char eofmark[REDIS_EOF_MARK_SIZE]; + + getRandomHexChars(eofmark,REDIS_EOF_MARK_SIZE); + if (error) *error = 0; + if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; + if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr; + if (rioWrite(rdb,"\r\n",2) == 0) goto werr; + if (rdbSaveRio(rdb,error) == REDIS_ERR) goto werr; + if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr; + return REDIS_OK; + +werr: /* Write error. */ + /* Set 'error' only if not already set by rdbSaveRio() call. */ + if (error && *error == 0) *error = errno; + return REDIS_ERR; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */ int rdbSave(char *filename) { char tmpfile[256]; @@ -1211,8 +1237,9 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return REDIS_ERR; /* Just to avoid warning */ } -/* A background saving child (BGSAVE) terminated its work. Handle this. */ -void backgroundSaveDoneHandler(int exitcode, int bysignal) { +/* A background saving child (BGSAVE) terminated its work. Handle this. + * This function covers the case of actual BGSAVEs. 
*/ +void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background saving terminated with success"); @@ -1242,7 +1269,113 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { server.rdb_save_time_start = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ - updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR); + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_DISK); +} + +/* A background saving child (BGSAVE) terminated its work. Handle this. + * This function covers the case of RDB -> Slaves socket transfers for + * diskless replication. */ +void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { + if (!bysignal && exitcode == 0) { + redisLog(REDIS_NOTICE, + "Background RDB transfer terminated with success"); + } else if (!bysignal && exitcode != 0) { + redisLog(REDIS_WARNING, "Background transfer error"); + } else { + redisLog(REDIS_WARNING, + "Background transfer terminated by signal %d", bysignal); + } + server.rdb_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; + server.rdb_save_time_start = -1; + /* Possibly there are slaves waiting for a BGSAVE in order to be served + * (the first stage of SYNC is a bulk transfer of dump.rdb) */ + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET); +} + +/* When a background RDB saving/transfer terminates, call the right handler. 
*/ +void backgroundSaveDoneHandler(int exitcode, int bysignal) { + switch(server.rdb_child_type) { + case REDIS_RDB_CHILD_TYPE_DISK: + backgroundSaveDoneHandlerDisk(exitcode,bysignal); + break; + case REDIS_RDB_CHILD_TYPE_SOCKET: + backgroundSaveDoneHandlerSocket(exitcode,bysignal); + break; + default: + redisPanic("Unknown RDB child type."); + break; + } +} + +/* Spawn an RDB child that writes the RDB to the sockets of the slaves + * that are currently in REDIS_REPL_WAIT_BGSAVE_START state. */ +int rdbSaveToSlavesSockets(void) { + int *fds; + int numfds; + listNode *ln; + listIter li; + pid_t childpid; + long long start; + + if (server.rdb_child_pid != -1) return REDIS_ERR; + + fds = zmalloc(sizeof(int)*listLength(server.slaves)); + numfds = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + fds[numfds++] = slave->fd; + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } + } + + /* Fork ... */ + start = ustime(); + if ((childpid = fork()) == 0) { + /* Child */ + int retval; + rio slave_sockets; + + rioInitWithFdset(&slave_sockets,fds,numfds); + zfree(fds); + + closeListeningSockets(0); + redisSetProcTitle("redis-rdb-to-slaves"); + + retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + if (retval == REDIS_OK) { + size_t private_dirty = zmalloc_get_private_dirty(); + + if (private_dirty) { + redisLog(REDIS_NOTICE, + "RDB: %zu MB of memory used by copy-on-write", + private_dirty/(1024*1024)); + } + } + exitFromChild((retval == REDIS_OK) ? 0 : 1); + } else { + /* Parent */ + server.stat_fork_time = ustime()-start; + server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. 
*/ + latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); + if (childpid == -1) { + redisLog(REDIS_WARNING,"Can't save in background: fork: %s", + strerror(errno)); + return REDIS_ERR; + } + redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid); + server.rdb_save_time_start = time(NULL); + server.rdb_child_pid = childpid; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_SOCKET; + updateDictResizePolicy(); + zfree(fds); + return REDIS_OK; + } + return REDIS_OK; /* unreached */ } void saveCommand(redisClient *c) { diff --git a/src/rdb.h b/src/rdb.h index 54ee4e51..eb40d499 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -101,6 +101,7 @@ int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename); int rdbSaveBackground(char *filename); +int rdbSaveToSlavesSockets(void); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char *filename); int rdbSaveObject(rio *rdb, robj *o); diff --git a/src/redis.c b/src/redis.c index 3340ecef..d8ba5675 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1480,6 +1480,7 @@ void initServerConfig(void) { server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; + server.repl_diskless = REDIS_DEFAULT_RDB_DISKLESS; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; diff --git a/src/redis.h b/src/redis.h index 5e756f0f..d8f2b444 100644 --- a/src/redis.h +++ b/src/redis.h @@ -96,6 +96,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_REPL_TIMEOUT 60 #define REDIS_REPL_PING_SLAVE_PERIOD 10 #define REDIS_RUN_ID_SIZE 40 +#define REDIS_EOF_MARK_SIZE 40 #define REDIS_OPS_SEC_SAMPLES 16 #define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */ #define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */ @@ -113,6 +114,8 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #define REDIS_DEFAULT_RDB_COMPRESSION 1 #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" +#define REDIS_DEFAULT_RDB_DISKLESS 0 +#define REIDS_DEFAULT_RDB_DISKLESS_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 #define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0 @@ -796,6 +799,7 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ + int repl_diskless; /* Send RDB to slaves sockets directly. */ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ @@ -1138,7 +1142,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc); -void updateSlavesWaitingBgsave(int bgsaveerr); +void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); void replicationHandleMasterDisconnection(void); void replicationCacheMaster(redisClient *c); diff --git a/src/replication.c b/src/replication.c index f4ac58f4..ba39fddb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -408,6 +408,28 @@ need_full_resync: return REDIS_ERR; } +/* Start a BGSAVE for replication purposes: select the disk or + * socket target depending on the configuration, and flush the + * script cache once the save has been started successfully. + * + * Returns REDIS_OK on success or REDIS_ERR otherwise. */ +int startBgsaveForReplication(void) { + int retval; + + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", + server.repl_diskless ? 
"slaves sockets" : "disk"); + + if (server.repl_diskless) + retval = rdbSaveToSlavesSockets(); + else + retval = rdbSaveBackground(server.rdb_filename); + + /* Flush the script cache, since we need that slave differences are + * accumulated without requiring slaves to match our cached scripts. */ + if (retval == REDIS_OK) replicationScriptCacheFlush(); + return retval; +} + /* SYNC and PSYNC command implemenation. */ void syncCommand(redisClient *c) { /* ignore SYNC if already slave or in monitor mode */ @@ -465,7 +487,9 @@ void syncCommand(redisClient *c) { /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ - if (server.rdb_child_pid != -1) { + if (server.rdb_child_pid != -1 && + server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK) + { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save. */ @@ -480,12 +504,7 @@ void syncCommand(redisClient *c) { } if (ln) { /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. - * - * Note that if we found a slave in WAIT_BGSAVE_END state, this - * means that the current child is of type - * REDIS_RDB_CHILD_TYPE_DISK, since the first slave in this state - * can only be added when an RDB save with disk target is started. */ + * another slave. Set the right state, and copy the buffer. 
*/ copyClientOutputBuffer(c,slave); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); @@ -495,17 +514,31 @@ void syncCommand(redisClient *c) { c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } + } else if (server.rdb_child_pid != -1 && + server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET) + { + /* There is an RDB child process but it is writing directly to + * the sockets of other slaves. We need to wait for the next BGSAVE + * in order to synchronize. */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } else { - /* Ok we don't have a BGSAVE in progress, let's start one. */ - redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); - if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { - redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); - addReplyError(c,"Unable to perform background save"); - return; + if (server.repl_diskless) { + /* Diskless replication RDB child is created inside + * replicationCron() since we want to delay its start a + * few seconds to wait for more slaves to arrive. */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); + } else { + /* Ok we don't have a BGSAVE in progress, let's start one. */ + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); + if (startBgsaveForReplication() != REDIS_OK) { + redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); + addReplyError(c,"Unable to perform background save"); + return; + } + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } - c->replstate = REDIS_REPL_WAIT_BGSAVE_END; - /* Flush the script cache for the new slave. */ - replicationScriptCacheFlush(); } if (server.repl_disable_tcp_nodelay) @@ -644,10 +677,15 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { /* This function is called at the end of every background saving. 
* The argument bgsaveerr is REDIS_OK if the background saving succeeded * otherwise REDIS_ERR is passed to the function. + * The 'type' argument is the type of the child that terminated + * (if it had a disk or socket target). * * The goal of this function is to handle slaves waiting for a successful - * background saving in order to perform non-blocking synchronization. */ -void updateSlavesWaitingBgsave(int bgsaveerr) { + * background saving in order to perform non-blocking synchronization, and + * to schedule a new BGSAVE if there are slaves that attached while a + * BGSAVE was in progress, but it was not a good one for replication (no + * other slave was accumulating differences). */ +void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; int startbgsave = 0; listIter li; @@ -687,12 +725,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr) { } } if (startbgsave) { - /* Since we are starting a new background save for one or more slaves, - * we flush the Replication Script Cache to use EVAL to propagate every - * new EVALSHA for the first time, since all the new slaves don't know - * about previous scripts. */ - replicationScriptCacheFlush(); - if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { + if (startBgsaveForReplication() != REDIS_OK) { listIter li; listRewind(server.slaves,&li);