Diskless replication: handle putting the slave online.

This commit is contained in:
antirez 2014-10-15 15:31:19 +02:00
parent 7a1e0d9898
commit 3730d118a3

View File

@ -611,6 +611,29 @@ void replconfCommand(redisClient *c) {
addReply(c,shared.ok); addReply(c,shared.ok);
} }
/* This function puts a slave in the online state, and should be called just
* after a slave received the RDB file for the initial synchronization, and
* we are finally ready to send the incremental stream of commands.
*
* It does a few things:
*
* 1) Put the slave in ONLINE state.
* 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
* sending it to the slave.
* 3) Update the count of good slaves. */
void putSlaveOnline(redisClient *slave) {
slave->replstate = REDIS_REPL_ONLINE;
slave->repl_ack_time = server.unixtime;
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
refreshGoodSlavesCount();
}
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata; redisClient *slave = privdata;
REDIS_NOTUSED(el); REDIS_NOTUSED(el);
@ -661,16 +684,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
close(slave->repldbfd); close(slave->repldbfd);
slave->repldbfd = -1; slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
slave->replstate = REDIS_REPL_ONLINE; putSlaveOnline(slave);
slave->repl_ack_time = server.unixtime; redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)");
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
refreshGoodSlavesCount();
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
} }
} }
@ -700,27 +715,38 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf; struct redis_stat buf;
if (bgsaveerr != REDIS_OK) { /* If this was an RDB on disk save, we have to prepare to send
freeClient(slave); * the RDB from disk to the slave socket. Otherwise if this was
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); * already an RDB -> Slaves socket transfer, used in the case of
continue; * diskless replication, our work is trivial, we can just put
} * the slave online. */
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
redis_fstat(slave->repldbfd,&buf) == -1) { putSlaveOnline(slave);
freeClient(slave); redisLog(REDIS_NOTICE,
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); "Synchronization with slave succeeded (socket)");
continue; } else {
} if (bgsaveerr != REDIS_OK) {
slave->repldboff = 0; freeClient(slave);
slave->repldbsize = buf.st_size; redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
slave->replstate = REDIS_REPL_SEND_BULK; continue;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", }
(unsigned long long) slave->repldbsize); if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave); freeClient(slave);
continue; continue;
}
} }
} }
} }