diff --git a/redis.conf b/redis.conf index 0547cada..81dcf5a8 100644 --- a/redis.conf +++ b/redis.conf @@ -240,6 +240,45 @@ slave-serve-stale-data yes # administrative / dangerous commands. slave-read-only yes +# Replication SYNC strategy: disk or socket. +# +# New slaves and reconnecting slaves that are not able to continue the replication +# process just receiving differences, need to do what is called a "full +# synchronization". An RDB file is transmitted from the master to the slaves. +# The transmission can happen in two different ways: +# +# 1) Disk-backed: The Redis master creates a new process that writes the RDB +# file on disk. Later the file is transferred by the parent +# process to the slaves incrementally. +# 2) Diskless: The Redis master creates a new process that directly writes the +# RDB file to slave sockets, without touching the disk at all. +# +# With disk-backed replication, while the RDB file is generated, more slaves +# can be queued and served with the RDB file as soon as the current child producing +# the RDB file finishes its work. With diskless replication instead once +# the transfer starts, new slaves arriving will be queued and a new transfer +# will start when the current one terminates. +# +# When diskless replication is used, the master waits a configurable amount of +# time (in seconds) before starting the transfer in the hope that multiple slaves +# will arrive and the transfer can be parallelized. +# +# With slow disks and fast (large bandwidth) networks, diskless replication +# works better. +repl-diskless-sync no + +# When diskless replication is enabled, it is possible to configure the delay +# the server waits in order to spawn the child that transfers the RDB via socket +# to the slaves. +# +# This is important since once the transfer starts, it is not possible to serve +# new slaves arriving, that will be queued for the next RDB transfer, so the server +# waits a delay in order to let more slaves arrive. 
+# +# The delay is specified in seconds, and by default is 5 seconds. To disable +# it entirely just set it to 0 seconds and the transfer will start ASAP. +repl-diskless-sync-delay 5 + # Slaves send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_slave_period option. The default value is 10 # seconds. diff --git a/src/anet.c b/src/anet.c index e3c15594..1e5d8549 100644 --- a/src/anet.c +++ b/src/anet.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -57,24 +58,37 @@ static void anetSetError(char *err, const char *fmt, ...) va_end(ap); } -int anetNonBlock(char *err, int fd) -{ +int anetSetBlock(char *err, int fd, int non_block) { int flags; - /* Set the socket non-blocking. + /* Set the socket blocking (if non_block is zero) or non-blocking. * Note that fcntl(2) for F_GETFL and F_SETFL can't be * interrupted by a signal. */ if ((flags = fcntl(fd, F_GETFL)) == -1) { anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno)); return ANET_ERR; } - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + + if (non_block) + flags |= O_NONBLOCK; + else + flags &= ~O_NONBLOCK; + + if (fcntl(fd, F_SETFL, flags) == -1) { anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno)); return ANET_ERR; } return ANET_OK; } +int anetNonBlock(char *err, int fd) { + return anetSetBlock(err,fd,1); +} + +int anetBlock(char *err, int fd) { + return anetSetBlock(err,fd,0); +} + /* Set TCP keep alive option to detect dead peers. The interval option * is only used for Linux as we are using Linux-specific APIs to set * the probe send time, interval, and count. */ @@ -165,6 +179,20 @@ int anetTcpKeepAlive(char *err, int fd) return ANET_OK; } +/* Set the socket send timeout (SO_SNDTIMEO socket option) to the specified + * number of milliseconds, or disable it if the 'ms' argument is zero. 
*/ +int anetSendTimeout(char *err, int fd, long long ms) { + struct timeval tv; + + tv.tv_sec = ms/1000; + tv.tv_usec = (ms%1000)*1000; + if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) { + anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +} + /* anetGenericResolve() is called by anetResolve() and anetResolveIP() to * do the actual work. It resolves the hostname "host" and set the string * representation of the IP address into the buffer pointed by "ipbuf". diff --git a/src/anet.h b/src/anet.h index 5191c4b6..b94a0cd1 100644 --- a/src/anet.h +++ b/src/anet.h @@ -62,9 +62,11 @@ int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port) int anetUnixAccept(char *err, int serversock); int anetWrite(int fd, char *buf, int count); int anetNonBlock(char *err, int fd); +int anetBlock(char *err, int fd); int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetTcpKeepAlive(char *err, int fd); +int anetSendTimeout(char *err, int fd, long long ms); int anetPeerToString(int fd, char *ip, size_t ip_len, int *port); int anetKeepAlive(char *err, int fd, int interval); int anetSockName(int fd, char *ip, size_t ip_len, int *port); diff --git a/src/config.c b/src/config.c index 43507000..05cb7c9f 100644 --- a/src/config.c +++ b/src/config.c @@ -270,6 +270,16 @@ void loadServerConfigFromString(char *config) { if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"repl-diskless-sync") && argc==2) { + if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } + } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { + server.repl_diskless_sync_delay = atoi(argv[1]); + if (server.repl_diskless_sync_delay < 0) { + err = "repl-diskless-sync-delay can't be negative"; + 
goto loaderr; + } } else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) { long long size = memtoll(argv[1],NULL); if (size <= 0) { @@ -284,7 +294,7 @@ void loadServerConfigFromString(char *config) { goto loaderr; } } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { - server.masterauth = zstrdup(argv[1]); + server.masterauth = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) { if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -911,6 +921,15 @@ void configSetCommand(redisClient *c) { if (yn == -1) goto badfmt; server.repl_disable_tcp_nodelay = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.repl_diskless_sync = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync-delay")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.repl_diskless_sync_delay = ll; } else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; @@ -1049,6 +1068,7 @@ void configGetCommand(redisClient *c) { config_get_numerical_field("cluster-node-timeout",server.cluster_node_timeout); config_get_numerical_field("cluster-migration-barrier",server.cluster_migration_barrier); config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor); + config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); /* Bool (yes/no) values */ config_get_bool_field("cluster-require-full-coverage", @@ -1067,6 +1087,8 @@ void configGetCommand(redisClient *c) { config_get_bool_field("activerehashing", server.activerehashing); config_get_bool_field("repl-disable-tcp-nodelay", server.repl_disable_tcp_nodelay); + config_get_bool_field("repl-diskless-sync", + server.repl_diskless_sync); 
config_get_bool_field("aof-rewrite-incremental-fsync", server.aof_rewrite_incremental_fsync); config_get_bool_field("aof-load-truncated", @@ -1792,6 +1814,8 @@ int rewriteConfig(char *path) { rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,REDIS_DEFAULT_REPL_BACKLOG_SIZE); rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT); rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY); + rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC); + rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE); rewriteConfigNumericalOption(state,"min-slaves-max-lag",server.repl_min_slaves_max_lag,REDIS_DEFAULT_MIN_SLAVES_MAX_LAG); diff --git a/src/networking.c b/src/networking.c index c7b1c9ba..cc9bbd98 100644 --- a/src/networking.c +++ b/src/networking.c @@ -678,12 +678,8 @@ void freeClient(redisClient *c) { /* Log link disconnection with slave */ if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) { - char ip[REDIS_IP_STR_LEN]; - - if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { - redisLog(REDIS_WARNING,"Connection with slave %s:%d lost.", - ip, c->slave_listening_port); - } + redisLog(REDIS_WARNING,"Connection with slave %s lost.", + replicationGetSlaveName(c)); } /* Free the query buffer */ diff --git a/src/rdb.c b/src/rdb.c index 4d789bc2..d2fd405d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -627,17 +627,100 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, return 1; } -/* Save the DB on disk. 
Return REDIS_ERR on error, REDIS_OK on success */ -int rdbSave(char *filename) { +/* Produces a dump of the database in RDB format sending it to the specified + * Redis I/O channel. On success REDIS_OK is returned, otherwise REDIS_ERR + * is returned and part of the output, or all the output, can be + * missing because of I/O errors. + * + * When the function returns REDIS_ERR and if 'error' is not NULL, the + * integer pointed by 'error' is set to the value of errno just after the I/O + * error. */ +int rdbSaveRio(rio *rdb, int *error) { dictIterator *di = NULL; dictEntry *de; - char tmpfile[256]; char magic[10]; int j; long long now = mstime(); + uint64_t cksum; + + if (server.rdb_checksum) + rdb->update_cksum = rioGenericUpdateChecksum; + snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); + if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; + + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + dict *d = db->dict; + if (dictSize(d) == 0) continue; + di = dictGetSafeIterator(d); + if (!di) return REDIS_ERR; + + /* Write the SELECT DB opcode */ + if (rdbSaveType(rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; + if (rdbSaveLen(rdb,j) == -1) goto werr; + + /* Iterate this DB writing every entry */ + while((de = dictNext(di)) != NULL) { + sds keystr = dictGetKey(de); + robj key, *o = dictGetVal(de); + long long expire; + + initStaticStringObject(key,keystr); + expire = getExpire(db,&key); + if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr; + } + dictReleaseIterator(di); + } + di = NULL; /* So that we don't release it again on error. */ + + /* EOF opcode */ + if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; + + /* CRC64 checksum. It will be zero if checksum computation is disabled, the + * loading code skips the check in this case. 
*/ + cksum = rdb->cksum; + memrev64ifbe(&cksum); + if (rioWrite(rdb,&cksum,8) == 0) goto werr; + return REDIS_OK; + +werr: + if (error) *error = errno; + if (di) dictReleaseIterator(di); + return REDIS_ERR; +} + +/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix + * and a suffix to the generated RDB dump. The prefix is: + * + * $EOF:<40 bytes unguessable hex string>\r\n + * + * While the suffix is the 40 bytes hex string we announced in the prefix. + * This way processes receiving the payload can understand when it ends + * without doing any processing of the content. */ +int rdbSaveRioWithEOFMark(rio *rdb, int *error) { + char eofmark[REDIS_EOF_MARK_SIZE]; + + getRandomHexChars(eofmark,REDIS_EOF_MARK_SIZE); + if (error) *error = 0; + if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; + if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr; + if (rioWrite(rdb,"\r\n",2) == 0) goto werr; + if (rdbSaveRio(rdb,error) == REDIS_ERR) goto werr; + if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr; + return REDIS_OK; + +werr: /* Write error. */ + /* Set 'error' only if not already set by rdbSaveRio() call. */ + if (error && *error == 0) *error = errno; + return REDIS_ERR; +} + +/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. 
*/ +int rdbSave(char *filename) { + char tmpfile[256]; FILE *fp; rio rdb; - uint64_t cksum; + int error; snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); @@ -648,47 +731,10 @@ int rdbSave(char *filename) { } rioInitWithFile(&rdb,fp); - if (server.rdb_checksum) - rdb.update_cksum = rioGenericUpdateChecksum; - snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); - if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr; - - for (j = 0; j < server.dbnum; j++) { - redisDb *db = server.db+j; - dict *d = db->dict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); - if (!di) { - fclose(fp); - return REDIS_ERR; - } - - /* Write the SELECT DB opcode */ - if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; - if (rdbSaveLen(&rdb,j) == -1) goto werr; - - /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { - sds keystr = dictGetKey(de); - robj key, *o = dictGetVal(de); - long long expire; - - initStaticStringObject(key,keystr); - expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr; - } - dictReleaseIterator(di); + if (rdbSaveRio(&rdb,&error) == REDIS_ERR) { + errno = error; + goto werr; } - di = NULL; /* So that we don't release it again on error. */ - - /* EOF opcode */ - if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; - - /* CRC64 checksum. It will be zero if checksum computation is disabled, the - * loading code skips the check in this case. 
*/ - cksum = rdb.cksum; - memrev64ifbe(&cksum); - if (rioWrite(&rdb,&cksum,8) == 0) goto werr; /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; @@ -712,7 +758,6 @@ werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); - if (di) dictReleaseIterator(di); return REDIS_ERR; } @@ -757,6 +802,7 @@ int rdbSaveBackground(char *filename) { redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK; updateDictResizePolicy(); return REDIS_OK; } @@ -1191,8 +1237,9 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return REDIS_ERR; /* Just to avoid warning */ } -/* A background saving child (BGSAVE) terminated its work. Handle this. */ -void backgroundSaveDoneHandler(int exitcode, int bysignal) { +/* A background saving child (BGSAVE) terminated its work. Handle this. + * This function covers the case of actual BGSAVEs. */ +void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background saving terminated with success"); @@ -1217,11 +1264,258 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { server.lastbgsave_status = REDIS_ERR; } server.rdb_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start; server.rdb_save_time_start = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ - updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR); + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_DISK); +} + +/* A background saving child (BGSAVE) terminated its work. Handle this. 
+ * This function covers the case of RDB -> Slaves socket transfers for + * diskless replication. */ +void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { + uint64_t *ok_slaves; + + if (!bysignal && exitcode == 0) { + redisLog(REDIS_NOTICE, + "Background RDB transfer terminated with success"); + } else if (!bysignal && exitcode != 0) { + redisLog(REDIS_WARNING, "Background transfer error"); + } else { + redisLog(REDIS_WARNING, + "Background transfer terminated by signal %d", bysignal); + } + server.rdb_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; + server.rdb_save_time_start = -1; + + /* If the child returns an OK exit code, read the set of slave client + * IDs and the associated status code. We'll terminate all the slaves + * in error state. + * + * If the process returned an error, consider the list of slaves that + * can continue to be empty, so that it's just a special case of the + * normal code path. */ + ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */ + ok_slaves[0] = 0; + if (!bysignal && exitcode == 0) { + int readlen = sizeof(uint64_t); + + if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) == + readlen) + { + readlen = ok_slaves[0]*sizeof(uint64_t)*2; + + /* Make space for enough elements as specified by the first + * uint64_t element in the array. */ + ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen); + if (readlen && + read(server.rdb_pipe_read_result_from_child, ok_slaves+1, + readlen) != readlen) + { + ok_slaves[0] = 0; + } + } + } + + close(server.rdb_pipe_read_result_from_child); + close(server.rdb_pipe_write_result_to_parent); + + /* We can continue the replication process with all the slaves that + * correctly received the full payload. Others are terminated. 
*/ + listNode *ln; + listIter li; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + uint64_t j; + int errorcode = 0; + + /* Search for the slave ID in the reply. In order for a slave to + * continue the replication process, we need to find it in the list, + * and it must have an error code set to 0 (which means success). */ + for (j = 0; j < ok_slaves[0]; j++) { + if (slave->id == ok_slaves[2*j+1]) { + errorcode = ok_slaves[2*j+2]; + break; /* Found in slaves list. */ + } + } + if (j == ok_slaves[0] || errorcode != 0) { + redisLog(REDIS_WARNING, + "Closing slave %s: child->slave RDB transfer failed: %s", + replicationGetSlaveName(slave), + (errorcode == 0) ? "RDB transfer child aborted" + : strerror(errorcode)); + freeClient(slave); + } else { + redisLog(REDIS_WARNING, + "Slave %s correctly received the streamed RDB file.", + replicationGetSlaveName(slave)); + /* Restore the socket as non-blocking. */ + anetNonBlock(NULL,slave->fd); + anetSendTimeout(NULL,slave->fd,0); + } + } + } + zfree(ok_slaves); + + updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET); +} + +/* When a background RDB saving/transfer terminates, call the right handler. */ +void backgroundSaveDoneHandler(int exitcode, int bysignal) { + switch(server.rdb_child_type) { + case REDIS_RDB_CHILD_TYPE_DISK: + backgroundSaveDoneHandlerDisk(exitcode,bysignal); + break; + case REDIS_RDB_CHILD_TYPE_SOCKET: + backgroundSaveDoneHandlerSocket(exitcode,bysignal); + break; + default: + redisPanic("Unknown RDB child type."); + break; + } +} + +/* Spawn an RDB child that writes the RDB to the sockets of the slaves + * that are currently in REDIS_REPL_WAIT_BGSAVE_START state. 
*/ +int rdbSaveToSlavesSockets(void) { + int *fds; + uint64_t *clientids; + int numfds; + listNode *ln; + listIter li; + pid_t childpid; + long long start; + int pipefds[2]; + + if (server.rdb_child_pid != -1) return REDIS_ERR; + + /* Before forking, create a pipe that will be used in order to + * send back to the parent the IDs of the slaves that successfully + * received all the writes. */ + if (pipe(pipefds) == -1) return REDIS_ERR; + server.rdb_pipe_read_result_from_child = pipefds[0]; + server.rdb_pipe_write_result_to_parent = pipefds[1]; + + /* Collect the file descriptors of the slaves we want to transfer + * the RDB to, which are in WAIT_BGSAVE_START state. */ + fds = zmalloc(sizeof(int)*listLength(server.slaves)); + /* We also allocate an array of corresponding client IDs. This will + * be useful for the child process in order to build the report + * (sent via unix pipe) that will be sent to the parent. */ + clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves)); + numfds = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + clientids[numfds] = slave->id; + fds[numfds++] = slave->fd; + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the child returns (since duped socket + * will share the O_NONBLOCK attribute with the parent). */ + anetBlock(NULL,slave->fd); + anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000); + } + } + + /* Create the child process. 
*/ + start = ustime(); + if ((childpid = fork()) == 0) { + /* Child */ + int retval; + rio slave_sockets; + + rioInitWithFdset(&slave_sockets,fds,numfds); + zfree(fds); + + closeListeningSockets(0); + redisSetProcTitle("redis-rdb-to-slaves"); + + retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0) + retval = REDIS_ERR; + + if (retval == REDIS_OK) { + size_t private_dirty = zmalloc_get_private_dirty(); + + if (private_dirty) { + redisLog(REDIS_NOTICE, + "RDB: %zu MB of memory used by copy-on-write", + private_dirty/(1024*1024)); + } + + /* If we are returning OK, at least one slave was served + * with the RDB file as expected, so we need to send a report + * to the parent via the pipe. The format of the message is: + * + * <len> <id> <error> ... <id> <error> + * + * len, slave IDs, and slave errors, are all uint64_t integers, + * so basically the reply is composed of 64 bits for the len field + * plus 2 additional 64 bit integers for each entry, for a total + * of 'len' entries. + * + * The 'id' represents the slave's client ID, so that the master + * can match the report with a specific slave, and 'error' is + * set to 0 if the replication process terminated with a success + * or the error code if an error occurred. */ + void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds)); + uint64_t *len = msg; + uint64_t *ids = len+1; + int j, msglen; + + *len = numfds; + for (j = 0; j < numfds; j++) { + *ids++ = clientids[j]; + *ids++ = slave_sockets.io.fdset.state[j]; + } + + /* Write the message to the parent. If we have no good slaves or + * we are unable to transfer the message to the parent, we exit + * with an error so that the parent will abort the replication + * process with all the slaves that were waiting. */ + msglen = sizeof(uint64_t)*(1+2*numfds); + if (*len == 0 || + write(server.rdb_pipe_write_result_to_parent,msg,msglen) + != msglen) + { + retval = REDIS_ERR; + } + } + exitFromChild((retval == REDIS_OK) ? 
0 : 1); + } else { + /* Parent */ + zfree(clientids); /* Not used by parent. Free ASAP. */ + server.stat_fork_time = ustime()-start; + server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ + latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); + if (childpid == -1) { + redisLog(REDIS_WARNING,"Can't save in background: fork: %s", + strerror(errno)); + zfree(fds); + close(pipefds[0]); + close(pipefds[1]); + return REDIS_ERR; + } + redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid); + server.rdb_save_time_start = time(NULL); + server.rdb_child_pid = childpid; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_SOCKET; + updateDictResizePolicy(); + zfree(fds); + return REDIS_OK; + } + return REDIS_OK; /* unreached */ } void saveCommand(redisClient *c) { diff --git a/src/rdb.h b/src/rdb.h index 54ee4e51..eb40d499 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -101,6 +101,7 @@ int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename); int rdbSaveBackground(char *filename); +int rdbSaveToSlavesSockets(void); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char *filename); int rdbSaveObject(rio *rdb, robj *o); diff --git a/src/redis.c b/src/redis.c index e7faa885..09b8103b 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1480,6 +1480,8 @@ void initServerConfig(void) { server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY; server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY; + server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; @@ -1768,6 +1770,7 @@ void initServer(void) { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; + server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ diff --git a/src/redis.h b/src/redis.h index a1ae0f2b..f5301ab2 100644 --- a/src/redis.h +++ b/src/redis.h @@ -96,6 +96,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_REPL_TIMEOUT 60 #define REDIS_REPL_PING_SLAVE_PERIOD 10 #define REDIS_RUN_ID_SIZE 40 +#define REDIS_EOF_MARK_SIZE 40 #define REDIS_OPS_SEC_SAMPLES 16 #define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */ #define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */ @@ -113,6 +114,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_DEFAULT_RDB_COMPRESSION 1 #define REDIS_DEFAULT_RDB_CHECKSUM 1 #define REDIS_DEFAULT_RDB_FILENAME "dump.rdb" +#define REDIS_DEFAULT_REPL_DISKLESS_SYNC 0 +#define REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 #define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define REDIS_DEFAULT_SLAVE_READ_ONLY 1 #define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0 @@ -361,6 +364,11 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_PROPAGATE_AOF 1 #define REDIS_PROPAGATE_REPL 2 +/* RDB active child save type. */ +#define REDIS_RDB_CHILD_TYPE_NONE 0 +#define REDIS_RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */ +#define REDIS_RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */ + /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. 
*/ #define REDIS_NOTIFY_KEYSPACE (1<<0) /* K */ @@ -764,8 +772,11 @@ struct redisServer { time_t lastbgsave_try; /* Unix time of last attempted bgsave */ time_t rdb_save_time_last; /* Time used by last RDB save run. */ time_t rdb_save_time_start; /* Current RDB save start time. */ + int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* REDIS_OK or REDIS_ERR */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ + int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ + int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ /* Logging */ @@ -790,6 +801,8 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ + int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ + int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. 
*/ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ @@ -1132,7 +1145,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc); -void updateSlavesWaitingBgsave(int bgsaveerr); +void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); void replicationHandleMasterDisconnection(void); void replicationCacheMaster(redisClient *c); @@ -1149,6 +1162,7 @@ void unblockClientWaitingReplicas(redisClient *c); int replicationCountAcksByOffset(long long offset); void replicationSendNewlineToMaster(void); long long replicationGetSlaveOffset(void); +char *replicationGetSlaveName(redisClient *c); /* Generic persistence functions */ void startLoading(FILE *fp); diff --git a/src/replication.c b/src/replication.c index 16014c8a..77f9fa8b 100644 --- a/src/replication.c +++ b/src/replication.c @@ -41,6 +41,30 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); void replicationSendAck(void); +/* --------------------------- Utility functions ---------------------------- */ + +/* Return the pointer to a string representing the slave ip:listening_port + * pair. Mostly useful for logging, since we want to log a slave using its + * IP address and its listening port which is more clear for the user, for + * example: "Closing connection with slave 10.1.2.3:6380". 
*/ +char *replicationGetSlaveName(redisClient *c) { + static char buf[REDIS_PEER_ID_LEN]; + char ip[REDIS_IP_STR_LEN]; + + ip[0] = '\0'; + buf[0] = '\0'; + if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) { + if (c->slave_listening_port) + snprintf(buf,sizeof(buf),"%s:%d",ip,c->slave_listening_port); + else + snprintf(buf,sizeof(buf),"%s:",ip); + } else { + snprintf(buf,sizeof(buf),"client id #%llu", + (unsigned long long) c->id); + } + return buf; +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -212,7 +236,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } /* Write the command to every slave. */ - listRewind(slaves,&li); + listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; @@ -345,7 +369,8 @@ int masterTryPartialResynchronization(redisClient *c) { "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); } else { - redisLog(REDIS_NOTICE,"Full resync requested by slave."); + redisLog(REDIS_NOTICE,"Full resync requested by slave %s", + replicationGetSlaveName(c)); } goto need_full_resync; } @@ -358,10 +383,10 @@ int masterTryPartialResynchronization(redisClient *c) { psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { redisLog(REDIS_NOTICE, - "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); + "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset); if (psync_offset > server.master_repl_offset) { redisLog(REDIS_WARNING, - "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset."); + "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); } goto need_full_resync; } @@ -384,7 +409,9 @@ int 
masterTryPartialResynchronization(redisClient *c) { } psync_len = addReplyReplicationBacklog(c,psync_offset); redisLog(REDIS_NOTICE, - "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); + "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", + replicationGetSlaveName(c), + psync_len, psync_offset); /* Note that we don't need to set the selected DB at server.slaveseldb * to -1 to force the master to emit SELECT, since the slave already * has this state from the previous connection with the master. */ @@ -408,6 +435,28 @@ need_full_resync: return REDIS_ERR; } +/* Start a BGSAVE for replication goals, which is, selecting the disk or + * socket target depending on the configuration, and making sure that + * the script cache is flushed before to start. + * + * Returns REDIS_OK on success or REDIS_ERR otherwise. */ +int startBgsaveForReplication(void) { + int retval; + + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s", + server.repl_diskless_sync ? "slaves sockets" : "disk"); + + if (server.repl_diskless_sync) + retval = rdbSaveToSlavesSockets(); + else + retval = rdbSaveBackground(server.rdb_filename); + + /* Flush the script cache, since we need that slave differences are + * accumulated without requiring slaves to match our cached scripts. */ + if (retval == REDIS_OK) replicationScriptCacheFlush(); + return retval; +} + /* SYNC and PSYNC command implemenation. */ void syncCommand(redisClient *c) { /* ignore SYNC if already slave or in monitor mode */ @@ -429,7 +478,8 @@ void syncCommand(redisClient *c) { return; } - redisLog(REDIS_NOTICE,"Slave asks for synchronization"); + redisLog(REDIS_NOTICE,"Slave %s asks for synchronization", + replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. 
* If it fails, we continue with usual full resynchronization, however @@ -465,10 +515,12 @@ void syncCommand(redisClient *c) { /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ - if (server.rdb_child_pid != -1) { + if (server.rdb_child_pid != -1 && + server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK) + { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is - * registering differences since the server forked to save */ + * registering differences since the server forked to save. */ redisClient *slave; listNode *ln; listIter li; @@ -486,21 +538,35 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to - * register differences */ + * register differences. */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } + } else if (server.rdb_child_pid != -1 && + server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET) + { + /* There is an RDB child process but it is writing directly to + * children sockets. We need to wait for the next BGSAVE + * in order to synchronize. */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } else { - /* Ok we don't have a BGSAVE in progress, let's start one */ - redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); - if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { - redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); - addReplyError(c,"Unable to perform background save"); - return; + if (server.repl_diskless_sync) { + /* Diskless replication RDB child is created inside + * replicationCron() since we want to delay its start a + * few seconds to wait for more slaves to arrive. 
*/ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + if (server.repl_diskless_sync_delay) + redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC"); + } else { + /* Ok we don't have a BGSAVE in progress, let's start one. */ + if (startBgsaveForReplication() != REDIS_OK) { + redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); + addReplyError(c,"Unable to perform background save"); + return; + } + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } - c->replstate = REDIS_REPL_WAIT_BGSAVE_END; - /* Flush the script cache for the new slave. */ - replicationScriptCacheFlush(); } if (server.repl_disable_tcp_nodelay) @@ -573,6 +639,29 @@ void replconfCommand(redisClient *c) { addReply(c,shared.ok); } +/* This function puts a slave in the online state, and should be called just + * after a slave received the RDB file for the initial synchronization, and + * we are finally ready to send the incremental stream of commands. + * + * It does a few things: + * + * 1) Put the slave in ONLINE state. + * 2) Make sure the writable event is re-installed, since calling the SYNC + * command disables it, so that we can accumulate output buffer without + * sending it to the slave. + * 3) Update the count of good slaves. 
*/ +void putSlaveOnline(redisClient *slave) { + slave->replstate = REDIS_REPL_ONLINE; + slave->repl_ack_time = server.unixtime; + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + sendReplyToClient, slave) == AE_ERR) { + redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); + freeClient(slave); + return; + } + refreshGoodSlavesCount(); +} + void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *slave = privdata; REDIS_NOTUSED(el); @@ -623,26 +712,26 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { close(slave->repldbfd); slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - slave->replstate = REDIS_REPL_ONLINE; - slave->repl_ack_time = server.unixtime; - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, - sendReplyToClient, slave) == AE_ERR) { - redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); - freeClient(slave); - return; - } - refreshGoodSlavesCount(); - redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); + putSlaveOnline(slave); + redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)"); } } -/* This function is called at the end of every background saving. - * The argument bgsaveerr is REDIS_OK if the background saving succeeded - * otherwise REDIS_ERR is passed to the function. +/* This function is called at the end of every background saving, + * or when the replication RDB transfer strategy is modified from + * disk to socket or the other way around. * * The goal of this function is to handle slaves waiting for a successful - * background saving in order to perform non-blocking synchronization. 
*/ -void updateSlavesWaitingBgsave(int bgsaveerr) { + * background saving in order to perform non-blocking synchronization, and + * to schedule a new BGSAVE if there are slaves that attached while a + * BGSAVE was in progress, but it was not a good one for replication (no + * other slave was accumulating differences). + * + * The argument bgsaveerr is REDIS_OK if the background saving succeeded + * otherwise REDIS_ERR is passed to the function. + * The 'type' argument is the type of the child that terminated + * (if it had a disk or socket target). */ +void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; int startbgsave = 0; listIter li; @@ -657,37 +746,44 @@ void updateSlavesWaitingBgsave(int bgsaveerr) { } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; - if (bgsaveerr != REDIS_OK) { - freeClient(slave); - redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); - continue; - } - if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || - redis_fstat(slave->repldbfd,&buf) == -1) { - freeClient(slave); - redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); - continue; - } - slave->repldboff = 0; - slave->repldbsize = buf.st_size; - slave->replstate = REDIS_REPL_SEND_BULK; - slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", - (unsigned long long) slave->repldbsize); + /* If this was an RDB on disk save, we have to prepare to send + * the RDB from disk to the slave socket. Otherwise if this was + * already an RDB -> Slaves socket transfer, used in the case of + * diskless replication, our work is trivial, we can just put + * the slave online. */ + if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { + putSlaveOnline(slave); + redisLog(REDIS_NOTICE, + "Synchronization with slave %s succeeded (socket)", + replicationGetSlaveName(slave)); + } else { + if (bgsaveerr != REDIS_OK) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. 
BGSAVE child returned an error"); + continue; + } + if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || + redis_fstat(slave->repldbfd,&buf) == -1) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + continue; + } + slave->repldboff = 0; + slave->repldbsize = buf.st_size; + slave->replstate = REDIS_REPL_SEND_BULK; + slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", + (unsigned long long) slave->repldbsize); - aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { - freeClient(slave); - continue; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { + freeClient(slave); + continue; + } } } } if (startbgsave) { - /* Since we are starting a new background save for one or more slaves, - * we flush the Replication Script Cache to use EVAL to propagate every - * new EVALSHA for the first time, since all the new slaves don't know - * about previous scripts. */ - replicationScriptCacheFlush(); - if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { + if (startBgsaveForReplication() != REDIS_OK) { listIter li; listRewind(server.slaves,&li); @@ -751,6 +847,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { REDIS_NOTUSED(privdata); REDIS_NOTUSED(mask); + /* Static vars used to hold the EOF mark, and the last bytes received + * from the server: when they match, we reached the end of the transfer. */ + static char eofmark[REDIS_RUN_ID_SIZE]; + static char lastbytes[REDIS_RUN_ID_SIZE]; + static int usemark = 0; + /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. 
*/ if (server.repl_transfer_size == -1) { @@ -776,16 +878,44 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } - server.repl_transfer_size = strtol(buf+1,NULL,10); - redisLog(REDIS_NOTICE, - "MASTER <-> SLAVE sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + + /* There are two possible forms for the bulk payload. One is the + * usual $ bulk format. The other is used for diskless transfers + * when the master does not know beforehand the size of the file to + * transfer. In the latter case, the following format is used: + * + * $EOF:<40 bytes delimiter> + * + * At the end of the file the announced delimiter is transmitted. The + * delimiter is long and random enough that the probability of a + * collision with the actual file content can be ignored. */ + if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) { + usemark = 1; + memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE); + memset(lastbytes,0,REDIS_RUN_ID_SIZE); + /* Set any repl_transfer_size to avoid entering this code path + * at the next call. */ + server.repl_transfer_size = 0; + redisLog(REDIS_NOTICE, + "MASTER <-> SLAVE sync: receiving streamed RDB from master"); + } else { + usemark = 0; + server.repl_transfer_size = strtol(buf+1,NULL,10); + redisLog(REDIS_NOTICE, + "MASTER <-> SLAVE sync: receiving %lld bytes from master", + (long long) server.repl_transfer_size); + } return; } /* Read bulk data */ - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); + if (usemark) { + readlen = sizeof(buf); + } else { + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? 
left : (signed)sizeof(buf); + } + nread = read(fd,buf,readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", @@ -793,6 +923,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { replicationAbortSyncTransfer(); return; } + + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + int eof_reached = 0; + + if (usemark) { + /* Update the last bytes array, and check if it matches our delimiter. */ + if (nread >= REDIS_RUN_ID_SIZE) { + memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE); + } else { + int rem = REDIS_RUN_ID_SIZE-nread; + memmove(lastbytes,lastbytes+nread,rem); + memcpy(lastbytes+rem,buf,nread); + } + if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1; + } + server.repl_transfer_lastio = server.unixtime; if (write(server.repl_transfer_fd,buf,nread) != nread) { redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); @@ -800,6 +947,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } server.repl_transfer_read += nread; + /* Delete the last 40 bytes from the file if we reached EOF. */ + if (usemark && eof_reached) { + if (ftruncate(server.repl_transfer_fd, + server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1) + { + redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + goto error; + } + } + /* Sync data on disk from time to time, otherwise at the end of the transfer * we may suffer a big delay as the memory buffers are copied into the * actual disk. 
*/ @@ -814,7 +971,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Check if the transfer is now complete */ - if (server.repl_transfer_read == server.repl_transfer_size) { + if (!usemark) { + if (server.repl_transfer_read == server.repl_transfer_size) + eof_reached = 1; + } + + if (eof_reached) { if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); replicationAbortSyncTransfer(); @@ -1817,7 +1979,9 @@ void replicationCron(void) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || - slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END && + server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET)) + { if (write(slave->fd, "\n", 1) == -1) { /* Don't worry, it's just a ping. */ } @@ -1838,15 +2002,10 @@ void replicationCron(void) { if (slave->flags & REDIS_PRE_PSYNC) continue; if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { - char ip[REDIS_IP_STR_LEN]; - int port; - - if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) { - redisLog(REDIS_WARNING, - "Disconnecting timedout slave: %s:%d", - ip, slave->slave_listening_port); - } + redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s", + replicationGetSlaveName(slave)); + /* Logging alone does not drop the link: keep freeing the timed out slave. */ freeClient(slave); } } } @@ -1877,6 +2036,48 @@ void replicationCron(void) { replicationScriptCacheFlush(); } + /* If we are using diskless replication and there are slaves waiting + * in WAIT_BGSAVE_START state, check if enough seconds elapsed and + * start a BGSAVE. + * + * This code is also useful to trigger a BGSAVE if the diskless + * replication was turned off with CONFIG SET, while there were already + * slaves in WAIT_BGSAVE_START state. 
*/ + if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + time_t idle, max_idle = 0; + int slaves_waiting = 0; + listNode *ln; + listIter li; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + idle = server.unixtime - slave->lastinteraction; + if (idle > max_idle) max_idle = idle; + slaves_waiting++; + } + } + + if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { + /* Start a BGSAVE. Usually with socket target, or with disk target + * if there was a recent socket -> disk config change. */ + if (startBgsaveForReplication() == REDIS_OK) { + /* It started! We need to change the state of slaves + * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case + * the current target is disk. Otherwise it was already done + * by rdbSaveToSlavesSockets() which is called by + * startBgsaveForReplication(). */ + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } + } + } + } + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); } diff --git a/src/rio.c b/src/rio.c index 44f9b7a0..738e56fd 100644 --- a/src/rio.c +++ b/src/rio.c @@ -55,6 +55,8 @@ #include "config.h" #include "redis.h" +/* ------------------------- Buffer I/O implementation ----------------------- */ + /* Returns 1 or 0 for success/failure. */ static size_t rioBufferWrite(rio *r, const void *buf, size_t len) { r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len); @@ -76,6 +78,33 @@ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; } +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioBufferFlush(rio *r) { + REDIS_NOTUSED(r); + return 1; /* Nothing to do, our write just appends to the buffer. 
*/ +} + +static const rio rioBufferIO = { + rioBufferRead, + rioBufferWrite, + rioBufferTell, + rioBufferFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithBuffer(rio *r, sds s) { + *r = rioBufferIO; + r->io.buffer.ptr = s; + r->io.buffer.pos = 0; +} + +/* --------------------- Stdio file pointer implementation ------------------- */ + /* Returns 1 or 0 for success/failure. */ static size_t rioFileWrite(rio *r, const void *buf, size_t len) { size_t retval; @@ -103,21 +132,17 @@ static off_t rioFileTell(rio *r) { return ftello(r->io.file.fp); } -static const rio rioBufferIO = { - rioBufferRead, - rioBufferWrite, - rioBufferTell, - NULL, /* update_checksum */ - 0, /* current checksum */ - 0, /* bytes read or written */ - 0, /* read/write chunk size */ - { { NULL, 0 } } /* union for io-specific vars */ -}; +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFileFlush(rio *r) { + return (fflush(r->io.file.fp) == 0) ? 1 : 0; +} static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, + rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ @@ -132,12 +157,133 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } -void rioInitWithBuffer(rio *r, sds s) { - *r = rioBufferIO; - r->io.buffer.ptr = s; - r->io.buffer.pos = 0; +/* ------------------- File descriptors set implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. + * The function returns success as long as we are able to correctly write + * to at least one file descriptor. + * + * When buf is NULL and len is 0, the function performs a flush operation + * if there is some pending buffer, so this function is also used in order + * to implement rioFdsetFlush(). 
*/ +static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + ssize_t retval; + int j; + unsigned char *p = (unsigned char*) buf; + int doflush = (buf == NULL && len == 0); + + /* To start we always append to our buffer. If it gets larger than + * a given size, we actually write to the sockets. */ + if (len) { + r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); + len = 0; /* Prevent entering the while below if we don't flush. */ + if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1; + } + + if (doflush) { + p = (unsigned char*) r->io.fdset.buf; + len = sdslen(r->io.fdset.buf); + } + + /* Write in little chunks so that when there are big writes we + * parallelize while the kernel is sending data in background to + * the TCP socket. */ + while(len) { + size_t count = len < 1024 ? len : 1024; + int broken = 0; + for (j = 0; j < r->io.fdset.numfds; j++) { + if (r->io.fdset.state[j] != 0) { + /* Skip FDs already in error. */ + broken++; + continue; + } + + /* Make sure to write 'count' bytes to the socket regardless + * of short writes. */ + size_t nwritten = 0; + while(nwritten != count) { + retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); + if (retval <= 0) { + /* With blocking sockets, which is the sole user of this + * rio target, EWOULDBLOCK is returned only because of + * the SO_SNDTIMEO socket option, so we translate the error + * into one more recognizable by the user. */ + if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; + break; + } + nwritten += retval; + } + + if (nwritten != count) { + /* Mark this FD as broken. */ + r->io.fdset.state[j] = errno; + if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO; + } + } + if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */ + p += count; + len -= count; + r->io.fdset.pos += count; + } + + if (doflush) sdsclear(r->io.fdset.buf); + return 1; } +/* Returns 1 or 0 for success/failure. 
*/ +static size_t rioFdsetRead(rio *r, void *buf, size_t len) { + REDIS_NOTUSED(r); + REDIS_NOTUSED(buf); + REDIS_NOTUSED(len); + return 0; /* Error, this target does not support reading. */ +} + +/* Returns read/write position in file. */ +static off_t rioFdsetTell(rio *r) { + return r->io.fdset.pos; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdsetFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdsetWrite(r,NULL,0); +} + +static const rio rioFdsetIO = { + rioFdsetRead, + rioFdsetWrite, + rioFdsetTell, + rioFdsetFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFdset(rio *r, int *fds, int numfds) { + int j; + + *r = rioFdsetIO; + r->io.fdset.fds = zmalloc(sizeof(int)*numfds); + r->io.fdset.state = zmalloc(sizeof(int)*numfds); + memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds); + for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0; + r->io.fdset.numfds = numfds; + r->io.fdset.pos = 0; + r->io.fdset.buf = sdsempty(); +} + +void rioFreeFdset(rio *r) { + zfree(r->io.fdset.fds); + zfree(r->io.fdset.state); + sdsfree(r->io.fdset.buf); +} + +/* ---------------------------- Generic functions ---------------------------- */ + /* This function can be installed both in memory and file streams when checksum * computation is needed. 
*/ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { @@ -157,7 +303,8 @@ void rioSetAutoSync(rio *r, off_t bytes) { r->io.file.autosync = bytes; } -/* ------------------------------ Higher level interface --------------------------- +/* --------------------------- Higher level interface -------------------------- + * * The following higher level functions use lower level rio.c functions to help * generating the Redis protocol for the Append Only File. */ diff --git a/src/rio.h b/src/rio.h index 2d12c6cc..e5fa0cd3 100644 --- a/src/rio.h +++ b/src/rio.h @@ -43,6 +43,7 @@ struct _rio { size_t (*read)(struct _rio *, void *buf, size_t len); size_t (*write)(struct _rio *, const void *buf, size_t len); off_t (*tell)(struct _rio *); + int (*flush)(struct _rio *); /* The update_cksum method if not NULL is used to compute the checksum of * all the data that was read or written so far. The method should be * designed so that can be called with the current checksum, and the buf @@ -61,15 +62,25 @@ struct _rio { /* Backend-specific vars. */ union { + /* In-memory buffer target. */ struct { sds ptr; off_t pos; } buffer; + /* Stdio file pointer target. */ struct { FILE *fp; off_t buffered; /* Bytes written since last fsync. */ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; + /* Multiple FDs target (used to write to N sockets). */ + struct { + int *fds; /* File descriptors. */ + int *state; /* Error state of each fd. 0 (if ok) or errno. 
*/ + int numfds; + off_t pos; + sds buf; + } fdset; } io; }; @@ -109,8 +120,13 @@ static inline off_t rioTell(rio *r) { return r->tell(r); } +static inline int rioFlush(rio *r) { + return r->flush(r); +} + void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithFdset(rio *r, int *fds, int numfds); size_t rioWriteBulkCount(rio *r, char prefix, int count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len); diff --git a/src/syncio.c b/src/syncio.c index 8810a842..ac2a4a37 100644 --- a/src/syncio.c +++ b/src/syncio.c @@ -139,6 +139,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) { *ptr = '\0'; nread++; } + size--; } return nread; } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 767349e5..d2668d73 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -94,79 +94,82 @@ start_server {tags {"repl"}} { } } -start_server {tags {"repl"}} { - set master [srv 0 client] - set master_host [srv 0 host] - set master_port [srv 0 port] - set slaves {} - set load_handle0 [start_write_load $master_host $master_port 3] - set load_handle1 [start_write_load $master_host $master_port 5] - set load_handle2 [start_write_load $master_host $master_port 20] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - start_server {} { - lappend slaves [srv 0 client] +foreach dl {no yes} { + start_server {tags {"repl"}} { + set master [srv 0 client] + $master config set repl-diskless-sync $dl + set master_host [srv 0 host] + set master_port [srv 0 port] + set slaves {} + set load_handle0 [start_write_load $master_host $master_port 3] + set load_handle1 [start_write_load $master_host $master_port 5] + set load_handle2 [start_write_load $master_host $master_port 20] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load 
$master_host $master_port 4] start_server {} { lappend slaves [srv 0 client] start_server {} { lappend slaves [srv 0 client] - test "Connect multiple slaves at the same time (issue #141)" { - # Send SALVEOF commands to slaves - [lindex $slaves 0] slaveof $master_host $master_port - [lindex $slaves 1] slaveof $master_host $master_port - [lindex $slaves 2] slaveof $master_host $master_port + start_server {} { + lappend slaves [srv 0 client] + test "Connect multiple slaves at the same time (issue #141), diskless=$dl" { + # Send SALVEOF commands to slaves + [lindex $slaves 0] slaveof $master_host $master_port + [lindex $slaves 1] slaveof $master_host $master_port + [lindex $slaves 2] slaveof $master_host $master_port - # Wait for all the three slaves to reach the "online" state - set retry 500 - while {$retry} { - set info [r -3 info] - if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { - break - } else { - incr retry -1 - after 100 + # Wait for all the three slaves to reach the "online" state + set retry 500 + while {$retry} { + set info [r -3 info] + if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slaves not correctly synchronized" } - } - if {$retry == 0} { - error "assertion:Slaves not correctly synchronized" - } - # Stop the write load - stop_write_load $load_handle0 - stop_write_load $load_handle1 - stop_write_load $load_handle2 - stop_write_load $load_handle3 - stop_write_load $load_handle4 + # Stop the write load + stop_write_load $load_handle0 + stop_write_load $load_handle1 + stop_write_load $load_handle2 + stop_write_load $load_handle3 + stop_write_load $load_handle4 - # Wait that slaves exit the "loading" state - wait_for_condition 500 100 { - ![string match {*loading:1*} [[lindex $slaves 0] info]] && - ![string match {*loading:1*} [[lindex $slaves 1] info]] && - ![string 
match {*loading:1*} [[lindex $slaves 2] info]] - } else { - fail "Slaves still loading data after too much time" + # Wait that slaves exit the "loading" state + wait_for_condition 500 100 { + ![string match {*loading:1*} [[lindex $slaves 0] info]] && + ![string match {*loading:1*} [[lindex $slaves 1] info]] && + ![string match {*loading:1*} [[lindex $slaves 2] info]] + } else { + fail "Slaves still loading data after too much time" + } + + # Make sure that slaves and master have same number of keys + wait_for_condition 500 100 { + [$master dbsize] == [[lindex $slaves 0] dbsize] && + [$master dbsize] == [[lindex $slaves 1] dbsize] && + [$master dbsize] == [[lindex $slaves 2] dbsize] + } else { + fail "Different number of keys between masted and slave after too long time." + } + + # Check digests + set digest [$master debug digest] + set digest0 [[lindex $slaves 0] debug digest] + set digest1 [[lindex $slaves 1] debug digest] + set digest2 [[lindex $slaves 2] debug digest] + assert {$digest ne 0000000000000000000000000000000000000000} + assert {$digest eq $digest0} + assert {$digest eq $digest1} + assert {$digest eq $digest2} } - - # Make sure that slaves and master have same number of keys - wait_for_condition 500 100 { - [$master dbsize] == [[lindex $slaves 0] dbsize] && - [$master dbsize] == [[lindex $slaves 1] dbsize] && - [$master dbsize] == [[lindex $slaves 2] dbsize] - } else { - fail "Different number of keys between masted and slave after too long time." - } - - # Check digests - set digest [$master debug digest] - set digest0 [[lindex $slaves 0] debug digest] - set digest1 [[lindex $slaves 1] debug digest] - set digest2 [[lindex $slaves 2] debug digest] - assert {$digest ne 0000000000000000000000000000000000000000} - assert {$digest eq $digest0} - assert {$digest eq $digest1} - assert {$digest eq $digest2} - } - } + } + } } } }