From 11120689c4c59b0fdde5446ea40f6abdc215d7c9 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 10 Dec 2013 18:38:26 +0100 Subject: [PATCH] Slaves heartbeats during sync improved. The previous fix for false positive timeout detected by master was not complete. There is another blocking stage while loading data for the first synchronization with the master, that is, flushing away the current data from the DB memory. This commit uses the newly introduced dict.c callback in order to make some incremental work (to send "\n" heartbeats to the master) while flushing the old data from memory. It is hard to write a regression test for this issue unfortunately. More support for debugging in the Redis core would be needed in terms of functionalities to simulate a slow DB loading / deletion. --- src/rdb.c | 16 ++-------------- src/redis.h | 1 + src/replication.c | 27 ++++++++++++++++++++++++++- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 0fcf426f..60dd7113 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1065,20 +1065,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.loading_process_events_interval_bytes && (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) { - if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) { - static time_t newline_sent; - /* Avoid the master to detect the slave is timing out while - * loading the RDB file in initial synchronization. We send - * a single newline character that is valid protocol but is - * guaranteed to either be sent entierly or not, since the byte - * is indivisible. */ - if (time(NULL) != newline_sent) { - newline_sent = time(NULL); - if (write(server.repl_transfer_s,"\n",1) == -1) { - /* Pinging back in this stage is best-effort. 
*/ - } - } - } + if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) + replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } diff --git a/src/redis.h b/src/redis.h index 16403a8b..42f531ea 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1070,6 +1070,7 @@ int replicationScriptCacheExists(sds sha1); void processClientsWaitingReplicas(void); void unblockClientWaitingReplicas(redisClient *c); int replicationCountAcksByOffset(long long offset); +void replicationSendNewlineToMaster(void); /* Generic persistence functions */ void startLoading(FILE *fp); diff --git a/src/replication.c b/src/replication.c index 5044ca33..a6c5d0d4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -715,6 +715,31 @@ void replicationAbortSyncTransfer(void) { server.repl_state = REDIS_REPL_CONNECT; } +/* Avoid the master to detect the slave is timing out while loading the + * RDB file in initial synchronization. We send a single newline character + * that is valid protocol but is guaranteed to either be sent entirely or + * not, since the byte is indivisible. + * + * The function is called in two contexts: while we flush the current + * data with emptyDb(), and while we load the new data received as an + * RDB file from the master. */ +void replicationSendNewlineToMaster(void) { + static time_t newline_sent; + if (time(NULL) != newline_sent) { + newline_sent = time(NULL); + if (write(server.repl_transfer_s,"\n",1) == -1) { + /* Pinging back in this stage is best-effort. */ + } + } +} + +/* Callback used by emptyDb() while flushing away old data to load + * the new dataset received by the master. 
*/ +void replicationEmptyDbCallback(void *privdata) { + REDIS_NOTUSED(privdata); + replicationSendNewlineToMaster(); +} + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -796,7 +821,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); signalFlushedDb(-1); - emptyDb(NULL); + emptyDb(replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to