diff --git a/src/config.h b/src/config.h index 28ef37d6..617682fc 100644 --- a/src/config.h +++ b/src/config.h @@ -52,6 +52,14 @@ #define aof_fsync fsync #endif +/* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use + * the plain fsync() call. */ +#ifdef __linux__ +#define rdb_fsync_range(fd,off,size) sync_file_range(fd,off,size,SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE) +#else +#define rdb_fsync_range(fd,off,size) fsync(fd) +#endif + /* Byte ordering detection */ #include <sys/types.h> /* This will likely define BYTE_ORDER */ diff --git a/src/fmacros.h b/src/fmacros.h index 866a9afa..3e548765 100644 --- a/src/fmacros.h +++ b/src/fmacros.h @@ -3,6 +3,10 @@ #define _BSD_SOURCE +#if defined(__linux__) +#define _GNU_SOURCE +#endif + #if defined(__linux__) || defined(__OpenBSD__) #define _XOPEN_SOURCE 700 #else diff --git a/src/redis.c b/src/redis.c index 92f1323b..0d7432d2 100644 --- a/src/redis.c +++ b/src/redis.c @@ -2079,9 +2079,10 @@ sds genRedisInfoString(char *section) { if (server.repl_state == REDIS_REPL_TRANSFER) { info = sdscatprintf(info, - "master_sync_left_bytes:%ld\r\n" + "master_sync_left_bytes:%lld\r\n" "master_sync_last_io_seconds_ago:%d\r\n" - ,(long)server.repl_transfer_left, + , (long long) + (server.repl_transfer_size - server.repl_transfer_read), (int)(server.unixtime-server.repl_transfer_lastio) ); } diff --git a/src/redis.h b/src/redis.h index d12e9445..52bf5b8d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -684,7 +684,9 @@ struct redisServer { redisClient *master; /* Client that is master for this slave */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a slave */ - off_t repl_transfer_left; /* Bytes left reading .rdb */ + off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ + off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. 
*/ int repl_transfer_s; /* Slave -> Master SYNC socket */ int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ diff --git a/src/replication.c b/src/replication.c index 45a223b8..543477b6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -311,16 +311,18 @@ void replicationAbortSyncTransfer(void) { } /* Asynchronously read the SYNC payload we receive from a master */ +#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen; + off_t left; REDIS_NOTUSED(el); REDIS_NOTUSED(privdata); REDIS_NOTUSED(mask); - /* If repl_transfer_left == -1 we still have to read the bulk length + /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ - if (server.repl_transfer_left == -1) { + if (server.repl_transfer_size == -1) { if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING, "I/O error reading bulk count from MASTER: %s", @@ -343,16 +345,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?"); goto error; } - server.repl_transfer_left = strtol(buf+1,NULL,10); + server.repl_transfer_size = strtol(buf+1,NULL,10); redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: receiving %ld bytes from master", - server.repl_transfer_left); + server.repl_transfer_size); return; } /* Read bulk data */ - readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ? - server.repl_transfer_left : (signed)sizeof(buf); + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? 
left : (signed)sizeof(buf); nread = read(fd,buf,readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", @@ -365,9 +367,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); goto error; } - server.repl_transfer_left -= nread; + server.repl_transfer_read += nread; + + /* Sync data on disk from time to time, otherwise at the end of the transfer + * we may suffer a big delay as the memory buffers are copied into the + * actual disk. */ + if (server.repl_transfer_read >= + server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) + { + off_t sync_size = server.repl_transfer_read - + server.repl_transfer_last_fsync_off; + rdb_fsync_range(server.repl_transfer_fd, + server.repl_transfer_last_fsync_off, sync_size); + server.repl_transfer_last_fsync_off += sync_size; + } + /* Check if the transfer is now complete */ - if (server.repl_transfer_left == 0) { + if (server.repl_transfer_read == server.repl_transfer_size) { if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); @@ -538,7 +554,9 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } server.repl_state = REDIS_REPL_TRANSFER; - server.repl_transfer_left = -1; + server.repl_transfer_size = -1; + server.repl_transfer_read = 0; + server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; server.repl_transfer_tmpfile = zstrdup(tmpfile);