diff --git a/src/rdb.c b/src/rdb.c index 4c5fb21e..32420656 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1572,6 +1572,7 @@ int rdbSaveToSlavesSockets(void) { clientids[numfds] = slave->id; fds[numfds++] = slave->fd; slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + replicationSendFullresyncReply(slave,getPsyncInitialOffset()); /* Put the socket in non-blocking mode to simplify RDB transfer. * We'll restore it when the children returns (since duped socket * will share the O_NONBLOCK attribute with the parent). */ diff --git a/src/replication.c b/src/replication.c index 20234279..72f29a07 100644 --- a/src/replication.c +++ b/src/replication.c @@ -349,6 +349,41 @@ long long addReplyReplicationBacklog(client *c, long long offset) { return server.repl_backlog_histlen - skip; } +/* Return the offset to provide as reply to the PSYNC command received + * from the slave. The returned value is only valid immediately after + * the BGSAVE process started and before executing any other command + * from clients. */ +long long getPsyncInitialOffset(void) { + long long psync_offset = server.master_repl_offset; + /* Add 1 to psync_offset if the replication backlog does not exist, + * as when it will be created later we'll increment the offset by one. */ + if (server.repl_backlog == NULL) psync_offset++; + return psync_offset; +} + +/* Send a PSYNC reply in the specific case of a full resynchronization. + * As a side effect, set into the slave client structure the offset + * we sent here, so that if new slaves will later attach to the same + * background RDB saving process (by duplicating this client output + * buffer), we can get the right offset from this slave. */ +int replicationSendFullresyncReply(client *slave, long long offset) { + char buf[128]; + int buflen; + + slave->psync_initial_offset = offset; + /* Don't send this reply to slaves that approached us with + * the old SYNC command. 
*/ + if (!(slave->flags & CLIENT_PRE_PSYNC)) { + buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", + server.runid,offset); + if (write(slave->fd,buf,buflen) != buflen) { + freeClientAsync(slave); + return C_ERR; + } + } + return C_OK; +} + /* This function handles the PSYNC command from the point of view of a * master receiving a request for partial resynchronization. * @@ -422,18 +457,10 @@ int masterTryPartialResynchronization(client *c) { return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: - /* We need a full resync for some reason... notify the client. */ - psync_offset = server.master_repl_offset; - /* Add 1 to psync_offset if it the replication backlog does not exists - * as when it will be created later we'll increment the offset by one. */ - if (server.repl_backlog == NULL) psync_offset++; - /* Again, we can't use the connection buffers (see above). */ - buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", - server.runid,psync_offset); - if (write(c->fd,buf,buflen) != buflen) { - freeClientAsync(c); - return C_OK; - } + /* We need a full resync for some reason... Note that we can't + * reply to PSYNC right now if a full SYNC is needed. The reply + * must include the master offset at the time the RDB file we transfer + * is generated, so we need to delay the reply to that moment. */ return C_ERR; } @@ -537,6 +564,7 @@ void syncCommand(client *c) { * another slave. Set the right state, and copy the buffer. 
*/ copyClientOutputBuffer(c,slave); c->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + replicationSendFullresyncReply(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to @@ -568,6 +596,7 @@ void syncCommand(client *c) { return; } c->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + replicationSendFullresyncReply(c,getPsyncInitialOffset()); } } @@ -755,6 +784,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { startbgsave = 1; slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + replicationSendFullresyncReply(slave,getPsyncInitialOffset()); } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { struct redis_stat buf; @@ -2117,8 +2147,11 @@ void replicationCron(void) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + replicationSendFullresyncReply(slave, + getPsyncInitialOffset()); + } } } } diff --git a/src/server.h b/src/server.h index c3bea54e..bbd0014e 100644 --- a/src/server.h +++ b/src/server.h @@ -564,6 +564,9 @@ typedef struct client { long long reploff; /* replication offset if this is our master */ long long repl_ack_off; /* replication ack offset, if this is a slave */ long long repl_ack_time;/* replication ack time, if this is a slave */ + long long psync_initial_offset; /* FULLRESYNC reply offset other slaves + copying this slave output buffer + should use. 
*/ char replrunid[CONFIG_RUN_ID_SIZE+1]; /* master run id if this is a master */ int slave_listening_port; /* As configured with: SLAVECONF listening-port */ multiState mstate; /* MULTI/EXEC state */ @@ -1198,6 +1201,8 @@ int replicationCountAcksByOffset(long long offset); void replicationSendNewlineToMaster(void); long long replicationGetSlaveOffset(void); char *replicationGetSlaveName(client *c); +long long getPsyncInitialOffset(void); +int replicationSendFullresyncReply(client *slave, long long offset); /* Generic persistence functions */ void startLoading(FILE *fp); diff --git a/src/syncio.c b/src/syncio.c index b2843d5f..48e0a0b7 100644 --- a/src/syncio.c +++ b/src/syncio.c @@ -118,7 +118,9 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) { } /* Read a line making sure that every char will not require more than 'timeout' - * milliseconds to be read. + * milliseconds to be read. Empty newlines before the first non-empty line + * are ignored. This is useful because Redis sometimes uses empty + * newlines in order to keep the connection "alive". * * On success the number of bytes read is returned, otherwise -1. * On success the string is always correctly terminated with a 0 byte. */ @@ -131,9 +133,12 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) { if (syncRead(fd,&c,1,timeout) == -1) return -1; if (c == '\n') { - *ptr = '\0'; - if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0'; - return nread; + /* Ignore empty lines, otherwise return to the caller. */ + if (nread != 0) { + *ptr = '\0'; + if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0'; + return nread; + } } else { *ptr++ = c; *ptr = '\0';