mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
This special command is used by the slave to inform the master of the amount of replication stream it has consumed so far. It does not return anything, so that we do not need to consume additional bandwidth for the master to reply something. Knowing the amount of stream processed, the master can do a number of things, such as understanding the "lag" in bytes of the slave, verifying whether a given command was already processed by the slave, and so forth.
1474 lines
57 KiB
C
1474 lines
57 KiB
C
/* Asynchronous replication implementation.
|
|
*
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
|
|
#include "redis.h"
|
|
|
|
#include <sys/time.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/stat.h>
|
|
|
|
void replicationDiscardCachedMaster(void);
|
|
void replicationResurrectCachedMaster(int newfd);
|
|
|
|
/* ---------------------------------- MASTER -------------------------------- */
|
|
|
|
void createReplicationBacklog(void) {
|
|
redisAssert(server.repl_backlog == NULL);
|
|
server.repl_backlog = zmalloc(server.repl_backlog_size);
|
|
server.repl_backlog_histlen = 0;
|
|
server.repl_backlog_idx = 0;
|
|
/* When a new backlog buffer is created, we increment the replication
|
|
* offset by one to make sure we'll not be able to PSYNC with any
|
|
* previous slave. This is needed because we avoid incrementing the
|
|
* master_repl_offset if no backlog exists nor slaves are attached. */
|
|
server.master_repl_offset++;
|
|
|
|
/* We don't have any data inside our buffer, but virtually the first
|
|
* byte we have is the next byte that will be generated for the
|
|
* replication stream. */
|
|
server.repl_backlog_off = server.master_repl_offset+1;
|
|
}
|
|
|
|
/* This function is called when the user modifies the replication backlog
|
|
* size at runtime. It is up to the function to both update the
|
|
* server.repl_backlog_size and to resize the buffer and setup it so that
|
|
* it contains the same data as the previous one (possibly less data, but
|
|
* the most recent bytes, or the same data and more free space in case the
|
|
* buffer is enlarged). */
|
|
void resizeReplicationBacklog(long long newsize) {
|
|
if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
|
|
newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
|
|
if (server.repl_backlog_size == newsize) return;
|
|
|
|
server.repl_backlog_size = newsize;
|
|
if (server.repl_backlog != NULL) {
|
|
/* What we actually do is to flush the old buffer and realloc a new
|
|
* empty one. It will refill with new data incrementally.
|
|
* The reason is that copying a few gigabytes adds latency and even
|
|
* worse often we need to alloc additional space before freeing the
|
|
* old buffer. */
|
|
zfree(server.repl_backlog);
|
|
server.repl_backlog = zmalloc(server.repl_backlog_size);
|
|
server.repl_backlog_histlen = 0;
|
|
server.repl_backlog_idx = 0;
|
|
/* Next byte we have is... the next since the buffer is emtpy. */
|
|
server.repl_backlog_off = server.master_repl_offset+1;
|
|
}
|
|
}
|
|
|
|
void freeReplicationBacklog(void) {
|
|
redisAssert(listLength(server.slaves) == 0);
|
|
zfree(server.repl_backlog);
|
|
server.repl_backlog = NULL;
|
|
}
|
|
|
|
/* Add data to the replication backlog.
|
|
* This function also increments the global replication offset stored at
|
|
* server.master_repl_offset, because there is no case where we want to feed
|
|
* the backlog without incrementing the buffer. */
|
|
void feedReplicationBacklog(void *ptr, size_t len) {
|
|
unsigned char *p = ptr;
|
|
|
|
server.master_repl_offset += len;
|
|
|
|
/* This is a circular buffer, so write as much data we can at every
|
|
* iteration and rewind the "idx" index if we reach the limit. */
|
|
while(len) {
|
|
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
|
|
if (thislen > len) thislen = len;
|
|
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
|
|
server.repl_backlog_idx += thislen;
|
|
if (server.repl_backlog_idx == server.repl_backlog_size)
|
|
server.repl_backlog_idx = 0;
|
|
len -= thislen;
|
|
p += thislen;
|
|
server.repl_backlog_histlen += thislen;
|
|
}
|
|
if (server.repl_backlog_histlen > server.repl_backlog_size)
|
|
server.repl_backlog_histlen = server.repl_backlog_size;
|
|
/* Set the offset of the first byte we have in the backlog. */
|
|
server.repl_backlog_off = server.master_repl_offset -
|
|
server.repl_backlog_histlen + 1;
|
|
}
|
|
|
|
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
|
|
* as input. */
|
|
void feedReplicationBacklogWithObject(robj *o) {
|
|
char llstr[REDIS_LONGSTR_SIZE];
|
|
void *p;
|
|
size_t len;
|
|
|
|
if (o->encoding == REDIS_ENCODING_INT) {
|
|
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
|
|
p = llstr;
|
|
} else {
|
|
len = sdslen(o->ptr);
|
|
p = o->ptr;
|
|
}
|
|
feedReplicationBacklog(p,len);
|
|
}
|
|
|
|
#define FEEDSLAVE_BUF_SIZE (1024*64)
|
|
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|
listNode *ln;
|
|
listIter li;
|
|
int j, i, len;
|
|
char buf[FEEDSLAVE_BUF_SIZE], *b = buf;
|
|
char llstr[REDIS_LONGSTR_SIZE];
|
|
int buf_left = FEEDSLAVE_BUF_SIZE;
|
|
robj *o;
|
|
|
|
/* If there aren't slaves, and there is no backlog buffer to populate,
|
|
* we can return ASAP. */
|
|
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
|
|
|
|
/* We can't have slaves attached and no backlog. */
|
|
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
|
|
|
|
/* What we do here is to try to write as much data as possible in a static
|
|
* buffer "buf" that is used to create an object that is later sent to all
|
|
* the slaves. This way we do the decoding only one time for most commands
|
|
* not containing big payloads. */
|
|
|
|
/* Create the SELECT command into the static buffer if needed. */
|
|
if (server.slaveseldb != dictid) {
|
|
char *selectcmd;
|
|
size_t sclen;
|
|
|
|
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
|
|
selectcmd = shared.select[dictid]->ptr;
|
|
sclen = sdslen(selectcmd);
|
|
memcpy(b,selectcmd,sclen);
|
|
b += sclen;
|
|
buf_left -= sclen;
|
|
} else {
|
|
int dictid_len;
|
|
|
|
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
|
|
sclen = snprintf(b,buf_left,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
|
|
dictid_len, llstr);
|
|
b += sclen;
|
|
buf_left -= sclen;
|
|
}
|
|
}
|
|
server.slaveseldb = dictid;
|
|
|
|
/* Add the multi bulk reply size to the static buffer, that is, the number
|
|
* of arguments of the command to send to every slave. */
|
|
b[0] = '*';
|
|
len = ll2string(b+1,REDIS_LONGSTR_SIZE,argc);
|
|
b += len+1;
|
|
buf_left -= len;
|
|
b[0] = '\r';
|
|
b[1] = '\n';
|
|
b += 2;
|
|
buf_left -= 2;
|
|
|
|
/* Try to use the static buffer for as much arguments is possible. */
|
|
for (j = 0; j < argc; j++) {
|
|
int objlen;
|
|
char *objptr;
|
|
|
|
if (argv[j]->encoding != REDIS_ENCODING_RAW &&
|
|
argv[j]->encoding != REDIS_ENCODING_INT) {
|
|
redisPanic("Unexpected encoding");
|
|
}
|
|
if (argv[j]->encoding == REDIS_ENCODING_RAW) {
|
|
objlen = sdslen(argv[j]->ptr);
|
|
objptr = argv[j]->ptr;
|
|
} else {
|
|
objlen = ll2string(llstr,REDIS_LONGSTR_SIZE,(long)argv[j]->ptr);
|
|
objptr = llstr;
|
|
}
|
|
/* We need enough space for bulk reply encoding, newlines, and
|
|
* the data itself. */
|
|
if (buf_left < objlen+REDIS_LONGSTR_SIZE+32) break;
|
|
|
|
/* Write $...CRLF */
|
|
b[0] = '$';
|
|
len = ll2string(b+1,REDIS_LONGSTR_SIZE,objlen);
|
|
b += len+1;
|
|
buf_left -= len;
|
|
b[0] = '\r';
|
|
b[1] = '\n';
|
|
b += 2;
|
|
buf_left -= 2;
|
|
|
|
/* And data plus CRLF */
|
|
memcpy(b,objptr,objlen);
|
|
b += objlen;
|
|
buf_left -= objlen;
|
|
b[0] = '\r';
|
|
b[1] = '\n';
|
|
b += 2;
|
|
buf_left -= 2;
|
|
}
|
|
|
|
/* Create an object with the static buffer content. */
|
|
redisAssert(buf_left < FEEDSLAVE_BUF_SIZE);
|
|
o = createStringObject(buf,b-buf);
|
|
|
|
/* If we have a backlog, populate it with data and increment
|
|
* the global replication offset. */
|
|
if (server.repl_backlog) {
|
|
feedReplicationBacklogWithObject(o);
|
|
for (i = j; i < argc; i++) {
|
|
char aux[REDIS_LONGSTR_SIZE+3];
|
|
long objlen = stringObjectLen(argv[i]);
|
|
|
|
/* We need to feed the buffer with the object as a bulk reply
|
|
* not just as a plain string, so create the $..CRLF payload len
|
|
* ad add the final CRLF */
|
|
aux[0] = '$';
|
|
len = ll2string(aux+1,objlen,sizeof(aux)-1);
|
|
aux[len+1] = '\r';
|
|
aux[len+2] = '\n';
|
|
feedReplicationBacklog(aux,len+3);
|
|
feedReplicationBacklogWithObject(argv[j]);
|
|
feedReplicationBacklogWithObject(shared.crlf);
|
|
}
|
|
}
|
|
|
|
/* Write data to slaves. Here we do two things:
|
|
* 1) We write the "o" object that was created using the accumulated
|
|
* static buffer.
|
|
* 2) We write any additional argument of the command to replicate that
|
|
* was not written inside the static buffer for lack of space.
|
|
*/
|
|
listRewind(slaves,&li);
|
|
while((ln = listNext(&li))) {
|
|
redisClient *slave = ln->value;
|
|
|
|
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
|
|
|
|
/* Feed slaves that are waiting for the initial SYNC (so these commands
|
|
* are queued in the output buffer until the initial SYNC completes),
|
|
* or are already in sync with the master. */
|
|
|
|
/* First, trasmit the object created from the static buffer. */
|
|
addReply(slave,o);
|
|
|
|
/* Finally any additional argument that was not stored inside the
|
|
* static buffer if any (from j to argc). */
|
|
for (i = j; i < argc; i++)
|
|
addReplyBulk(slave,argv[i]);
|
|
}
|
|
decrRefCount(o);
|
|
}
|
|
|
|
/* Send a textual description of a command to every client in 'monitors'.
 *
 * The emitted line has the form:
 *
 *   +<seconds>.<microseconds> [<dictid> <origin>] "arg" "arg" ...\r\n
 *
 * where <origin> is "lua" for the Lua client, "unix:<path>" for Unix socket
 * clients, and "<ip>:<port>" of the peer otherwise. 'c' is the client that
 * issued the command, 'argv'/'argc' the command arguments. */
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
    struct timeval now;
    listIter iter;
    listNode *node;
    robj *lineobj;
    sds line;
    char peer_ip[32];
    int peer_port, k;

    /* Timestamp prefix. */
    gettimeofday(&now,NULL);
    line = sdsnew("+");
    line = sdscatprintf(line,"%ld.%06ld ",(long)now.tv_sec,(long)now.tv_usec);

    /* Database and origin of the command. */
    if (c->flags & REDIS_LUA_CLIENT) {
        line = sdscatprintf(line,"[%d lua] ",dictid);
    } else if (c->flags & REDIS_UNIX_SOCKET) {
        line = sdscatprintf(line,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        anetPeerToString(c->fd,peer_ip,&peer_port);
        line = sdscatprintf(line,"[%d %s:%d] ",dictid,peer_ip,peer_port);
    }

    /* Arguments, quoted and separated by a single space. */
    for (k = 0; k < argc; k++) {
        robj *arg = argv[k];

        if (k > 0) line = sdscatlen(line," ",1);
        if (arg->encoding == REDIS_ENCODING_INT) {
            line = sdscatprintf(line, "\"%ld\"", (long)arg->ptr);
        } else {
            line = sdscatrepr(line,(char*)arg->ptr,sdslen(arg->ptr));
        }
    }
    line = sdscatlen(line,"\r\n",2);

    /* Wrap the line into an object so it can be queued to each monitor. */
    lineobj = createObject(REDIS_STRING,line);
    listRewind(monitors,&iter);
    while ((node = listNext(&iter)) != NULL) {
        redisClient *monitor = node->value;

        addReply(monitor,lineobj);
    }
    decrRefCount(lineobj);
}
|
|
|
|
/* Feed the slave 'c' with the replication backlog starting from the
|
|
* specified 'offset' up to the end of the backlog. */
|
|
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
|
|
long long j, skip, len;
|
|
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
|
|
|
|
if (server.repl_backlog_histlen == 0) {
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
|
|
return 0;
|
|
}
|
|
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
|
|
server.repl_backlog_size);
|
|
redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
|
|
server.repl_backlog_off);
|
|
redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
|
|
server.repl_backlog_histlen);
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
|
|
server.repl_backlog_idx);
|
|
|
|
/* Compute the amount of bytes we need to discard. */
|
|
skip = offset - server.repl_backlog_off;
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
|
|
|
|
/* Point j to the oldest byte, that is actaully our
|
|
* server.repl_backlog_off byte. */
|
|
j = (server.repl_backlog_idx +
|
|
(server.repl_backlog_size-server.repl_backlog_histlen)) %
|
|
server.repl_backlog_size;
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
|
|
|
|
/* Discard the amount of data to seek to the specified 'offset'. */
|
|
j = (j + skip) % server.repl_backlog_size;
|
|
|
|
/* Feed slave with data. Since it is a circular buffer we have to
|
|
* split the reply in two parts if we are cross-boundary. */
|
|
len = server.repl_backlog_histlen - skip;
|
|
redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
|
|
while(len) {
|
|
long long thislen =
|
|
((server.repl_backlog_size - j) < len) ?
|
|
(server.repl_backlog_size - j) : len;
|
|
|
|
redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
|
|
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
|
|
len -= thislen;
|
|
j = 0;
|
|
}
|
|
return server.repl_backlog_histlen - skip;
|
|
}
|
|
|
|
/* This function handles the PSYNC command from the point of view of a
|
|
* master receiving a request for partial resynchronization.
|
|
*
|
|
* On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
|
|
* with the usual full resync. */
|
|
int masterTryPartialResynchronization(redisClient *c) {
|
|
long long psync_offset, psync_len;
|
|
char *master_runid = c->argv[1]->ptr;
|
|
char buf[128];
|
|
int buflen;
|
|
|
|
/* Is the runid of this master the same advertised by the wannabe slave
|
|
* via PSYNC? If runid changed this master is a different instance and
|
|
* there is no way to continue. */
|
|
if (strcasecmp(master_runid, server.runid)) {
|
|
/* Run id "?" is used by slaves that want to force a full resync. */
|
|
if (master_runid[0] != '?') {
|
|
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
|
|
"Runid mismatch (Client asked for '%s', I'm '%s')",
|
|
master_runid, server.runid);
|
|
} else {
|
|
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
|
|
}
|
|
goto need_full_resync;
|
|
}
|
|
|
|
/* We still have the data our slave is asking for? */
|
|
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
|
|
REDIS_OK) goto need_full_resync;
|
|
if (!server.repl_backlog ||
|
|
psync_offset < server.repl_backlog_off ||
|
|
psync_offset >= (server.repl_backlog_off + server.repl_backlog_size))
|
|
{
|
|
redisLog(REDIS_NOTICE,
|
|
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
|
|
goto need_full_resync;
|
|
}
|
|
|
|
/* If we reached this point, we are able to perform a partial resync:
|
|
* 1) Set client state to make it a slave.
|
|
* 2) Inform the client we can continue with +CONTINUE
|
|
* 3) Send the backlog data (from the offset to the end) to the slave. */
|
|
c->flags |= REDIS_SLAVE;
|
|
c->replstate = REDIS_REPL_ONLINE;
|
|
listAddNodeTail(server.slaves,c);
|
|
/* We can't use the connection buffers since they are used to accumulate
|
|
* new commands at this stage. But we are sure the socket send buffer is
|
|
* emtpy so this write will never fail actually. */
|
|
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
|
|
if (write(c->fd,buf,buflen) != buflen) {
|
|
freeClientAsync(c);
|
|
return REDIS_OK;
|
|
}
|
|
psync_len = addReplyReplicationBacklog(c,psync_offset);
|
|
redisLog(REDIS_NOTICE,
|
|
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
|
|
/* Note that we don't need to set the selected DB at server.slaveseldb
|
|
* to -1 to force the master to emit SELECT, since the slave already
|
|
* has this state from the previous connection with the master. */
|
|
|
|
return REDIS_OK; /* The caller can return, no full resync needed. */
|
|
|
|
need_full_resync:
|
|
/* We need a full resync for some reason... notify the client. */
|
|
psync_offset = server.master_repl_offset;
|
|
/* Add 1 to psync_offset if it the replication backlog does not exists
|
|
* as when it will be created later we'll increment the offset by one. */
|
|
if (server.repl_backlog == NULL) psync_offset++;
|
|
/* Again, we can't use the connection buffers (see above). */
|
|
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
|
|
server.runid,psync_offset);
|
|
if (write(c->fd,buf,buflen) != buflen) {
|
|
freeClientAsync(c);
|
|
return REDIS_OK;
|
|
}
|
|
return REDIS_ERR;
|
|
}
|
|
|
|
/* SYNC ad PSYNC command implemenation. */
|
|
void syncCommand(redisClient *c) {
|
|
/* ignore SYNC if already slave or in monitor mode */
|
|
if (c->flags & REDIS_SLAVE) return;
|
|
|
|
/* Refuse SYNC requests if we are a slave but the link with our master
|
|
* is not ok... */
|
|
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
|
|
addReplyError(c,"Can't SYNC while not connected with my master");
|
|
return;
|
|
}
|
|
|
|
/* SYNC can't be issued when the server has pending data to send to
|
|
* the client about already issued commands. We need a fresh reply
|
|
* buffer registering the differences between the BGSAVE and the current
|
|
* dataset, so that we can copy to other slaves if needed. */
|
|
if (listLength(c->reply) != 0 || c->bufpos != 0) {
|
|
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
|
|
return;
|
|
}
|
|
|
|
redisLog(REDIS_NOTICE,"Slave asks for synchronization");
|
|
|
|
/* Try a partial resynchronization if this is a PSYNC command.
|
|
* If it fails, we continue with usual full resynchronization, however
|
|
* when this happens masterTryPartialResynchronization() already
|
|
* replied with:
|
|
*
|
|
* +FULLRESYNC <runid> <offset>
|
|
*
|
|
* So the slave knows the new runid and offset to try a PSYNC later
|
|
* if the connection with the master is lost. */
|
|
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
|
|
if (masterTryPartialResynchronization(c) == REDIS_OK) {
|
|
server.stat_sync_partial_ok++;
|
|
return; /* No full resync needed, return. */
|
|
} else {
|
|
char *master_runid = c->argv[1]->ptr;
|
|
|
|
/* Increment stats for failed PSYNCs, but only if the
|
|
* runid is not "?", as this is used by slaves to force a full
|
|
* resync on purpose when they are not albe to partially
|
|
* resync. */
|
|
if (master_runid[0] != '?') server.stat_sync_partial_err++;
|
|
}
|
|
}
|
|
|
|
/* Full resynchronization. */
|
|
server.stat_sync_full++;
|
|
|
|
/* Here we need to check if there is a background saving operation
|
|
* in progress, or if it is required to start one */
|
|
if (server.rdb_child_pid != -1) {
|
|
/* Ok a background save is in progress. Let's check if it is a good
|
|
* one for replication, i.e. if there is another slave that is
|
|
* registering differences since the server forked to save */
|
|
redisClient *slave;
|
|
listNode *ln;
|
|
listIter li;
|
|
|
|
listRewind(server.slaves,&li);
|
|
while((ln = listNext(&li))) {
|
|
slave = ln->value;
|
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
|
|
}
|
|
if (ln) {
|
|
/* Perfect, the server is already registering differences for
|
|
* another slave. Set the right state, and copy the buffer. */
|
|
copyClientOutputBuffer(c,slave);
|
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
|
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
|
|
} else {
|
|
/* No way, we need to wait for the next BGSAVE in order to
|
|
* register differences */
|
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
|
|
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
|
|
}
|
|
} else {
|
|
/* Ok we don't have a BGSAVE in progress, let's start one */
|
|
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
|
|
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
|
|
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
|
|
addReplyError(c,"Unable to perform background save");
|
|
return;
|
|
}
|
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
|
}
|
|
|
|
if (server.repl_disable_tcp_nodelay)
|
|
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
|
|
c->repldbfd = -1;
|
|
c->flags |= REDIS_SLAVE;
|
|
server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
|
|
listAddNodeTail(server.slaves,c);
|
|
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
|
|
createReplicationBacklog();
|
|
return;
|
|
}
|
|
|
|
/* REPLCONF <option> <value> <option> <value> ...
|
|
* This command is used by a slave in order to configure the replication
|
|
* process before starting it with the SYNC command.
|
|
*
|
|
* Currently the only use of this command is to communicate to the master
|
|
* what is the listening port of the Slave redis instance, so that the
|
|
* master can accurately list slaves and their listening ports in
|
|
* the INFO output.
|
|
*
|
|
* In the future the same command can be used in order to configure
|
|
* the replication to initiate an incremental replication instead of a
|
|
* full resync. */
|
|
void replconfCommand(redisClient *c) {
|
|
int j;
|
|
|
|
if ((c->argc % 2) == 0) {
|
|
/* Number of arguments must be odd to make sure that every
|
|
* option has a corresponding value. */
|
|
addReply(c,shared.syntaxerr);
|
|
return;
|
|
}
|
|
|
|
/* Process every option-value pair. */
|
|
for (j = 1; j < c->argc; j+=2) {
|
|
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
|
|
long port;
|
|
|
|
if ((getLongFromObjectOrReply(c,c->argv[j+1],
|
|
&port,NULL) != REDIS_OK))
|
|
return;
|
|
c->slave_listening_port = port;
|
|
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
|
|
/* REPLCONF ACK is used by slave to inform the master the amount
|
|
* of replication stream that it processed so far. It is an
|
|
* internal only command that normal clients should never use. */
|
|
long long offset;
|
|
|
|
if (!(c->flags & REDIS_SLAVE)) return;
|
|
if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
|
|
return;
|
|
if (offset > c->repl_ack_off)
|
|
c->repl_ack_off = offset;
|
|
c->repl_ack_time = server.unixtime;
|
|
/* Note: this command does not reply anything! */
|
|
} else {
|
|
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
|
|
(char*)c->argv[j]->ptr);
|
|
return;
|
|
}
|
|
}
|
|
addReply(c,shared.ok);
|
|
}
|
|
|
|
/* Writable-event handler that transfers the RDB file to a slave.
 *
 * 'privdata' is the slave client, 'fd' the socket to write to. At every
 * call at most REDIS_IOBUF_LEN bytes are read from slave->repldbfd at
 * offset slave->repldboff and written to the socket. Before the very first
 * chunk the "$<size>\r\n" bulk header is written. When the whole file was
 * sent the slave is put ONLINE and the normal reply handler is installed.
 * On any error the slave is freed. */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char chunk[REDIS_IOBUF_LEN];
    ssize_t nread, nsent;

    if (slave->repldboff == 0) {
        /* Nothing sent so far: emit the bulk length first. In theory we
         * don't know how much room there is in the socket output buffer,
         * but in practice SO_SNDLOWAT (the minimum count for output
         * operations) will never be smaller than the few bytes we need. */
        sds header = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        int header_sent =
            (write(fd,header,sdslen(header)) == (signed)sdslen(header));

        sdsfree(header);
        if (!header_sent) {
            freeClient(slave);
            return;
        }
    }

    /* Read the next chunk of the file from where we left off. */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    nread = read(slave->repldbfd,chunk,REDIS_IOBUF_LEN);
    if (nread <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (nread == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }

    /* Send it. A short write is fine: the offset only advances by the
     * amount actually written, the rest is read again at the next call. */
    nsent = write(fd,chunk,nread);
    if (nsent == -1) {
        redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    slave->repldboff += nsent;

    /* More data to send: wait for the next writable event. */
    if (slave->repldboff != slave->repldbsize) return;

    /* Transfer complete: release the file and switch the writable handler
     * from this function to the regular reply handler. */
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    slave->replstate = REDIS_REPL_ONLINE;
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave) == AE_ERR) {
        freeClient(slave);
        return;
    }
    redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
|
|
|
|
/* This function is called at the end of every background saving.
|
|
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
|
|
* otherwise REDIS_ERR is passed to the function.
|
|
*
|
|
* The goal of this function is to handle slaves waiting for a successful
|
|
* background saving in order to perform non-blocking synchronization. */
|
|
void updateSlavesWaitingBgsave(int bgsaveerr) {
|
|
listNode *ln;
|
|
int startbgsave = 0;
|
|
listIter li;
|
|
|
|
listRewind(server.slaves,&li);
|
|
while((ln = listNext(&li))) {
|
|
redisClient *slave = ln->value;
|
|
|
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
|
|
startbgsave = 1;
|
|
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
|
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
|
|
struct redis_stat buf;
|
|
|
|
if (bgsaveerr != REDIS_OK) {
|
|
freeClient(slave);
|
|
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
|
|
continue;
|
|
}
|
|
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
|
|
redis_fstat(slave->repldbfd,&buf) == -1) {
|
|
freeClient(slave);
|
|
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
|
|
continue;
|
|
}
|
|
slave->repldboff = 0;
|
|
slave->repldbsize = buf.st_size;
|
|
slave->replstate = REDIS_REPL_SEND_BULK;
|
|
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
|
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
|
|
freeClient(slave);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
if (startbgsave) {
|
|
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
|
|
listIter li;
|
|
|
|
listRewind(server.slaves,&li);
|
|
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
|
|
while((ln = listNext(&li))) {
|
|
redisClient *slave = ln->value;
|
|
|
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
|
|
freeClient(slave);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* ----------------------------------- SLAVE -------------------------------- */
|
|
|
|
/* Abort the async download of the bulk dataset while SYNC-ing with master */
|
|
void replicationAbortSyncTransfer(void) {
|
|
redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
|
|
|
|
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
|
|
close(server.repl_transfer_s);
|
|
close(server.repl_transfer_fd);
|
|
unlink(server.repl_transfer_tmpfile);
|
|
zfree(server.repl_transfer_tmpfile);
|
|
server.repl_state = REDIS_REPL_CONNECT;
|
|
}
|
|
|
|
/* Asynchronously read the SYNC payload we receive from a master */
|
|
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
|
|
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|
char buf[4096];
|
|
ssize_t nread, readlen;
|
|
off_t left;
|
|
REDIS_NOTUSED(el);
|
|
REDIS_NOTUSED(privdata);
|
|
REDIS_NOTUSED(mask);
|
|
|
|
/* If repl_transfer_size == -1 we still have to read the bulk length
|
|
* from the master reply. */
|
|
if (server.repl_transfer_size == -1) {
|
|
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
|
|
redisLog(REDIS_WARNING,
|
|
"I/O error reading bulk count from MASTER: %s",
|
|
strerror(errno));
|
|
goto error;
|
|
}
|
|
|
|
if (buf[0] == '-') {
|
|
redisLog(REDIS_WARNING,
|
|
"MASTER aborted replication with an error: %s",
|
|
buf+1);
|
|
goto error;
|
|
} else if (buf[0] == '\0') {
|
|
/* At this stage just a newline works as a PING in order to take
|
|
* the connection live. So we refresh our last interaction
|
|
* timestamp. */
|
|
server.repl_transfer_lastio = server.unixtime;
|
|
return;
|
|
} else if (buf[0] != '$') {
|
|
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
|
|
goto error;
|
|
}
|
|
server.repl_transfer_size = strtol(buf+1,NULL,10);
|
|
redisLog(REDIS_NOTICE,
|
|
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
|
|
(long long) server.repl_transfer_size);
|
|
return;
|
|
}
|
|
|
|
/* Read bulk data */
|
|
left = server.repl_transfer_size - server.repl_transfer_read;
|
|
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
|
|
nread = read(fd,buf,readlen);
|
|
if (nread <= 0) {
|
|
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
|
|
(nread == -1) ? strerror(errno) : "connection lost");
|
|
replicationAbortSyncTransfer();
|
|
return;
|
|
}
|
|
server.repl_transfer_lastio = server.unixtime;
|
|
if (write(server.repl_transfer_fd,buf,nread) != nread) {
|
|
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
|
goto error;
|
|
}
|
|
server.repl_transfer_read += nread;
|
|
|
|
/* Sync data on disk from time to time, otherwise at the end of the transfer
|
|
* we may suffer a big delay as the memory buffers are copied into the
|
|
* actual disk. */
|
|
if (server.repl_transfer_read >=
|
|
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
|
|
{
|
|
off_t sync_size = server.repl_transfer_read -
|
|
server.repl_transfer_last_fsync_off;
|
|
rdb_fsync_range(server.repl_transfer_fd,
|
|
server.repl_transfer_last_fsync_off, sync_size);
|
|
server.repl_transfer_last_fsync_off += sync_size;
|
|
}
|
|
|
|
/* Check if the transfer is now complete */
|
|
if (server.repl_transfer_read == server.repl_transfer_size) {
|
|
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
|
|
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
|
replicationAbortSyncTransfer();
|
|
return;
|
|
}
|
|
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
|
|
signalFlushedDb(-1);
|
|
emptyDb();
|
|
/* Before loading the DB into memory we need to delete the readable
|
|
* handler, otherwise it will get called recursively since
|
|
* rdbLoad() will call the event loop to process events from time to
|
|
* time for non blocking loading. */
|
|
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
|
|
if (rdbLoad(server.rdb_filename) != REDIS_OK) {
|
|
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
|
|
replicationAbortSyncTransfer();
|
|
return;
|
|
}
|
|
/* Final setup of the connected slave <- master link */
|
|
zfree(server.repl_transfer_tmpfile);
|
|
close(server.repl_transfer_fd);
|
|
server.master = createClient(server.repl_transfer_s);
|
|
server.master->flags |= REDIS_MASTER;
|
|
server.master->authenticated = 1;
|
|
server.repl_state = REDIS_REPL_CONNECTED;
|
|
server.master->reploff = server.repl_master_initial_offset;
|
|
memcpy(server.master->replrunid, server.repl_master_runid,
|
|
sizeof(server.repl_master_runid));
|
|
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
|
|
/* Restart the AOF subsystem now that we finished the sync. This
|
|
* will trigger an AOF rewrite, and when done will start appending
|
|
* to the new file. */
|
|
if (server.aof_state != REDIS_AOF_OFF) {
|
|
int retry = 10;
|
|
|
|
stopAppendOnly();
|
|
while (retry-- && startAppendOnly() == REDIS_ERR) {
|
|
redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
|
|
sleep(1);
|
|
}
|
|
if (!retry) {
|
|
redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
|
|
exit(1);
|
|
}
|
|
}
|
|
}
|
|
|
|
return;
|
|
|
|
error:
|
|
replicationAbortSyncTransfer();
|
|
return;
|
|
}
|
|
|
|
/* Send a synchronous command to the master. Used to send AUTH and
|
|
* REPLCONF commands before starting the replication with SYNC.
|
|
*
|
|
* The command returns an sds string representing the result of the
|
|
* operation. On error the first byte is a "-".
|
|
*/
|
|
char *sendSynchronousCommand(int fd, ...) {
|
|
va_list ap;
|
|
sds cmd = sdsempty();
|
|
char *arg, buf[256];
|
|
|
|
/* Create the command to send to the master, we use simple inline
|
|
* protocol for simplicity as currently we only send simple strings. */
|
|
va_start(ap,fd);
|
|
while(1) {
|
|
arg = va_arg(ap, char*);
|
|
if (arg == NULL) break;
|
|
|
|
if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
|
|
cmd = sdscat(cmd,arg);
|
|
}
|
|
cmd = sdscatlen(cmd,"\r\n",2);
|
|
|
|
/* Transfer command to the server. */
|
|
if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
|
|
sdsfree(cmd);
|
|
return sdscatprintf(sdsempty(),"-Writing to master: %s",
|
|
strerror(errno));
|
|
}
|
|
sdsfree(cmd);
|
|
|
|
/* Read the reply from the server. */
|
|
if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
|
|
{
|
|
return sdscatprintf(sdsempty(),"-Reading from master: %s",
|
|
strerror(errno));
|
|
}
|
|
return sdsnew(buf);
|
|
}
|
|
|
|
/* Try a partial resynchronization with the master if we are about to reconnect.
|
|
* If there is no cached master structure, at least try to issue a
|
|
* "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
|
|
* command in order to obtain the master run id and the master replication
|
|
* global offset.
|
|
*
|
|
* This function is designed to be called from syncWithMaster(), so the
|
|
* following assumptions are made:
|
|
*
|
|
* 1) We pass the function an already connected socket "fd".
|
|
* 2) This function does not close the file descriptor "fd". However in case
|
|
* of successful partial resynchronization, the function will reuse
|
|
* 'fd' as file descriptor of the server.master client structure.
|
|
*
|
|
* The function returns:
|
|
*
|
|
* PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
|
|
* PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
|
|
* In this case the master run_id and global replication
|
|
* offset is saved.
|
|
* PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
|
|
* the caller should fall back to SYNC.
|
|
*/
|
|
|
|
#define PSYNC_CONTINUE 0
|
|
#define PSYNC_FULLRESYNC 1
|
|
#define PSYNC_NOT_SUPPORTED 2
|
|
int slaveTryPartialResynchronization(int fd) {
|
|
char *psync_runid;
|
|
char psync_offset[32];
|
|
sds reply;
|
|
|
|
/* Initially set repl_master_initial_offset to -1 to mark the current
|
|
* master run_id and offset as not valid. Later if we'll be able to do
|
|
* a FULL resync using the PSYNC command we'll set the offset at the
|
|
* right value, so that this information will be propagated to the
|
|
* client structure representing the master into server.master. */
|
|
server.repl_master_initial_offset = -1;
|
|
|
|
if (server.cached_master) {
|
|
psync_runid = server.cached_master->replrunid;
|
|
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
|
|
redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
|
|
} else {
|
|
redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
|
|
psync_runid = "?";
|
|
memcpy(psync_offset,"-1",3);
|
|
}
|
|
|
|
/* Issue the PSYNC command */
|
|
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
|
|
|
|
if (!strncmp(reply,"+FULLRESYNC",11)) {
|
|
char *runid = NULL, *offset = NULL;
|
|
|
|
/* FULL RESYNC, parse the reply in order to extract the run id
|
|
* and the replication offset. */
|
|
runid = strchr(reply,' ');
|
|
if (runid) {
|
|
runid++;
|
|
offset = strchr(runid,' ');
|
|
if (offset) offset++;
|
|
}
|
|
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
|
|
redisLog(REDIS_WARNING,
|
|
"Master replied with wrong +FULLRESYNC syntax.");
|
|
/* This is an unexpected condition, actually the +FULLRESYNC
|
|
* reply means that the master supports PSYNC, but the reply
|
|
* format seems wrong. To stay safe we blank the master
|
|
* runid to make sure next PSYNCs will fail. */
|
|
memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
|
|
} else {
|
|
memcpy(server.repl_master_runid, runid, offset-runid-1);
|
|
server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
|
|
server.repl_master_initial_offset = strtoll(offset,NULL,10);
|
|
redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
|
|
server.repl_master_runid,
|
|
server.repl_master_initial_offset);
|
|
}
|
|
/* We are going to full resync, discard the cached master structure. */
|
|
replicationDiscardCachedMaster();
|
|
sdsfree(reply);
|
|
return PSYNC_FULLRESYNC;
|
|
}
|
|
|
|
if (!strncmp(reply,"+CONTINUE",9)) {
|
|
/* Partial resync was accepted, set the replication state accordingly */
|
|
redisLog(REDIS_NOTICE,
|
|
"Successful partial resynchronization with master.");
|
|
sdsfree(reply);
|
|
replicationResurrectCachedMaster(fd);
|
|
return PSYNC_CONTINUE;
|
|
}
|
|
|
|
/* If we reach this point we receied either an error since the master does
|
|
* not understand PSYNC, or an unexpected reply from the master.
|
|
* Reply with PSYNC_NOT_SUPPORTED in both cases. */
|
|
|
|
if (strncmp(reply,"-ERR",4)) {
|
|
/* If it's not an error, log the unexpected event. */
|
|
redisLog(REDIS_WARNING,
|
|
"Unexpected reply to PSYNC from master: %s", reply);
|
|
} else {
|
|
redisLog(REDIS_NOTICE,
|
|
"Master does not support PSYNC or is in "
|
|
"error state (reply: %s)", reply);
|
|
}
|
|
sdsfree(reply);
|
|
replicationDiscardCachedMaster();
|
|
return PSYNC_NOT_SUPPORTED;
|
|
}
|
|
|
|
/* File event handler driving the handshake with the master. It is installed
 * by connectWithMaster() for both the readable and writable events of the
 * non blocking socket 'fd'.
 *
 * Depending on server.repl_state the function:
 *
 * 1) REDIS_REPL_CONNECTING: sends a PING, moves to REDIS_REPL_RECEIVE_PONG
 *    and returns waiting for the readable event.
 * 2) REDIS_REPL_RECEIVE_PONG: reads the PING reply, then performs, using
 *    blocking I/O with timeout, AUTH (if masterauth is set), REPLCONF
 *    listening-port and PSYNC, falling back to SYNC if PSYNC is not
 *    supported.
 * 3) If a full transfer is needed, creates the temp file and installs
 *    readSyncBulkPayload() as readable handler, entering the
 *    REDIS_REPL_TRANSFER state.
 *
 * On error the socket is closed, the temp file (if already created) is
 * closed and removed, and the state is set back to REDIS_REPL_CONNECT.
 *
 * The 'el', 'privdata' and 'mask' arguments are not used. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd = -1, maxtries = 5; /* dfd == -1 means "no temp file created". */
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer. O_EXCL makes sure the
     * file is created by this call, so when dfd != -1 the file is ours. */
    while(maxtries--) {
        snprintf(tmpfile,sizeof(tmpfile),
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* If the temp file was already created, the transfer state was not
     * populated yet (this happens only on success), so nobody else would
     * close the descriptor or remove the file: do it here. */
    if (dfd != -1) {
        close(dfd);
        unlink(tmpfile);
    }
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
|
|
|
|
int connectWithMaster(void) {
|
|
int fd;
|
|
|
|
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
|
|
if (fd == -1) {
|
|
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
|
|
strerror(errno));
|
|
return REDIS_ERR;
|
|
}
|
|
|
|
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
|
|
AE_ERR)
|
|
{
|
|
close(fd);
|
|
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
|
|
return REDIS_ERR;
|
|
}
|
|
|
|
server.repl_transfer_lastio = server.unixtime;
|
|
server.repl_transfer_s = fd;
|
|
server.repl_state = REDIS_REPL_CONNECTING;
|
|
return REDIS_OK;
|
|
}
|
|
|
|
/* This function can be called when a non blocking connection is currently
|
|
* in progress to undo it. */
|
|
void undoConnectWithMaster(void) {
|
|
int fd = server.repl_transfer_s;
|
|
|
|
redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
|
|
server.repl_state == REDIS_REPL_RECEIVE_PONG);
|
|
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
|
|
close(fd);
|
|
server.repl_transfer_s = -1;
|
|
server.repl_state = REDIS_REPL_CONNECT;
|
|
}
|
|
|
|
/* This function aborts a non blocking replication attempt if there is one
|
|
* in progress, by canceling the non-blocking connect attempt or
|
|
* the initial bulk transfer.
|
|
*
|
|
* If there was a replication handshake in progress 1 is returned and
|
|
* the replication state (server.repl_state) set to REDIS_REPL_CONNECT.
|
|
*
|
|
* Otherwise zero is returned and no operation is perforemd at all. */
|
|
int cancelReplicationHandshake(void) {
|
|
if (server.repl_state == REDIS_REPL_TRANSFER) {
|
|
replicationAbortSyncTransfer();
|
|
} else if (server.repl_state == REDIS_REPL_CONNECTING ||
|
|
server.repl_state == REDIS_REPL_RECEIVE_PONG)
|
|
{
|
|
undoConnectWithMaster();
|
|
} else {
|
|
return 0;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
/* Set replication to the specified master address and port. */
|
|
void replicationSetMaster(char *ip, int port) {
|
|
sdsfree(server.masterhost);
|
|
server.masterhost = sdsnew(ip);
|
|
server.masterport = port;
|
|
if (server.master) freeClient(server.master);
|
|
disconnectSlaves(); /* Force our slaves to resync with us as well. */
|
|
replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
|
|
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
|
|
cancelReplicationHandshake();
|
|
server.repl_state = REDIS_REPL_CONNECT;
|
|
}
|
|
|
|
/* Cancel replication, setting the instance as a master itself. */
|
|
void replicationUnsetMaster(void) {
|
|
if (server.masterhost == NULL) return; /* Nothing to do. */
|
|
sdsfree(server.masterhost);
|
|
server.masterhost = NULL;
|
|
if (server.master) freeClient(server.master);
|
|
replicationDiscardCachedMaster();
|
|
cancelReplicationHandshake();
|
|
server.repl_state = REDIS_REPL_NONE;
|
|
}
|
|
|
|
/* SLAVEOF <host> <port> | SLAVEOF NO ONE
 *
 * With the special arguments "NO" "ONE" (case insensitive) the instance is
 * turned into a master, if it is not one already. Otherwise the first
 * argument is the master host and the second is the master port, that must
 * be an integer in the 0-65535 range: the instance starts replicating from
 * the specified master, unless it is already configured with the same
 * address.
 *
 * The command is refused when the server runs in cluster mode. */
void slaveofCommand(redisClient *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* The port is stored and passed around as an int: refuse values
         * that are not valid TCP ports instead of silently truncating
         * them in the conversion. */
        if (port < 0 || port > 65535) {
            addReplyError(c,"Invalid master port");
            return;
        }

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, (int)port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
|
|
|
|
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
|
|
|
|
/* In order to implement partial synchronization we need to be able to cache
|
|
* our master's client structure after a transient disconnection.
|
|
* It is cached into server.cached_master and flushed away using the following
|
|
* functions. */
|
|
|
|
/* This function is called by freeClient() in order to cache the master
|
|
* client structure instead of destryoing it. freeClient() will return
|
|
* ASAP after this function returns, so every action needed to avoid problems
|
|
* with a client that is really "suspended" has to be done by this function.
|
|
*
|
|
* The other functions that will deal with the cached master are:
|
|
*
|
|
* replicationDiscardCachedMaster() that will make sure to kill the client
|
|
* as for some reason we don't want to use it in the future.
|
|
*
|
|
* replicationResurrectCachedMaster() that is used after a successful PSYNC
|
|
* handshake in order to reactivate the cached master.
|
|
*/
|
|
void replicationCacheMaster(redisClient *c) {
|
|
listNode *ln;
|
|
|
|
redisAssert(server.master != NULL && server.cached_master == NULL);
|
|
redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
|
|
|
|
/* Remove from the list of clients, we don't want this client to be
|
|
* listed by CLIENT LIST or processed in any way by batch operations. */
|
|
ln = listSearchKey(server.clients,c);
|
|
redisAssert(ln != NULL);
|
|
listDelNode(server.clients,ln);
|
|
|
|
/* Save the master. Server.master will be set to null later by
|
|
* replicationHandleMasterDisconnection(). */
|
|
server.cached_master = server.master;
|
|
|
|
/* Remove the event handlers and close the socket. We'll later reuse
|
|
* the socket of the new connection with the master during PSYNC. */
|
|
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
|
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
|
close(c->fd);
|
|
|
|
/* Set fd to -1 so that we can safely call freeClient(c) later. */
|
|
c->fd = -1;
|
|
|
|
/* Caching the master happens instead of the actual freeClient() call,
|
|
* so make sure to adjust the replication state. This function will
|
|
* also set server.master to NULL. */
|
|
replicationHandleMasterDisconnection();
|
|
}
|
|
|
|
/* Free a cached master, called when there are no longer the conditions for
|
|
* a partial resync on reconnection. */
|
|
void replicationDiscardCachedMaster(void) {
|
|
if (server.cached_master == NULL) return;
|
|
|
|
redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
|
|
server.cached_master->flags &= ~REDIS_MASTER;
|
|
freeClient(server.cached_master);
|
|
server.cached_master = NULL;
|
|
}
|
|
|
|
/* Turn the cached master into the current master, using the file descriptor
|
|
* passed as argument as the socket for the new master.
|
|
*
|
|
* This funciton is called when successfully setup a partial resynchronization
|
|
* so the stream of data that we'll receive will start from were this
|
|
* master left. */
|
|
void replicationResurrectCachedMaster(int newfd) {
|
|
server.master = server.cached_master;
|
|
server.cached_master = NULL;
|
|
server.master->fd = newfd;
|
|
server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
|
|
server.master->authenticated = 1;
|
|
server.master->lastinteraction = server.unixtime;
|
|
server.repl_state = REDIS_REPL_CONNECTED;
|
|
|
|
/* Re-add to the list of clients. */
|
|
listAddNodeTail(server.clients,server.master);
|
|
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
|
|
readQueryFromClient, server.master)) {
|
|
redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
|
|
freeClientAsync(server.master); /* Close ASAP. */
|
|
}
|
|
}
|
|
|
|
/* --------------------------- REPLICATION CRON ---------------------------- */
|
|
|
|
void replicationCron(void) {
|
|
/* Non blocking connection timeout? */
|
|
if (server.masterhost &&
|
|
(server.repl_state == REDIS_REPL_CONNECTING ||
|
|
server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
|
|
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
|
|
{
|
|
redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
|
|
undoConnectWithMaster();
|
|
}
|
|
|
|
/* Bulk transfer I/O timeout? */
|
|
if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
|
|
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
|
|
{
|
|
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
|
|
replicationAbortSyncTransfer();
|
|
}
|
|
|
|
/* Timed out master when we are an already connected slave? */
|
|
if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
|
|
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
|
|
{
|
|
redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
|
|
freeClient(server.master);
|
|
}
|
|
|
|
/* Check if we should connect to a MASTER */
|
|
if (server.repl_state == REDIS_REPL_CONNECT) {
|
|
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
|
|
if (connectWithMaster() == REDIS_OK) {
|
|
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
|
|
}
|
|
}
|
|
|
|
/* If we have attached slaves, PING them from time to time.
|
|
* So slaves can implement an explicit timeout to masters, and will
|
|
* be able to detect a link disconnection even if the TCP connection
|
|
* will not actually go down. */
|
|
if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
|
|
listIter li;
|
|
listNode *ln;
|
|
robj *ping_argv[1];
|
|
|
|
/* First, send PING */
|
|
ping_argv[0] = createStringObject("PING",4);
|
|
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
|
|
decrRefCount(ping_argv[0]);
|
|
|
|
/* Second, send a newline to all the slaves in pre-synchronization
|
|
* stage, that is, slaves waiting for the master to create the RDB file.
|
|
* The newline will be ignored by the slave but will refresh the
|
|
* last-io timer preventing a timeout. */
|
|
listRewind(server.slaves,&li);
|
|
while((ln = listNext(&li))) {
|
|
redisClient *slave = ln->value;
|
|
|
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
|
|
slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
|
|
if (write(slave->fd, "\n", 1) == -1) {
|
|
/* Don't worry, it's just a ping. */
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* If we have no attached slaves and there is a replication backlog
|
|
* using memory, free it after some (configured) time. */
|
|
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
|
|
server.repl_backlog)
|
|
{
|
|
time_t idle = server.unixtime - server.repl_no_slaves_since;
|
|
|
|
if (idle > server.repl_backlog_time_limit) {
|
|
freeReplicationBacklog();
|
|
redisLog(REDIS_NOTICE,
|
|
"Replication backlog freed after %d seconds "
|
|
"without connected slaves.",
|
|
(int) server.repl_backlog_time_limit);
|
|
}
|
|
}
|
|
}
|