diff --git a/src/rdb.c b/src/rdb.c index 0fcf426f..60dd7113 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1065,20 +1065,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.loading_process_events_interval_bytes && (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) { - if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) { - static time_t newline_sent; - /* Avoid the master to detect the slave is timing out while - * loading the RDB file in initial synchronization. We send - * a single newline character that is valid protocol but is - * guaranteed to either be sent entierly or not, since the byte - * is indivisible. */ - if (time(NULL) != newline_sent) { - newline_sent = time(NULL); - if (write(server.repl_transfer_s,"\n",1) == -1) { - /* Pinging back in this stage is best-effort. */ - } - } - } + if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) + replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } diff --git a/src/redis.h b/src/redis.h index 16403a8b..42f531ea 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1070,6 +1070,7 @@ int replicationScriptCacheExists(sds sha1); void processClientsWaitingReplicas(void); void unblockClientWaitingReplicas(redisClient *c); int replicationCountAcksByOffset(long long offset); +void replicationSendNewlineToMaster(void); /* Generic persistence functions */ void startLoading(FILE *fp); diff --git a/src/replication.c b/src/replication.c index 5044ca33..a6c5d0d4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -715,6 +715,31 @@ void replicationAbortSyncTransfer(void) { server.repl_state = REDIS_REPL_CONNECT; } +/* Avoid the master to detect the slave is timing out while loading the + * RDB file in initial synchronization. We send a single newline character + * that is valid protocol but is guaranteed to either be sent entierly or + * not, since the byte is indivisible. + * + * The function is called in two contexts: while we flush the current + * data with emptyDb(), and while we load the new data received as an + * RDB file from the master. */ +void replicationSendNewlineToMaster(void) { + static time_t newline_sent; + if (time(NULL) != newline_sent) { + newline_sent = time(NULL); + if (write(server.repl_transfer_s,"\n",1) == -1) { + /* Pinging back in this stage is best-effort. */ + } + } +} + +/* Callback used by emptyDb() while flushing away old data to load + * the new dataset received by the master. */ +void replicationEmptyDbCallback(void *privdata) { + REDIS_NOTUSED(privdata); + replicationSendNewlineToMaster(); +} + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -796,7 +821,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); signalFlushedDb(-1); - emptyDb(NULL); + emptyDb(replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to