diff --git a/src/networking.c b/src/networking.c index cc9bbd98..f10a1c5e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -100,6 +100,7 @@ redisClient *createClient(int fd) { c->ctime = c->lastinteraction = server.unixtime; c->authenticated = 0; c->replstate = REDIS_REPL_NONE; + c->repl_put_online_on_ack = 0; c->reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; diff --git a/src/redis.h b/src/redis.h index f5301ab2..855ae574 100644 --- a/src/redis.h +++ b/src/redis.h @@ -532,6 +532,7 @@ typedef struct redisClient { int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int authenticated; /* when requirepass is non-NULL */ int replstate; /* replication state if this is a slave */ + int repl_put_online_on_ack; /* Install slave write handler on ACK. */ int repldbfd; /* replication DB file descriptor */ off_t repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ diff --git a/src/replication.c b/src/replication.c index 8e97a330..c0e83326 100644 --- a/src/replication.c +++ b/src/replication.c @@ -40,6 +40,7 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); void replicationSendAck(void); +void putSlaveOnline(redisClient *slave); /* --------------------------- Utility functions ---------------------------- */ @@ -398,6 +399,7 @@ int masterTryPartialResynchronization(redisClient *c) { c->flags |= REDIS_SLAVE; c->replstate = REDIS_REPL_ONLINE; c->repl_ack_time = server.unixtime; + c->repl_put_online_on_ack = 0; listAddNodeTail(server.slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is @@ -623,6 +625,11 @@ void replconfCommand(redisClient *c) { if (offset > c->repl_ack_off) c->repl_ack_off = offset; c->repl_ack_time = server.unixtime; + /* If this was a diskless replication, we need to really put + * the slave online when the first ACK is received (which + * confirms slave is online and ready to get more data). */ + if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE) + putSlaveOnline(c); /* Note: this command does not reply anything! */ return; } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { @@ -652,6 +659,7 @@ void replconfCommand(redisClient *c) { * 3) Update the count of good slaves. */ void putSlaveOnline(redisClient *slave) { slave->replstate = REDIS_REPL_ONLINE; + slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { @@ -660,6 +668,8 @@ void putSlaveOnline(redisClient *slave) { return; } refreshGoodSlavesCount(); + redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded", + replicationGetSlaveName(slave)); } void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -713,7 +723,6 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); putSlaveOnline(slave); - redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)"); } } @@ -752,10 +761,16 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { * diskless replication, our work is trivial, we can just put * the slave online. */ if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { - putSlaveOnline(slave); redisLog(REDIS_NOTICE, - "Synchronization with slave %s succeeded (socket)", + "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", replicationGetSlaveName(slave)); + /* Note: we wait for a REPLCONF ACK message from slave in + * order to really put it online (install the write handler + * so that the accumulated data can be transfered). However + * we change the replication state ASAP, since our slave + * is technically online now. */ + slave->replstate = REDIS_REPL_ONLINE; + slave->repl_put_online_on_ack = 1; } else { if (bgsaveerr != REDIS_OK) { freeClient(slave); @@ -929,7 +944,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { int eof_reached = 0; if (usemark) { - /* Update the last bytes array, and check if it matches our delimiter. */ + /* Update the last bytes array, and check if it matches our delimiter.*/ if (nread >= REDIS_RUN_ID_SIZE) { memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); } else {