mirror of
https://github.com/fluencelabs/redis
synced 2025-04-22 09:02:13 +00:00
Merge branch 'memsync' into unstable
This commit is contained in:
commit
6fbaeddf3f
39
redis.conf
39
redis.conf
@ -240,6 +240,45 @@ slave-serve-stale-data yes
|
|||||||
# administrative / dangerous commands.
|
# administrative / dangerous commands.
|
||||||
slave-read-only yes
|
slave-read-only yes
|
||||||
|
|
||||||
|
# Replication SYNC strategy: disk or socket.
|
||||||
|
#
|
||||||
|
# New slaves and reconnecting slaves that are not able to continue the replication
|
||||||
|
# process just receiving differences, need to do what is called a "full
|
||||||
|
# synchronization". An RDB file is transmitted from the master to the slaves.
|
||||||
|
# The transmission can happen in two different ways:
|
||||||
|
#
|
||||||
|
# 1) Disk-backed: The Redis master creates a new process that writes the RDB
|
||||||
|
# file on disk. Later the file is transferred by the parent
|
||||||
|
# process to the slaves incrementally.
|
||||||
|
# 2) Diskless: The Redis master creates a new process that directly writes the
|
||||||
|
# RDB file to slave sockets, without touching the disk at all.
|
||||||
|
#
|
||||||
|
# With disk-backed replication, while the RDB file is generated, more slaves
|
||||||
|
# can be queued and served with the RDB file as soon as the current child producing
|
||||||
|
# the RDB file finishes its work. With diskless replication instead once
|
||||||
|
# the transfer starts, new slaves arriving will be queued and a new transfer
|
||||||
|
# will start when the current one terminates.
|
||||||
|
#
|
||||||
|
# When diskless replication is used, the master waits a configurable amount of
|
||||||
|
# time (in seconds) before starting the transfer in the hope that multiple slaves
|
||||||
|
# will arrive and the transfer can be parallelized.
|
||||||
|
#
|
||||||
|
# With slow disks and fast (large bandwidth) networks, diskless replication
|
||||||
|
# works better.
|
||||||
|
repl-diskless-sync no
|
||||||
|
|
||||||
|
# When diskless replication is enabled, it is possible to configure the delay
|
||||||
|
# the server waits in order to spawn the child that transfers the RDB via socket
|
||||||
|
# to the slaves.
|
||||||
|
#
|
||||||
|
# This is important since once the transfer starts, it is not possible to serve
|
||||||
|
# new slaves arriving, that will be queued for the next RDB transfer, so the server
|
||||||
|
# waits a delay in order to let more slaves arrive.
|
||||||
|
#
|
||||||
|
# The delay is specified in seconds, and by default is 5 seconds. To disable
|
||||||
|
# it entirely just set it to 0 seconds and the transfer will start ASAP.
|
||||||
|
repl-diskless-sync-delay 5
|
||||||
|
|
||||||
# Slaves send PINGs to server in a predefined interval. It's possible to change
|
# Slaves send PINGs to server in a predefined interval. It's possible to change
|
||||||
# this interval with the repl_ping_slave_period option. The default value is 10
|
# this interval with the repl_ping_slave_period option. The default value is 10
|
||||||
# seconds.
|
# seconds.
|
||||||
|
36
src/anet.c
36
src/anet.c
@ -34,6 +34,7 @@
|
|||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
|
#include <sys/time.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
@ -57,24 +58,37 @@ static void anetSetError(char *err, const char *fmt, ...)
|
|||||||
va_end(ap);
|
va_end(ap);
|
||||||
}
|
}
|
||||||
|
|
||||||
int anetNonBlock(char *err, int fd)
|
int anetSetBlock(char *err, int fd, int non_block) {
|
||||||
{
|
|
||||||
int flags;
|
int flags;
|
||||||
|
|
||||||
/* Set the socket non-blocking.
|
/* Set the socket blocking (if non_block is zero) or non-blocking.
|
||||||
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
|
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
|
||||||
* interrupted by a signal. */
|
* interrupted by a signal. */
|
||||||
if ((flags = fcntl(fd, F_GETFL)) == -1) {
|
if ((flags = fcntl(fd, F_GETFL)) == -1) {
|
||||||
anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
|
anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
|
||||||
return ANET_ERR;
|
return ANET_ERR;
|
||||||
}
|
}
|
||||||
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
|
|
||||||
|
if (non_block)
|
||||||
|
flags |= O_NONBLOCK;
|
||||||
|
else
|
||||||
|
flags &= ~O_NONBLOCK;
|
||||||
|
|
||||||
|
if (fcntl(fd, F_SETFL, flags) == -1) {
|
||||||
anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
|
anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
|
||||||
return ANET_ERR;
|
return ANET_ERR;
|
||||||
}
|
}
|
||||||
return ANET_OK;
|
return ANET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int anetNonBlock(char *err, int fd) {
|
||||||
|
return anetSetBlock(err,fd,1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int anetBlock(char *err, int fd) {
|
||||||
|
return anetSetBlock(err,fd,0);
|
||||||
|
}
|
||||||
|
|
||||||
/* Set TCP keep alive option to detect dead peers. The interval option
|
/* Set TCP keep alive option to detect dead peers. The interval option
|
||||||
* is only used for Linux as we are using Linux-specific APIs to set
|
* is only used for Linux as we are using Linux-specific APIs to set
|
||||||
* the probe send time, interval, and count. */
|
* the probe send time, interval, and count. */
|
||||||
@ -165,6 +179,20 @@ int anetTcpKeepAlive(char *err, int fd)
|
|||||||
return ANET_OK;
|
return ANET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Set the socket send timeout (SO_SNDTIMEO socket option) to the specified
|
||||||
|
* number of milliseconds, or disable it if the 'ms' argument is zero. */
|
||||||
|
int anetSendTimeout(char *err, int fd, long long ms) {
|
||||||
|
struct timeval tv;
|
||||||
|
|
||||||
|
tv.tv_sec = ms/1000;
|
||||||
|
tv.tv_usec = (ms%1000)*1000;
|
||||||
|
if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
|
||||||
|
anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno));
|
||||||
|
return ANET_ERR;
|
||||||
|
}
|
||||||
|
return ANET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
|
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
|
||||||
* do the actual work. It resolves the hostname "host" and set the string
|
* do the actual work. It resolves the hostname "host" and set the string
|
||||||
* representation of the IP address into the buffer pointed by "ipbuf".
|
* representation of the IP address into the buffer pointed by "ipbuf".
|
||||||
|
@ -62,9 +62,11 @@ int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port)
|
|||||||
int anetUnixAccept(char *err, int serversock);
|
int anetUnixAccept(char *err, int serversock);
|
||||||
int anetWrite(int fd, char *buf, int count);
|
int anetWrite(int fd, char *buf, int count);
|
||||||
int anetNonBlock(char *err, int fd);
|
int anetNonBlock(char *err, int fd);
|
||||||
|
int anetBlock(char *err, int fd);
|
||||||
int anetEnableTcpNoDelay(char *err, int fd);
|
int anetEnableTcpNoDelay(char *err, int fd);
|
||||||
int anetDisableTcpNoDelay(char *err, int fd);
|
int anetDisableTcpNoDelay(char *err, int fd);
|
||||||
int anetTcpKeepAlive(char *err, int fd);
|
int anetTcpKeepAlive(char *err, int fd);
|
||||||
|
int anetSendTimeout(char *err, int fd, long long ms);
|
||||||
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
|
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
|
||||||
int anetKeepAlive(char *err, int fd, int interval);
|
int anetKeepAlive(char *err, int fd, int interval);
|
||||||
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
|
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
|
||||||
|
26
src/config.c
26
src/config.c
@ -270,6 +270,16 @@ void loadServerConfigFromString(char *config) {
|
|||||||
if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) {
|
if ((server.repl_disable_tcp_nodelay = yesnotoi(argv[1])) == -1) {
|
||||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||||
}
|
}
|
||||||
|
} else if (!strcasecmp(argv[0],"repl-diskless-sync") && argc==2) {
|
||||||
|
if ((server.repl_diskless_sync = yesnotoi(argv[1])) == -1) {
|
||||||
|
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||||
|
}
|
||||||
|
} else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
|
||||||
|
server.repl_diskless_sync_delay = atoi(argv[1]);
|
||||||
|
if (server.repl_diskless_sync_delay < 0) {
|
||||||
|
err = "repl-diskless-sync-delay can't be negative";
|
||||||
|
goto loaderr;
|
||||||
|
}
|
||||||
} else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"repl-backlog-size") && argc == 2) {
|
||||||
long long size = memtoll(argv[1],NULL);
|
long long size = memtoll(argv[1],NULL);
|
||||||
if (size <= 0) {
|
if (size <= 0) {
|
||||||
@ -284,7 +294,7 @@ void loadServerConfigFromString(char *config) {
|
|||||||
goto loaderr;
|
goto loaderr;
|
||||||
}
|
}
|
||||||
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
|
||||||
server.masterauth = zstrdup(argv[1]);
|
server.masterauth = zstrdup(argv[1]);
|
||||||
} else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) {
|
} else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) {
|
||||||
if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) {
|
if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) {
|
||||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||||
@ -911,6 +921,15 @@ void configSetCommand(redisClient *c) {
|
|||||||
|
|
||||||
if (yn == -1) goto badfmt;
|
if (yn == -1) goto badfmt;
|
||||||
server.repl_disable_tcp_nodelay = yn;
|
server.repl_disable_tcp_nodelay = yn;
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync")) {
|
||||||
|
int yn = yesnotoi(o->ptr);
|
||||||
|
|
||||||
|
if (yn == -1) goto badfmt;
|
||||||
|
server.repl_diskless_sync = yn;
|
||||||
|
} else if (!strcasecmp(c->argv[2]->ptr,"repl-diskless-sync-delay")) {
|
||||||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
|
ll < 0) goto badfmt;
|
||||||
|
server.repl_diskless_sync_delay = ll;
|
||||||
} else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) {
|
} else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) {
|
||||||
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
|
||||||
ll < 0) goto badfmt;
|
ll < 0) goto badfmt;
|
||||||
@ -1049,6 +1068,7 @@ void configGetCommand(redisClient *c) {
|
|||||||
config_get_numerical_field("cluster-node-timeout",server.cluster_node_timeout);
|
config_get_numerical_field("cluster-node-timeout",server.cluster_node_timeout);
|
||||||
config_get_numerical_field("cluster-migration-barrier",server.cluster_migration_barrier);
|
config_get_numerical_field("cluster-migration-barrier",server.cluster_migration_barrier);
|
||||||
config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor);
|
config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor);
|
||||||
|
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
|
||||||
|
|
||||||
/* Bool (yes/no) values */
|
/* Bool (yes/no) values */
|
||||||
config_get_bool_field("cluster-require-full-coverage",
|
config_get_bool_field("cluster-require-full-coverage",
|
||||||
@ -1067,6 +1087,8 @@ void configGetCommand(redisClient *c) {
|
|||||||
config_get_bool_field("activerehashing", server.activerehashing);
|
config_get_bool_field("activerehashing", server.activerehashing);
|
||||||
config_get_bool_field("repl-disable-tcp-nodelay",
|
config_get_bool_field("repl-disable-tcp-nodelay",
|
||||||
server.repl_disable_tcp_nodelay);
|
server.repl_disable_tcp_nodelay);
|
||||||
|
config_get_bool_field("repl-diskless-sync",
|
||||||
|
server.repl_diskless_sync);
|
||||||
config_get_bool_field("aof-rewrite-incremental-fsync",
|
config_get_bool_field("aof-rewrite-incremental-fsync",
|
||||||
server.aof_rewrite_incremental_fsync);
|
server.aof_rewrite_incremental_fsync);
|
||||||
config_get_bool_field("aof-load-truncated",
|
config_get_bool_field("aof-load-truncated",
|
||||||
@ -1792,6 +1814,8 @@ int rewriteConfig(char *path) {
|
|||||||
rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,REDIS_DEFAULT_REPL_BACKLOG_SIZE);
|
rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,REDIS_DEFAULT_REPL_BACKLOG_SIZE);
|
||||||
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
|
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
|
||||||
rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY);
|
rewriteConfigYesNoOption(state,"repl-disable-tcp-nodelay",server.repl_disable_tcp_nodelay,REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY);
|
||||||
|
rewriteConfigYesNoOption(state,"repl-diskless-sync",server.repl_diskless_sync,REDIS_DEFAULT_REPL_DISKLESS_SYNC);
|
||||||
|
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
|
||||||
rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY);
|
rewriteConfigNumericalOption(state,"slave-priority",server.slave_priority,REDIS_DEFAULT_SLAVE_PRIORITY);
|
||||||
rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE);
|
rewriteConfigNumericalOption(state,"min-slaves-to-write",server.repl_min_slaves_to_write,REDIS_DEFAULT_MIN_SLAVES_TO_WRITE);
|
||||||
rewriteConfigNumericalOption(state,"min-slaves-max-lag",server.repl_min_slaves_max_lag,REDIS_DEFAULT_MIN_SLAVES_MAX_LAG);
|
rewriteConfigNumericalOption(state,"min-slaves-max-lag",server.repl_min_slaves_max_lag,REDIS_DEFAULT_MIN_SLAVES_MAX_LAG);
|
||||||
|
@ -678,12 +678,8 @@ void freeClient(redisClient *c) {
|
|||||||
|
|
||||||
/* Log link disconnection with slave */
|
/* Log link disconnection with slave */
|
||||||
if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) {
|
if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) {
|
||||||
char ip[REDIS_IP_STR_LEN];
|
redisLog(REDIS_WARNING,"Connection with slave %s lost.",
|
||||||
|
replicationGetSlaveName(c));
|
||||||
if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
|
|
||||||
redisLog(REDIS_WARNING,"Connection with slave %s:%d lost.",
|
|
||||||
ip, c->slave_listening_port);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free the query buffer */
|
/* Free the query buffer */
|
||||||
|
390
src/rdb.c
390
src/rdb.c
@ -627,17 +627,100 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
|
/* Produces a dump of the database in RDB format sending it to the specified
|
||||||
int rdbSave(char *filename) {
|
* Redis I/O channel. On success REDIS_OK is returned, otherwise REDIS_ERR
|
||||||
|
* is returned and part of the output, or all the output, can be
|
||||||
|
* missing because of I/O errors.
|
||||||
|
*
|
||||||
|
* When the function returns REDIS_ERR and if 'error' is not NULL, the
|
||||||
|
* integer pointed by 'error' is set to the value of errno just after the I/O
|
||||||
|
* error. */
|
||||||
|
int rdbSaveRio(rio *rdb, int *error) {
|
||||||
dictIterator *di = NULL;
|
dictIterator *di = NULL;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
char tmpfile[256];
|
|
||||||
char magic[10];
|
char magic[10];
|
||||||
int j;
|
int j;
|
||||||
long long now = mstime();
|
long long now = mstime();
|
||||||
|
uint64_t cksum;
|
||||||
|
|
||||||
|
if (server.rdb_checksum)
|
||||||
|
rdb->update_cksum = rioGenericUpdateChecksum;
|
||||||
|
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
|
||||||
|
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
|
||||||
|
|
||||||
|
for (j = 0; j < server.dbnum; j++) {
|
||||||
|
redisDb *db = server.db+j;
|
||||||
|
dict *d = db->dict;
|
||||||
|
if (dictSize(d) == 0) continue;
|
||||||
|
di = dictGetSafeIterator(d);
|
||||||
|
if (!di) return REDIS_ERR;
|
||||||
|
|
||||||
|
/* Write the SELECT DB opcode */
|
||||||
|
if (rdbSaveType(rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
|
||||||
|
if (rdbSaveLen(rdb,j) == -1) goto werr;
|
||||||
|
|
||||||
|
/* Iterate this DB writing every entry */
|
||||||
|
while((de = dictNext(di)) != NULL) {
|
||||||
|
sds keystr = dictGetKey(de);
|
||||||
|
robj key, *o = dictGetVal(de);
|
||||||
|
long long expire;
|
||||||
|
|
||||||
|
initStaticStringObject(key,keystr);
|
||||||
|
expire = getExpire(db,&key);
|
||||||
|
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
|
||||||
|
}
|
||||||
|
dictReleaseIterator(di);
|
||||||
|
}
|
||||||
|
di = NULL; /* So that we don't release it again on error. */
|
||||||
|
|
||||||
|
/* EOF opcode */
|
||||||
|
if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
|
||||||
|
|
||||||
|
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
|
||||||
|
* loading code skips the check in this case. */
|
||||||
|
cksum = rdb->cksum;
|
||||||
|
memrev64ifbe(&cksum);
|
||||||
|
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
|
||||||
|
return REDIS_OK;
|
||||||
|
|
||||||
|
werr:
|
||||||
|
if (error) *error = errno;
|
||||||
|
if (di) dictReleaseIterator(di);
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This is just a wrapper to rdbSaveRio() that additionally adds a prefix
|
||||||
|
* and a suffix to the generated RDB dump. The prefix is:
|
||||||
|
*
|
||||||
|
* $EOF:<40 bytes unguessable hex string>\r\n
|
||||||
|
*
|
||||||
|
* While the suffix is the 40 bytes hex string we announced in the prefix.
|
||||||
|
* This way processes receiving the payload can understand when it ends
|
||||||
|
* without doing any processing of the content. */
|
||||||
|
int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
|
||||||
|
char eofmark[REDIS_EOF_MARK_SIZE];
|
||||||
|
|
||||||
|
getRandomHexChars(eofmark,REDIS_EOF_MARK_SIZE);
|
||||||
|
if (error) *error = 0;
|
||||||
|
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
|
||||||
|
if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr;
|
||||||
|
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
|
||||||
|
if (rdbSaveRio(rdb,error) == REDIS_ERR) goto werr;
|
||||||
|
if (rioWrite(rdb,eofmark,REDIS_EOF_MARK_SIZE) == 0) goto werr;
|
||||||
|
return REDIS_OK;
|
||||||
|
|
||||||
|
werr: /* Write error. */
|
||||||
|
/* Set 'error' only if not already set by rdbSaveRio() call. */
|
||||||
|
if (error && *error == 0) *error = errno;
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */
|
||||||
|
int rdbSave(char *filename) {
|
||||||
|
char tmpfile[256];
|
||||||
FILE *fp;
|
FILE *fp;
|
||||||
rio rdb;
|
rio rdb;
|
||||||
uint64_t cksum;
|
int error;
|
||||||
|
|
||||||
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
|
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
|
||||||
fp = fopen(tmpfile,"w");
|
fp = fopen(tmpfile,"w");
|
||||||
@ -648,47 +731,10 @@ int rdbSave(char *filename) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rioInitWithFile(&rdb,fp);
|
rioInitWithFile(&rdb,fp);
|
||||||
if (server.rdb_checksum)
|
if (rdbSaveRio(&rdb,&error) == REDIS_ERR) {
|
||||||
rdb.update_cksum = rioGenericUpdateChecksum;
|
errno = error;
|
||||||
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
|
goto werr;
|
||||||
if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
|
|
||||||
|
|
||||||
for (j = 0; j < server.dbnum; j++) {
|
|
||||||
redisDb *db = server.db+j;
|
|
||||||
dict *d = db->dict;
|
|
||||||
if (dictSize(d) == 0) continue;
|
|
||||||
di = dictGetSafeIterator(d);
|
|
||||||
if (!di) {
|
|
||||||
fclose(fp);
|
|
||||||
return REDIS_ERR;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Write the SELECT DB opcode */
|
|
||||||
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
|
|
||||||
if (rdbSaveLen(&rdb,j) == -1) goto werr;
|
|
||||||
|
|
||||||
/* Iterate this DB writing every entry */
|
|
||||||
while((de = dictNext(di)) != NULL) {
|
|
||||||
sds keystr = dictGetKey(de);
|
|
||||||
robj key, *o = dictGetVal(de);
|
|
||||||
long long expire;
|
|
||||||
|
|
||||||
initStaticStringObject(key,keystr);
|
|
||||||
expire = getExpire(db,&key);
|
|
||||||
if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
|
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
|
||||||
}
|
}
|
||||||
di = NULL; /* So that we don't release it again on error. */
|
|
||||||
|
|
||||||
/* EOF opcode */
|
|
||||||
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
|
|
||||||
|
|
||||||
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
|
|
||||||
* loading code skips the check in this case. */
|
|
||||||
cksum = rdb.cksum;
|
|
||||||
memrev64ifbe(&cksum);
|
|
||||||
if (rioWrite(&rdb,&cksum,8) == 0) goto werr;
|
|
||||||
|
|
||||||
/* Make sure data will not remain on the OS's output buffers */
|
/* Make sure data will not remain on the OS's output buffers */
|
||||||
if (fflush(fp) == EOF) goto werr;
|
if (fflush(fp) == EOF) goto werr;
|
||||||
@ -712,7 +758,6 @@ werr:
|
|||||||
fclose(fp);
|
fclose(fp);
|
||||||
unlink(tmpfile);
|
unlink(tmpfile);
|
||||||
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
|
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
|
||||||
if (di) dictReleaseIterator(di);
|
|
||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -757,6 +802,7 @@ int rdbSaveBackground(char *filename) {
|
|||||||
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
|
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
|
||||||
server.rdb_save_time_start = time(NULL);
|
server.rdb_save_time_start = time(NULL);
|
||||||
server.rdb_child_pid = childpid;
|
server.rdb_child_pid = childpid;
|
||||||
|
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK;
|
||||||
updateDictResizePolicy();
|
updateDictResizePolicy();
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
@ -1191,8 +1237,9 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
|
|||||||
return REDIS_ERR; /* Just to avoid warning */
|
return REDIS_ERR; /* Just to avoid warning */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* A background saving child (BGSAVE) terminated its work. Handle this. */
|
/* A background saving child (BGSAVE) terminated its work. Handle this.
|
||||||
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
|
* This function covers the case of actual BGSAVEs. */
|
||||||
|
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
|
||||||
if (!bysignal && exitcode == 0) {
|
if (!bysignal && exitcode == 0) {
|
||||||
redisLog(REDIS_NOTICE,
|
redisLog(REDIS_NOTICE,
|
||||||
"Background saving terminated with success");
|
"Background saving terminated with success");
|
||||||
@ -1217,11 +1264,258 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
|
|||||||
server.lastbgsave_status = REDIS_ERR;
|
server.lastbgsave_status = REDIS_ERR;
|
||||||
}
|
}
|
||||||
server.rdb_child_pid = -1;
|
server.rdb_child_pid = -1;
|
||||||
|
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
|
||||||
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
|
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
|
||||||
server.rdb_save_time_start = -1;
|
server.rdb_save_time_start = -1;
|
||||||
/* Possibly there are slaves waiting for a BGSAVE in order to be served
|
/* Possibly there are slaves waiting for a BGSAVE in order to be served
|
||||||
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
|
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
|
||||||
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR);
|
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_DISK);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* A background saving child (BGSAVE) terminated its work. Handle this.
|
||||||
|
* This function covers the case of RDB -> Slaves socket transfers for
|
||||||
|
* diskless replication. */
|
||||||
|
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
|
||||||
|
uint64_t *ok_slaves;
|
||||||
|
|
||||||
|
if (!bysignal && exitcode == 0) {
|
||||||
|
redisLog(REDIS_NOTICE,
|
||||||
|
"Background RDB transfer terminated with success");
|
||||||
|
} else if (!bysignal && exitcode != 0) {
|
||||||
|
redisLog(REDIS_WARNING, "Background transfer error");
|
||||||
|
} else {
|
||||||
|
redisLog(REDIS_WARNING,
|
||||||
|
"Background transfer terminated by signal %d", bysignal);
|
||||||
|
}
|
||||||
|
server.rdb_child_pid = -1;
|
||||||
|
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
|
||||||
|
server.rdb_save_time_start = -1;
|
||||||
|
|
||||||
|
/* If the child returns an OK exit code, read the set of slave client
|
||||||
|
* IDs and the associated status code. We'll terminate all the slaves
|
||||||
|
* in error state.
|
||||||
|
*
|
||||||
|
* If the process returned an error, consider the list of slaves that
|
||||||
|
* can continue to be empty, so that it's just a special case of the
|
||||||
|
* normal code path. */
|
||||||
|
ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
|
||||||
|
ok_slaves[0] = 0;
|
||||||
|
if (!bysignal && exitcode == 0) {
|
||||||
|
int readlen = sizeof(uint64_t);
|
||||||
|
|
||||||
|
if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
|
||||||
|
readlen)
|
||||||
|
{
|
||||||
|
readlen = ok_slaves[0]*sizeof(uint64_t)*2;
|
||||||
|
|
||||||
|
/* Make space for enough elements as specified by the first
|
||||||
|
* uint64_t element in the array. */
|
||||||
|
ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
|
||||||
|
if (readlen &&
|
||||||
|
read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
|
||||||
|
readlen) != readlen)
|
||||||
|
{
|
||||||
|
ok_slaves[0] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(server.rdb_pipe_read_result_from_child);
|
||||||
|
close(server.rdb_pipe_write_result_to_parent);
|
||||||
|
|
||||||
|
/* We can continue the replication process with all the slaves that
|
||||||
|
* correctly received the full payload. Others are terminated. */
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
|
||||||
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
|
||||||
|
uint64_t j;
|
||||||
|
int errorcode = 0;
|
||||||
|
|
||||||
|
/* Search for the slave ID in the reply. In order for a slave to
|
||||||
|
* continue the replication process, we need to find it in the list,
|
||||||
|
* and it must have an error code set to 0 (which means success). */
|
||||||
|
for (j = 0; j < ok_slaves[0]; j++) {
|
||||||
|
if (slave->id == ok_slaves[2*j+1]) {
|
||||||
|
errorcode = ok_slaves[2*j+2];
|
||||||
|
break; /* Found in slaves list. */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (j == ok_slaves[0] || errorcode != 0) {
|
||||||
|
redisLog(REDIS_WARNING,
|
||||||
|
"Closing slave %s: child->slave RDB transfer failed: %s",
|
||||||
|
replicationGetSlaveName(slave),
|
||||||
|
(errorcode == 0) ? "RDB transfer child aborted"
|
||||||
|
: strerror(errorcode));
|
||||||
|
freeClient(slave);
|
||||||
|
} else {
|
||||||
|
redisLog(REDIS_WARNING,
|
||||||
|
"Slave %s correctly received the streamed RDB file.",
|
||||||
|
replicationGetSlaveName(slave));
|
||||||
|
/* Restore the socket as non-blocking. */
|
||||||
|
anetNonBlock(NULL,slave->fd);
|
||||||
|
anetSendTimeout(NULL,slave->fd,0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zfree(ok_slaves);
|
||||||
|
|
||||||
|
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* When a background RDB saving/transfer terminates, call the right handler. */
|
||||||
|
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
|
||||||
|
switch(server.rdb_child_type) {
|
||||||
|
case REDIS_RDB_CHILD_TYPE_DISK:
|
||||||
|
backgroundSaveDoneHandlerDisk(exitcode,bysignal);
|
||||||
|
break;
|
||||||
|
case REDIS_RDB_CHILD_TYPE_SOCKET:
|
||||||
|
backgroundSaveDoneHandlerSocket(exitcode,bysignal);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
redisPanic("Unknown RDB child type.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
|
||||||
|
* that are currently in REDIS_REPL_WAIT_BGSAVE_START state. */
|
||||||
|
int rdbSaveToSlavesSockets(void) {
|
||||||
|
int *fds;
|
||||||
|
uint64_t *clientids;
|
||||||
|
int numfds;
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
pid_t childpid;
|
||||||
|
long long start;
|
||||||
|
int pipefds[2];
|
||||||
|
|
||||||
|
if (server.rdb_child_pid != -1) return REDIS_ERR;
|
||||||
|
|
||||||
|
/* Before forking, create a pipe that will be used in order to
|
||||||
|
* send back to the parent the IDs of the slaves that successfully
|
||||||
|
* received all the writes. */
|
||||||
|
if (pipe(pipefds) == -1) return REDIS_ERR;
|
||||||
|
server.rdb_pipe_read_result_from_child = pipefds[0];
|
||||||
|
server.rdb_pipe_write_result_to_parent = pipefds[1];
|
||||||
|
|
||||||
|
/* Collect the file descriptors of the slaves we want to transfer
|
||||||
|
* the RDB to, which are in WAIT_BGSAVE_START state. */
|
||||||
|
fds = zmalloc(sizeof(int)*listLength(server.slaves));
|
||||||
|
/* We also allocate an array of corresponding client IDs. This will
|
||||||
|
* be useful for the child process in order to build the report
|
||||||
|
* (sent via unix pipe) that will be sent to the parent. */
|
||||||
|
clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
|
||||||
|
numfds = 0;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
|
||||||
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
|
||||||
|
clientids[numfds] = slave->id;
|
||||||
|
fds[numfds++] = slave->fd;
|
||||||
|
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
||||||
|
/* Put the socket in blocking mode to simplify RDB transfer.
|
||||||
|
* We'll restore it when the child returns (since duped socket
|
||||||
|
* will share the O_NONBLOCK attribute with the parent). */
|
||||||
|
anetBlock(NULL,slave->fd);
|
||||||
|
anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create the child process. */
|
||||||
|
start = ustime();
|
||||||
|
if ((childpid = fork()) == 0) {
|
||||||
|
/* Child */
|
||||||
|
int retval;
|
||||||
|
rio slave_sockets;
|
||||||
|
|
||||||
|
rioInitWithFdset(&slave_sockets,fds,numfds);
|
||||||
|
zfree(fds);
|
||||||
|
|
||||||
|
closeListeningSockets(0);
|
||||||
|
redisSetProcTitle("redis-rdb-to-slaves");
|
||||||
|
|
||||||
|
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
|
||||||
|
if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0)
|
||||||
|
retval = REDIS_ERR;
|
||||||
|
|
||||||
|
if (retval == REDIS_OK) {
|
||||||
|
size_t private_dirty = zmalloc_get_private_dirty();
|
||||||
|
|
||||||
|
if (private_dirty) {
|
||||||
|
redisLog(REDIS_NOTICE,
|
||||||
|
"RDB: %zu MB of memory used by copy-on-write",
|
||||||
|
private_dirty/(1024*1024));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If we are returning OK, at least one slave was served
|
||||||
|
* with the RDB file as expected, so we need to send a report
|
||||||
|
* to the parent via the pipe. The format of the message is:
|
||||||
|
*
|
||||||
|
* <len> <slave[0].id> <slave[0].error> ...
|
||||||
|
*
|
||||||
|
* len, slave IDs, and slave errors, are all uint64_t integers,
|
||||||
|
* so basically the reply is composed of 64 bits for the len field
|
||||||
|
* plus 2 additional 64 bit integers for each entry, for a total
|
||||||
|
* of 'len' entries.
|
||||||
|
*
|
||||||
|
* The 'id' represents the slave's client ID, so that the master
|
||||||
|
* can match the report with a specific slave, and 'error' is
|
||||||
|
* set to 0 if the replication process terminated with a success
|
||||||
|
* or the error code if an error occurred. */
|
||||||
|
void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds));
|
||||||
|
uint64_t *len = msg;
|
||||||
|
uint64_t *ids = len+1;
|
||||||
|
int j, msglen;
|
||||||
|
|
||||||
|
*len = numfds;
|
||||||
|
for (j = 0; j < numfds; j++) {
|
||||||
|
*ids++ = clientids[j];
|
||||||
|
*ids++ = slave_sockets.io.fdset.state[j];
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Write the message to the parent. If we have no good slaves or
|
||||||
|
* we are unable to transfer the message to the parent, we exit
|
||||||
|
* with an error so that the parent will abort the replication
|
||||||
|
* process with all the children that were waiting. */
|
||||||
|
msglen = sizeof(uint64_t)*(1+2*numfds);
|
||||||
|
if (*len == 0 ||
|
||||||
|
write(server.rdb_pipe_write_result_to_parent,msg,msglen)
|
||||||
|
!= msglen)
|
||||||
|
{
|
||||||
|
retval = REDIS_ERR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
exitFromChild((retval == REDIS_OK) ? 0 : 1);
|
||||||
|
} else {
|
||||||
|
/* Parent */
|
||||||
|
zfree(clientids); /* Not used by parent. Free ASAP. */
|
||||||
|
server.stat_fork_time = ustime()-start;
|
||||||
|
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
||||||
|
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
||||||
|
if (childpid == -1) {
|
||||||
|
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
|
||||||
|
strerror(errno));
|
||||||
|
zfree(fds);
|
||||||
|
close(pipefds[0]);
|
||||||
|
close(pipefds[1]);
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
|
redisLog(REDIS_NOTICE,"Background RDB transfer started by pid %d",childpid);
|
||||||
|
server.rdb_save_time_start = time(NULL);
|
||||||
|
server.rdb_child_pid = childpid;
|
||||||
|
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_SOCKET;
|
||||||
|
updateDictResizePolicy();
|
||||||
|
zfree(fds);
|
||||||
|
return REDIS_OK;
|
||||||
|
}
|
||||||
|
return REDIS_OK; /* unreached */
|
||||||
}
|
}
|
||||||
|
|
||||||
void saveCommand(redisClient *c) {
|
void saveCommand(redisClient *c) {
|
||||||
|
@ -101,6 +101,7 @@ int rdbSaveObjectType(rio *rdb, robj *o);
|
|||||||
int rdbLoadObjectType(rio *rdb);
|
int rdbLoadObjectType(rio *rdb);
|
||||||
int rdbLoad(char *filename);
|
int rdbLoad(char *filename);
|
||||||
int rdbSaveBackground(char *filename);
|
int rdbSaveBackground(char *filename);
|
||||||
|
int rdbSaveToSlavesSockets(void);
|
||||||
void rdbRemoveTempFile(pid_t childpid);
|
void rdbRemoveTempFile(pid_t childpid);
|
||||||
int rdbSave(char *filename);
|
int rdbSave(char *filename);
|
||||||
int rdbSaveObject(rio *rdb, robj *o);
|
int rdbSaveObject(rio *rdb, robj *o);
|
||||||
|
@ -1480,6 +1480,8 @@ void initServerConfig(void) {
|
|||||||
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
|
server.repl_slave_ro = REDIS_DEFAULT_SLAVE_READ_ONLY;
|
||||||
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
||||||
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
|
server.repl_disable_tcp_nodelay = REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY;
|
||||||
|
server.repl_diskless_sync = REDIS_DEFAULT_REPL_DISKLESS_SYNC;
|
||||||
|
server.repl_diskless_sync_delay = REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
|
||||||
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
|
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
|
||||||
server.master_repl_offset = 0;
|
server.master_repl_offset = 0;
|
||||||
|
|
||||||
@ -1768,6 +1770,7 @@ void initServer(void) {
|
|||||||
server.cronloops = 0;
|
server.cronloops = 0;
|
||||||
server.rdb_child_pid = -1;
|
server.rdb_child_pid = -1;
|
||||||
server.aof_child_pid = -1;
|
server.aof_child_pid = -1;
|
||||||
|
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
|
||||||
aofRewriteBufferReset();
|
aofRewriteBufferReset();
|
||||||
server.aof_buf = sdsempty();
|
server.aof_buf = sdsempty();
|
||||||
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
|
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
|
||||||
|
16
src/redis.h
16
src/redis.h
@ -96,6 +96,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define REDIS_REPL_TIMEOUT 60
|
#define REDIS_REPL_TIMEOUT 60
|
||||||
#define REDIS_REPL_PING_SLAVE_PERIOD 10
|
#define REDIS_REPL_PING_SLAVE_PERIOD 10
|
||||||
#define REDIS_RUN_ID_SIZE 40
|
#define REDIS_RUN_ID_SIZE 40
|
||||||
|
#define REDIS_EOF_MARK_SIZE 40
|
||||||
#define REDIS_OPS_SEC_SAMPLES 16
|
#define REDIS_OPS_SEC_SAMPLES 16
|
||||||
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
|
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
|
||||||
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
|
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
|
||||||
@ -113,6 +114,8 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define REDIS_DEFAULT_RDB_COMPRESSION 1
|
#define REDIS_DEFAULT_RDB_COMPRESSION 1
|
||||||
#define REDIS_DEFAULT_RDB_CHECKSUM 1
|
#define REDIS_DEFAULT_RDB_CHECKSUM 1
|
||||||
#define REDIS_DEFAULT_RDB_FILENAME "dump.rdb"
|
#define REDIS_DEFAULT_RDB_FILENAME "dump.rdb"
|
||||||
|
#define REDIS_DEFAULT_REPL_DISKLESS_SYNC 0
|
||||||
|
#define REDIS_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
|
||||||
#define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1
|
#define REDIS_DEFAULT_SLAVE_SERVE_STALE_DATA 1
|
||||||
#define REDIS_DEFAULT_SLAVE_READ_ONLY 1
|
#define REDIS_DEFAULT_SLAVE_READ_ONLY 1
|
||||||
#define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0
|
#define REDIS_DEFAULT_REPL_DISABLE_TCP_NODELAY 0
|
||||||
@ -361,6 +364,11 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define REDIS_PROPAGATE_AOF 1
|
#define REDIS_PROPAGATE_AOF 1
|
||||||
#define REDIS_PROPAGATE_REPL 2
|
#define REDIS_PROPAGATE_REPL 2
|
||||||
|
|
||||||
|
/* RDB active child save type. */
|
||||||
|
#define REDIS_RDB_CHILD_TYPE_NONE 0
|
||||||
|
#define REDIS_RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
|
||||||
|
#define REDIS_RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */
|
||||||
|
|
||||||
/* Keyspace changes notification classes. Every class is associated with a
|
/* Keyspace changes notification classes. Every class is associated with a
|
||||||
* character for configuration purposes. */
|
* character for configuration purposes. */
|
||||||
#define REDIS_NOTIFY_KEYSPACE (1<<0) /* K */
|
#define REDIS_NOTIFY_KEYSPACE (1<<0) /* K */
|
||||||
@ -764,8 +772,11 @@ struct redisServer {
|
|||||||
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
|
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
|
||||||
time_t rdb_save_time_last; /* Time used by last RDB save run. */
|
time_t rdb_save_time_last; /* Time used by last RDB save run. */
|
||||||
time_t rdb_save_time_start; /* Current RDB save start time. */
|
time_t rdb_save_time_start; /* Current RDB save start time. */
|
||||||
|
int rdb_child_type; /* Type of save by active child. */
|
||||||
int lastbgsave_status; /* REDIS_OK or REDIS_ERR */
|
int lastbgsave_status; /* REDIS_OK or REDIS_ERR */
|
||||||
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
|
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
|
||||||
|
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
|
||||||
|
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
|
||||||
/* Propagation of commands in AOF / replication */
|
/* Propagation of commands in AOF / replication */
|
||||||
redisOpArray also_propagate; /* Additional command to propagate. */
|
redisOpArray also_propagate; /* Additional command to propagate. */
|
||||||
/* Logging */
|
/* Logging */
|
||||||
@ -790,6 +801,8 @@ struct redisServer {
|
|||||||
int repl_min_slaves_to_write; /* Min number of slaves to write. */
|
int repl_min_slaves_to_write; /* Min number of slaves to write. */
|
||||||
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
|
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
|
||||||
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
|
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
|
||||||
|
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
|
||||||
|
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
|
||||||
/* Replication (slave) */
|
/* Replication (slave) */
|
||||||
char *masterauth; /* AUTH with this password with master */
|
char *masterauth; /* AUTH with this password with master */
|
||||||
char *masterhost; /* Hostname of master */
|
char *masterhost; /* Hostname of master */
|
||||||
@ -1132,7 +1145,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
|
|||||||
/* Replication */
|
/* Replication */
|
||||||
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
||||||
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
|
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc);
|
||||||
void updateSlavesWaitingBgsave(int bgsaveerr);
|
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
|
||||||
void replicationCron(void);
|
void replicationCron(void);
|
||||||
void replicationHandleMasterDisconnection(void);
|
void replicationHandleMasterDisconnection(void);
|
||||||
void replicationCacheMaster(redisClient *c);
|
void replicationCacheMaster(redisClient *c);
|
||||||
@ -1149,6 +1162,7 @@ void unblockClientWaitingReplicas(redisClient *c);
|
|||||||
int replicationCountAcksByOffset(long long offset);
|
int replicationCountAcksByOffset(long long offset);
|
||||||
void replicationSendNewlineToMaster(void);
|
void replicationSendNewlineToMaster(void);
|
||||||
long long replicationGetSlaveOffset(void);
|
long long replicationGetSlaveOffset(void);
|
||||||
|
char *replicationGetSlaveName(redisClient *c);
|
||||||
|
|
||||||
/* Generic persistence functions */
|
/* Generic persistence functions */
|
||||||
void startLoading(FILE *fp);
|
void startLoading(FILE *fp);
|
||||||
|
@ -41,6 +41,30 @@ void replicationDiscardCachedMaster(void);
|
|||||||
void replicationResurrectCachedMaster(int newfd);
|
void replicationResurrectCachedMaster(int newfd);
|
||||||
void replicationSendAck(void);
|
void replicationSendAck(void);
|
||||||
|
|
||||||
|
/* --------------------------- Utility functions ---------------------------- */
|
||||||
|
|
||||||
|
/* Return the pointer to a string representing the slave ip:listening_port
|
||||||
|
* pair. Mostly useful for logging, since we want to log a slave using its
|
||||||
|
* IP address and it's listening port which is more clear for the user, for
|
||||||
|
* example: "Closing connection with slave 10.1.2.3:6380". */
|
||||||
|
char *replicationGetSlaveName(redisClient *c) {
|
||||||
|
static char buf[REDIS_PEER_ID_LEN];
|
||||||
|
char ip[REDIS_IP_STR_LEN];
|
||||||
|
|
||||||
|
ip[0] = '\0';
|
||||||
|
buf[0] = '\0';
|
||||||
|
if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
|
||||||
|
if (c->slave_listening_port)
|
||||||
|
snprintf(buf,sizeof(buf),"%s:%d",ip,c->slave_listening_port);
|
||||||
|
else
|
||||||
|
snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
|
||||||
|
} else {
|
||||||
|
snprintf(buf,sizeof(buf),"client id #%llu",
|
||||||
|
(unsigned long long) c->id);
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
/* ---------------------------------- MASTER -------------------------------- */
|
/* ---------------------------------- MASTER -------------------------------- */
|
||||||
|
|
||||||
void createReplicationBacklog(void) {
|
void createReplicationBacklog(void) {
|
||||||
@ -212,7 +236,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Write the command to every slave. */
|
/* Write the command to every slave. */
|
||||||
listRewind(slaves,&li);
|
listRewind(server.slaves,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
redisClient *slave = ln->value;
|
redisClient *slave = ln->value;
|
||||||
|
|
||||||
@ -345,7 +369,8 @@ int masterTryPartialResynchronization(redisClient *c) {
|
|||||||
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
|
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
|
||||||
master_runid, server.runid);
|
master_runid, server.runid);
|
||||||
} else {
|
} else {
|
||||||
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
|
redisLog(REDIS_NOTICE,"Full resync requested by slave %s",
|
||||||
|
replicationGetSlaveName(c));
|
||||||
}
|
}
|
||||||
goto need_full_resync;
|
goto need_full_resync;
|
||||||
}
|
}
|
||||||
@ -358,10 +383,10 @@ int masterTryPartialResynchronization(redisClient *c) {
|
|||||||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
|
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
|
||||||
{
|
{
|
||||||
redisLog(REDIS_NOTICE,
|
redisLog(REDIS_NOTICE,
|
||||||
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
|
"Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
|
||||||
if (psync_offset > server.master_repl_offset) {
|
if (psync_offset > server.master_repl_offset) {
|
||||||
redisLog(REDIS_WARNING,
|
redisLog(REDIS_WARNING,
|
||||||
"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
|
"Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
|
||||||
}
|
}
|
||||||
goto need_full_resync;
|
goto need_full_resync;
|
||||||
}
|
}
|
||||||
@ -384,7 +409,9 @@ int masterTryPartialResynchronization(redisClient *c) {
|
|||||||
}
|
}
|
||||||
psync_len = addReplyReplicationBacklog(c,psync_offset);
|
psync_len = addReplyReplicationBacklog(c,psync_offset);
|
||||||
redisLog(REDIS_NOTICE,
|
redisLog(REDIS_NOTICE,
|
||||||
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
|
"Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
|
||||||
|
replicationGetSlaveName(c),
|
||||||
|
psync_len, psync_offset);
|
||||||
/* Note that we don't need to set the selected DB at server.slaveseldb
|
/* Note that we don't need to set the selected DB at server.slaveseldb
|
||||||
* to -1 to force the master to emit SELECT, since the slave already
|
* to -1 to force the master to emit SELECT, since the slave already
|
||||||
* has this state from the previous connection with the master. */
|
* has this state from the previous connection with the master. */
|
||||||
@ -408,6 +435,28 @@ need_full_resync:
|
|||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Start a BGSAVE for replication goals, which is, selecting the disk or
|
||||||
|
* socket target depending on the configuration, and making sure that
|
||||||
|
* the script cache is flushed before to start.
|
||||||
|
*
|
||||||
|
* Returns REDIS_OK on success or REDIS_ERR otherwise. */
|
||||||
|
int startBgsaveForReplication(void) {
|
||||||
|
int retval;
|
||||||
|
|
||||||
|
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
|
||||||
|
server.repl_diskless_sync ? "slaves sockets" : "disk");
|
||||||
|
|
||||||
|
if (server.repl_diskless_sync)
|
||||||
|
retval = rdbSaveToSlavesSockets();
|
||||||
|
else
|
||||||
|
retval = rdbSaveBackground(server.rdb_filename);
|
||||||
|
|
||||||
|
/* Flush the script cache, since we need that slave differences are
|
||||||
|
* accumulated without requiring slaves to match our cached scripts. */
|
||||||
|
if (retval == REDIS_OK) replicationScriptCacheFlush();
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
/* SYNC and PSYNC command implementation. */
|
/* SYNC and PSYNC command implementation. */
|
||||||
void syncCommand(redisClient *c) {
|
void syncCommand(redisClient *c) {
|
||||||
/* ignore SYNC if already slave or in monitor mode */
|
/* ignore SYNC if already slave or in monitor mode */
|
||||||
@ -429,7 +478,8 @@ void syncCommand(redisClient *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
redisLog(REDIS_NOTICE,"Slave asks for synchronization");
|
redisLog(REDIS_NOTICE,"Slave %s asks for synchronization",
|
||||||
|
replicationGetSlaveName(c));
|
||||||
|
|
||||||
/* Try a partial resynchronization if this is a PSYNC command.
|
/* Try a partial resynchronization if this is a PSYNC command.
|
||||||
* If it fails, we continue with usual full resynchronization, however
|
* If it fails, we continue with usual full resynchronization, however
|
||||||
@ -465,10 +515,12 @@ void syncCommand(redisClient *c) {
|
|||||||
|
|
||||||
/* Here we need to check if there is a background saving operation
|
/* Here we need to check if there is a background saving operation
|
||||||
* in progress, or if it is required to start one */
|
* in progress, or if it is required to start one */
|
||||||
if (server.rdb_child_pid != -1) {
|
if (server.rdb_child_pid != -1 &&
|
||||||
|
server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
|
||||||
|
{
|
||||||
/* Ok a background save is in progress. Let's check if it is a good
|
/* Ok a background save is in progress. Let's check if it is a good
|
||||||
* one for replication, i.e. if there is another slave that is
|
* one for replication, i.e. if there is another slave that is
|
||||||
* registering differences since the server forked to save */
|
* registering differences since the server forked to save. */
|
||||||
redisClient *slave;
|
redisClient *slave;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
@ -486,21 +538,35 @@ void syncCommand(redisClient *c) {
|
|||||||
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
|
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
|
||||||
} else {
|
} else {
|
||||||
/* No way, we need to wait for the next BGSAVE in order to
|
/* No way, we need to wait for the next BGSAVE in order to
|
||||||
* register differences */
|
* register differences. */
|
||||||
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
|
||||||
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
|
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
|
||||||
}
|
}
|
||||||
|
} else if (server.rdb_child_pid != -1 &&
|
||||||
|
server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
|
||||||
|
{
|
||||||
|
/* There is an RDB child process but it is writing directly to
|
||||||
|
* children sockets. We need to wait for the next BGSAVE
|
||||||
|
* in order to synchronize. */
|
||||||
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
|
||||||
|
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
|
||||||
} else {
|
} else {
|
||||||
/* Ok we don't have a BGSAVE in progress, let's start one */
|
if (server.repl_diskless_sync) {
|
||||||
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
|
/* Diskless replication RDB child is created inside
|
||||||
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
|
* replicationCron() since we want to delay its start a
|
||||||
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
|
* few seconds to wait for more slaves to arrive. */
|
||||||
addReplyError(c,"Unable to perform background save");
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
|
||||||
return;
|
if (server.repl_diskless_sync_delay)
|
||||||
|
redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
|
||||||
|
} else {
|
||||||
|
/* Ok we don't have a BGSAVE in progress, let's start one. */
|
||||||
|
if (startBgsaveForReplication() != REDIS_OK) {
|
||||||
|
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
|
||||||
|
addReplyError(c,"Unable to perform background save");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
||||||
}
|
}
|
||||||
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
|
||||||
/* Flush the script cache for the new slave. */
|
|
||||||
replicationScriptCacheFlush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server.repl_disable_tcp_nodelay)
|
if (server.repl_disable_tcp_nodelay)
|
||||||
@ -573,6 +639,29 @@ void replconfCommand(redisClient *c) {
|
|||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function puts a slave in the online state, and should be called just
|
||||||
|
* after a slave received the RDB file for the initial synchronization, and
|
||||||
|
* we are finally ready to send the incremental stream of commands.
|
||||||
|
*
|
||||||
|
* It does a few things:
|
||||||
|
*
|
||||||
|
* 1) Put the slave in ONLINE state.
|
||||||
|
* 2) Make sure the writable event is re-installed, since calling the SYNC
|
||||||
|
* command disables it, so that we can accumulate output buffer without
|
||||||
|
* sending it to the slave.
|
||||||
|
* 3) Update the count of good slaves. */
|
||||||
|
void putSlaveOnline(redisClient *slave) {
|
||||||
|
slave->replstate = REDIS_REPL_ONLINE;
|
||||||
|
slave->repl_ack_time = server.unixtime;
|
||||||
|
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
|
||||||
|
sendReplyToClient, slave) == AE_ERR) {
|
||||||
|
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
|
||||||
|
freeClient(slave);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
refreshGoodSlavesCount();
|
||||||
|
}
|
||||||
|
|
||||||
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
redisClient *slave = privdata;
|
redisClient *slave = privdata;
|
||||||
REDIS_NOTUSED(el);
|
REDIS_NOTUSED(el);
|
||||||
@ -623,26 +712,26 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
close(slave->repldbfd);
|
close(slave->repldbfd);
|
||||||
slave->repldbfd = -1;
|
slave->repldbfd = -1;
|
||||||
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
||||||
slave->replstate = REDIS_REPL_ONLINE;
|
putSlaveOnline(slave);
|
||||||
slave->repl_ack_time = server.unixtime;
|
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)");
|
||||||
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
|
|
||||||
sendReplyToClient, slave) == AE_ERR) {
|
|
||||||
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
|
|
||||||
freeClient(slave);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
refreshGoodSlavesCount();
|
|
||||||
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called at the end of every background saving.
|
/* This function is called at the end of every background saving,
|
||||||
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
|
* or when the replication RDB transfer strategy is modified from
|
||||||
* otherwise REDIS_ERR is passed to the function.
|
* disk to socket or the other way around.
|
||||||
*
|
*
|
||||||
* The goal of this function is to handle slaves waiting for a successful
|
* The goal of this function is to handle slaves waiting for a successful
|
||||||
* background saving in order to perform non-blocking synchronization. */
|
* background saving in order to perform non-blocking synchronization, and
|
||||||
void updateSlavesWaitingBgsave(int bgsaveerr) {
|
* to schedule a new BGSAVE if there are slaves that attached while a
|
||||||
|
* BGSAVE was in progress, but it was not a good one for replication (no
|
||||||
|
* other slave was accumulating differences).
|
||||||
|
*
|
||||||
|
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
|
||||||
|
* otherwise REDIS_ERR is passed to the function.
|
||||||
|
* The 'type' argument is the type of the child that terminated
|
||||||
|
* (if it had a disk or socket target). */
|
||||||
|
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
int startbgsave = 0;
|
int startbgsave = 0;
|
||||||
listIter li;
|
listIter li;
|
||||||
@ -657,37 +746,44 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
|
|||||||
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
|
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
|
||||||
struct redis_stat buf;
|
struct redis_stat buf;
|
||||||
|
|
||||||
if (bgsaveerr != REDIS_OK) {
|
/* If this was an RDB on disk save, we have to prepare to send
|
||||||
freeClient(slave);
|
* the RDB from disk to the slave socket. Otherwise if this was
|
||||||
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
|
* already an RDB -> Slaves socket transfer, used in the case of
|
||||||
continue;
|
* diskless replication, our work is trivial, we can just put
|
||||||
}
|
* the slave online. */
|
||||||
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
|
if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
|
||||||
redis_fstat(slave->repldbfd,&buf) == -1) {
|
putSlaveOnline(slave);
|
||||||
freeClient(slave);
|
redisLog(REDIS_NOTICE,
|
||||||
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
|
"Synchronization with slave %s succeeded (socket)",
|
||||||
continue;
|
replicationGetSlaveName(slave));
|
||||||
}
|
} else {
|
||||||
slave->repldboff = 0;
|
if (bgsaveerr != REDIS_OK) {
|
||||||
slave->repldbsize = buf.st_size;
|
freeClient(slave);
|
||||||
slave->replstate = REDIS_REPL_SEND_BULK;
|
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
|
||||||
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
|
continue;
|
||||||
(unsigned long long) slave->repldbsize);
|
}
|
||||||
|
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
|
||||||
|
redis_fstat(slave->repldbfd,&buf) == -1) {
|
||||||
|
freeClient(slave);
|
||||||
|
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
slave->repldboff = 0;
|
||||||
|
slave->repldbsize = buf.st_size;
|
||||||
|
slave->replstate = REDIS_REPL_SEND_BULK;
|
||||||
|
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
|
||||||
|
(unsigned long long) slave->repldbsize);
|
||||||
|
|
||||||
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
||||||
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
|
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
|
||||||
freeClient(slave);
|
freeClient(slave);
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (startbgsave) {
|
if (startbgsave) {
|
||||||
/* Since we are starting a new background save for one or more slaves,
|
if (startBgsaveForReplication() != REDIS_OK) {
|
||||||
* we flush the Replication Script Cache to use EVAL to propagate every
|
|
||||||
* new EVALSHA for the first time, since all the new slaves don't know
|
|
||||||
* about previous scripts. */
|
|
||||||
replicationScriptCacheFlush();
|
|
||||||
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
|
|
||||||
listIter li;
|
listIter li;
|
||||||
|
|
||||||
listRewind(server.slaves,&li);
|
listRewind(server.slaves,&li);
|
||||||
@ -751,6 +847,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
REDIS_NOTUSED(privdata);
|
REDIS_NOTUSED(privdata);
|
||||||
REDIS_NOTUSED(mask);
|
REDIS_NOTUSED(mask);
|
||||||
|
|
||||||
|
/* Static vars used to hold the EOF mark, and the last bytes received
|
||||||
|
* from the server: when they match, we reached the end of the transfer. */
|
||||||
|
static char eofmark[REDIS_RUN_ID_SIZE];
|
||||||
|
static char lastbytes[REDIS_RUN_ID_SIZE];
|
||||||
|
static int usemark = 0;
|
||||||
|
|
||||||
/* If repl_transfer_size == -1 we still have to read the bulk length
|
/* If repl_transfer_size == -1 we still have to read the bulk length
|
||||||
* from the master reply. */
|
* from the master reply. */
|
||||||
if (server.repl_transfer_size == -1) {
|
if (server.repl_transfer_size == -1) {
|
||||||
@ -776,16 +878,44 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
|
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
server.repl_transfer_size = strtol(buf+1,NULL,10);
|
|
||||||
redisLog(REDIS_NOTICE,
|
/* There are two possible forms for the bulk payload. One is the
|
||||||
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
|
* usual $<count> bulk format. The other is used for diskless transfers
|
||||||
(long long) server.repl_transfer_size);
|
* when the master does not know beforehand the size of the file to
|
||||||
|
* transfer. In the latter case, the following format is used:
|
||||||
|
*
|
||||||
|
* $EOF:<40 bytes delimiter>
|
||||||
|
*
|
||||||
|
* At the end of the file the announced delimiter is transmitted. The
|
||||||
|
* delimiter is long and random enough that the probability of a
|
||||||
|
* collision with the actual file content can be ignored. */
|
||||||
|
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
|
||||||
|
usemark = 1;
|
||||||
|
memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
|
||||||
|
memset(lastbytes,0,REDIS_RUN_ID_SIZE);
|
||||||
|
/* Set any repl_transfer_size to avoid entering this code path
|
||||||
|
* at the next call. */
|
||||||
|
server.repl_transfer_size = 0;
|
||||||
|
redisLog(REDIS_NOTICE,
|
||||||
|
"MASTER <-> SLAVE sync: receiving streamed RDB from master");
|
||||||
|
} else {
|
||||||
|
usemark = 0;
|
||||||
|
server.repl_transfer_size = strtol(buf+1,NULL,10);
|
||||||
|
redisLog(REDIS_NOTICE,
|
||||||
|
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
|
||||||
|
(long long) server.repl_transfer_size);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Read bulk data */
|
/* Read bulk data */
|
||||||
left = server.repl_transfer_size - server.repl_transfer_read;
|
if (usemark) {
|
||||||
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
|
readlen = sizeof(buf);
|
||||||
|
} else {
|
||||||
|
left = server.repl_transfer_size - server.repl_transfer_read;
|
||||||
|
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
|
||||||
|
}
|
||||||
|
|
||||||
nread = read(fd,buf,readlen);
|
nread = read(fd,buf,readlen);
|
||||||
if (nread <= 0) {
|
if (nread <= 0) {
|
||||||
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
|
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
|
||||||
@ -793,6 +923,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
replicationAbortSyncTransfer();
|
replicationAbortSyncTransfer();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* When a mark is used, we want to detect EOF asap in order to avoid
|
||||||
|
* writing the EOF mark into the file... */
|
||||||
|
int eof_reached = 0;
|
||||||
|
|
||||||
|
if (usemark) {
|
||||||
|
/* Update the last bytes array, and check if it matches our delimiter. */
|
||||||
|
if (nread >= REDIS_RUN_ID_SIZE) {
|
||||||
|
memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
|
||||||
|
} else {
|
||||||
|
int rem = REDIS_RUN_ID_SIZE-nread;
|
||||||
|
memmove(lastbytes,lastbytes+nread,rem);
|
||||||
|
memcpy(lastbytes+rem,buf,nread);
|
||||||
|
}
|
||||||
|
if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
|
||||||
|
}
|
||||||
|
|
||||||
server.repl_transfer_lastio = server.unixtime;
|
server.repl_transfer_lastio = server.unixtime;
|
||||||
if (write(server.repl_transfer_fd,buf,nread) != nread) {
|
if (write(server.repl_transfer_fd,buf,nread) != nread) {
|
||||||
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
||||||
@ -800,6 +947,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
server.repl_transfer_read += nread;
|
server.repl_transfer_read += nread;
|
||||||
|
|
||||||
|
/* Delete the last 40 bytes from the file if we reached EOF. */
|
||||||
|
if (usemark && eof_reached) {
|
||||||
|
if (ftruncate(server.repl_transfer_fd,
|
||||||
|
server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)
|
||||||
|
{
|
||||||
|
redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Sync data on disk from time to time, otherwise at the end of the transfer
|
/* Sync data on disk from time to time, otherwise at the end of the transfer
|
||||||
* we may suffer a big delay as the memory buffers are copied into the
|
* we may suffer a big delay as the memory buffers are copied into the
|
||||||
* actual disk. */
|
* actual disk. */
|
||||||
@ -814,7 +971,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Check if the transfer is now complete */
|
/* Check if the transfer is now complete */
|
||||||
if (server.repl_transfer_read == server.repl_transfer_size) {
|
if (!usemark) {
|
||||||
|
if (server.repl_transfer_read == server.repl_transfer_size)
|
||||||
|
eof_reached = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eof_reached) {
|
||||||
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
|
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
|
||||||
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
||||||
replicationAbortSyncTransfer();
|
replicationAbortSyncTransfer();
|
||||||
@ -1817,7 +1979,9 @@ void replicationCron(void) {
|
|||||||
redisClient *slave = ln->value;
|
redisClient *slave = ln->value;
|
||||||
|
|
||||||
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
|
||||||
slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
|
(slave->replstate == REDIS_REPL_WAIT_BGSAVE_END &&
|
||||||
|
server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET))
|
||||||
|
{
|
||||||
if (write(slave->fd, "\n", 1) == -1) {
|
if (write(slave->fd, "\n", 1) == -1) {
|
||||||
/* Don't worry, it's just a ping. */
|
/* Don't worry, it's just a ping. */
|
||||||
}
|
}
|
||||||
@ -1838,15 +2002,8 @@ void replicationCron(void) {
|
|||||||
if (slave->flags & REDIS_PRE_PSYNC) continue;
|
if (slave->flags & REDIS_PRE_PSYNC) continue;
|
||||||
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
|
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
|
||||||
{
|
{
|
||||||
char ip[REDIS_IP_STR_LEN];
|
redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s",
|
||||||
int port;
|
replicationGetSlaveName(slave));
|
||||||
|
|
||||||
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
|
|
||||||
redisLog(REDIS_WARNING,
|
|
||||||
"Disconnecting timedout slave: %s:%d",
|
|
||||||
ip, slave->slave_listening_port);
|
|
||||||
}
|
|
||||||
freeClient(slave);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1877,6 +2034,48 @@ void replicationCron(void) {
|
|||||||
replicationScriptCacheFlush();
|
replicationScriptCacheFlush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If we are using diskless replication and there are slaves waiting
|
||||||
|
* in WAIT_BGSAVE_START state, check if enough seconds elapsed and
|
||||||
|
* start a BGSAVE.
|
||||||
|
*
|
||||||
|
* This code is also useful to trigger a BGSAVE if the diskless
|
||||||
|
* replication was turned off with CONFIG SET, while there were already
|
||||||
|
* slaves in WAIT_BGSAVE_START state. */
|
||||||
|
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
|
||||||
|
time_t idle, max_idle = 0;
|
||||||
|
int slaves_waiting = 0;
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
|
||||||
|
idle = server.unixtime - slave->lastinteraction;
|
||||||
|
if (idle > max_idle) max_idle = idle;
|
||||||
|
slaves_waiting++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
|
||||||
|
/* Start a BGSAVE. Usually with socket target, or with disk target
|
||||||
|
* if there was a recent socket -> disk config change. */
|
||||||
|
if (startBgsaveForReplication() == REDIS_OK) {
|
||||||
|
/* It started! We need to change the state of slaves
|
||||||
|
* from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
|
||||||
|
* the current target is disk. Otherwise it was already done
|
||||||
|
* by rdbSaveToSlavesSockets() which is called by
|
||||||
|
* startBgsaveForReplication(). */
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
|
||||||
|
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||||
refreshGoodSlavesCount();
|
refreshGoodSlavesCount();
|
||||||
}
|
}
|
||||||
|
177
src/rio.c
177
src/rio.c
@ -55,6 +55,8 @@
|
|||||||
#include "config.h"
|
#include "config.h"
|
||||||
#include "redis.h"
|
#include "redis.h"
|
||||||
|
|
||||||
|
/* ------------------------- Buffer I/O implementation ----------------------- */
|
||||||
|
|
||||||
/* Returns 1 or 0 for success/failure. */
|
/* Returns 1 or 0 for success/failure. */
|
||||||
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
|
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
|
||||||
r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
|
r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
|
||||||
@ -76,6 +78,33 @@ static off_t rioBufferTell(rio *r) {
|
|||||||
return r->io.buffer.pos;
|
return r->io.buffer.pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||||
|
* and 0 on failures. */
|
||||||
|
static int rioBufferFlush(rio *r) {
|
||||||
|
REDIS_NOTUSED(r);
|
||||||
|
return 1; /* Nothing to do, our write just appends to the buffer. */
|
||||||
|
}
|
||||||
|
|
||||||
|
static const rio rioBufferIO = {
|
||||||
|
rioBufferRead,
|
||||||
|
rioBufferWrite,
|
||||||
|
rioBufferTell,
|
||||||
|
rioBufferFlush,
|
||||||
|
NULL, /* update_checksum */
|
||||||
|
0, /* current checksum */
|
||||||
|
0, /* bytes read or written */
|
||||||
|
0, /* read/write chunk size */
|
||||||
|
{ { NULL, 0 } } /* union for io-specific vars */
|
||||||
|
};
|
||||||
|
|
||||||
|
void rioInitWithBuffer(rio *r, sds s) {
|
||||||
|
*r = rioBufferIO;
|
||||||
|
r->io.buffer.ptr = s;
|
||||||
|
r->io.buffer.pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* --------------------- Stdio file pointer implementation ------------------- */
|
||||||
|
|
||||||
/* Returns 1 or 0 for success/failure. */
|
/* Returns 1 or 0 for success/failure. */
|
||||||
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
||||||
size_t retval;
|
size_t retval;
|
||||||
@ -103,21 +132,17 @@ static off_t rioFileTell(rio *r) {
|
|||||||
return ftello(r->io.file.fp);
|
return ftello(r->io.file.fp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static const rio rioBufferIO = {
|
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||||
rioBufferRead,
|
* and 0 on failures. */
|
||||||
rioBufferWrite,
|
static int rioFileFlush(rio *r) {
|
||||||
rioBufferTell,
|
return (fflush(r->io.file.fp) == 0) ? 1 : 0;
|
||||||
NULL, /* update_checksum */
|
}
|
||||||
0, /* current checksum */
|
|
||||||
0, /* bytes read or written */
|
|
||||||
0, /* read/write chunk size */
|
|
||||||
{ { NULL, 0 } } /* union for io-specific vars */
|
|
||||||
};
|
|
||||||
|
|
||||||
static const rio rioFileIO = {
|
static const rio rioFileIO = {
|
||||||
rioFileRead,
|
rioFileRead,
|
||||||
rioFileWrite,
|
rioFileWrite,
|
||||||
rioFileTell,
|
rioFileTell,
|
||||||
|
rioFileFlush,
|
||||||
NULL, /* update_checksum */
|
NULL, /* update_checksum */
|
||||||
0, /* current checksum */
|
0, /* current checksum */
|
||||||
0, /* bytes read or written */
|
0, /* bytes read or written */
|
||||||
@ -132,12 +157,133 @@ void rioInitWithFile(rio *r, FILE *fp) {
|
|||||||
r->io.file.autosync = 0;
|
r->io.file.autosync = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rioInitWithBuffer(rio *r, sds s) {
|
/* ------------------- File descriptors set implementation ------------------- */
|
||||||
*r = rioBufferIO;
|
|
||||||
r->io.buffer.ptr = s;
|
/* Returns 1 or 0 for success/failure.
|
||||||
r->io.buffer.pos = 0;
|
* The function returns success as long as we are able to correctly write
|
||||||
|
* to at least one file descriptor.
|
||||||
|
*
|
||||||
|
* When buf is NULL and len is 0, the function performs a flush operation
|
||||||
|
* if there is some pending buffer, so this function is also used in order
|
||||||
|
* to implement rioFdsetFlush(). */
|
||||||
|
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
||||||
|
ssize_t retval;
|
||||||
|
int j;
|
||||||
|
unsigned char *p = (unsigned char*) buf;
|
||||||
|
int doflush = (buf == NULL && len == 0);
|
||||||
|
|
||||||
|
/* To start we always append to our buffer. If it gets larger than
|
||||||
|
* a given size, we actually write to the sockets. */
|
||||||
|
if (len) {
|
||||||
|
r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
|
||||||
|
len = 0; /* Prevent entering the while below if we don't flush. */
|
||||||
|
if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doflush) {
|
||||||
|
p = (unsigned char*) r->io.fdset.buf;
|
||||||
|
len = sdslen(r->io.fdset.buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Write in little chunks so that when there are big writes we
|
||||||
|
* parallelize while the kernel is sending data in background to
|
||||||
|
* the TCP socket. */
|
||||||
|
while(len) {
|
||||||
|
size_t count = len < 1024 ? len : 1024;
|
||||||
|
int broken = 0;
|
||||||
|
for (j = 0; j < r->io.fdset.numfds; j++) {
|
||||||
|
if (r->io.fdset.state[j] != 0) {
|
||||||
|
/* Skip FDs already in error. */
|
||||||
|
broken++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Make sure to write 'count' bytes to the socket regardless
|
||||||
|
* of short writes. */
|
||||||
|
size_t nwritten = 0;
|
||||||
|
while(nwritten != count) {
|
||||||
|
retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
|
||||||
|
if (retval <= 0) {
|
||||||
|
/* With blocking sockets, which is the sole user of this
|
||||||
|
* rio target, EWOULDBLOCK is returned only because of
|
||||||
|
* the SO_SNDTIMEO socket option, so we translate the error
|
||||||
|
* into one more recognizable by the user. */
|
||||||
|
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
nwritten += retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nwritten != count) {
|
||||||
|
/* Mark this FD as broken. */
|
||||||
|
r->io.fdset.state[j] = errno;
|
||||||
|
if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
|
||||||
|
p += count;
|
||||||
|
len -= count;
|
||||||
|
r->io.fdset.pos += count;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doflush) sdsclear(r->io.fdset.buf);
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Returns 1 or 0 for success/failure. */
|
||||||
|
static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
|
||||||
|
REDIS_NOTUSED(r);
|
||||||
|
REDIS_NOTUSED(buf);
|
||||||
|
REDIS_NOTUSED(len);
|
||||||
|
return 0; /* Error, this target does not support reading. */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns read/write position in file. */
|
||||||
|
static off_t rioFdsetTell(rio *r) {
|
||||||
|
return r->io.fdset.pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||||
|
* and 0 on failures. */
|
||||||
|
static int rioFdsetFlush(rio *r) {
|
||||||
|
/* Our flush is implemented by the write method, that recognizes a
|
||||||
|
* buffer set to NULL with a count of zero as a flush request. */
|
||||||
|
return rioFdsetWrite(r,NULL,0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static const rio rioFdsetIO = {
|
||||||
|
rioFdsetRead,
|
||||||
|
rioFdsetWrite,
|
||||||
|
rioFdsetTell,
|
||||||
|
rioFdsetFlush,
|
||||||
|
NULL, /* update_checksum */
|
||||||
|
0, /* current checksum */
|
||||||
|
0, /* bytes read or written */
|
||||||
|
0, /* read/write chunk size */
|
||||||
|
{ { NULL, 0 } } /* union for io-specific vars */
|
||||||
|
};
|
||||||
|
|
||||||
|
void rioInitWithFdset(rio *r, int *fds, int numfds) {
|
||||||
|
int j;
|
||||||
|
|
||||||
|
*r = rioFdsetIO;
|
||||||
|
r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
|
||||||
|
r->io.fdset.state = zmalloc(sizeof(int)*numfds);
|
||||||
|
memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
|
||||||
|
for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
|
||||||
|
r->io.fdset.numfds = numfds;
|
||||||
|
r->io.fdset.pos = 0;
|
||||||
|
r->io.fdset.buf = sdsempty();
|
||||||
|
}
|
||||||
|
|
||||||
|
void rioFreeFdset(rio *r) {
|
||||||
|
zfree(r->io.fdset.fds);
|
||||||
|
zfree(r->io.fdset.state);
|
||||||
|
sdsfree(r->io.fdset.buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ---------------------------- Generic functions ---------------------------- */
|
||||||
|
|
||||||
/* This function can be installed both in memory and file streams when checksum
|
/* This function can be installed both in memory and file streams when checksum
|
||||||
* computation is needed. */
|
* computation is needed. */
|
||||||
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
|
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
|
||||||
@ -157,7 +303,8 @@ void rioSetAutoSync(rio *r, off_t bytes) {
|
|||||||
r->io.file.autosync = bytes;
|
r->io.file.autosync = bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------ Higher level interface ---------------------------
|
/* --------------------------- Higher level interface --------------------------
|
||||||
|
*
|
||||||
* The following higher level functions use lower level rio.c functions to help
|
* The following higher level functions use lower level rio.c functions to help
|
||||||
* generating the Redis protocol for the Append Only File. */
|
* generating the Redis protocol for the Append Only File. */
|
||||||
|
|
||||||
|
16
src/rio.h
16
src/rio.h
@ -43,6 +43,7 @@ struct _rio {
|
|||||||
size_t (*read)(struct _rio *, void *buf, size_t len);
|
size_t (*read)(struct _rio *, void *buf, size_t len);
|
||||||
size_t (*write)(struct _rio *, const void *buf, size_t len);
|
size_t (*write)(struct _rio *, const void *buf, size_t len);
|
||||||
off_t (*tell)(struct _rio *);
|
off_t (*tell)(struct _rio *);
|
||||||
|
int (*flush)(struct _rio *);
|
||||||
/* The update_cksum method if not NULL is used to compute the checksum of
|
/* The update_cksum method if not NULL is used to compute the checksum of
|
||||||
* all the data that was read or written so far. The method should be
|
* all the data that was read or written so far. The method should be
|
||||||
* designed so that can be called with the current checksum, and the buf
|
* designed so that can be called with the current checksum, and the buf
|
||||||
@ -61,15 +62,25 @@ struct _rio {
|
|||||||
|
|
||||||
/* Backend-specific vars. */
|
/* Backend-specific vars. */
|
||||||
union {
|
union {
|
||||||
|
/* In-memory buffer target. */
|
||||||
struct {
|
struct {
|
||||||
sds ptr;
|
sds ptr;
|
||||||
off_t pos;
|
off_t pos;
|
||||||
} buffer;
|
} buffer;
|
||||||
|
/* Stdio file pointer target. */
|
||||||
struct {
|
struct {
|
||||||
FILE *fp;
|
FILE *fp;
|
||||||
off_t buffered; /* Bytes written since last fsync. */
|
off_t buffered; /* Bytes written since last fsync. */
|
||||||
off_t autosync; /* fsync after 'autosync' bytes written. */
|
off_t autosync; /* fsync after 'autosync' bytes written. */
|
||||||
} file;
|
} file;
|
||||||
|
/* Multiple FDs target (used to write to N sockets). */
|
||||||
|
struct {
|
||||||
|
int *fds; /* File descriptors. */
|
||||||
|
int *state; /* Error state of each fd. 0 (if ok) or errno. */
|
||||||
|
int numfds;
|
||||||
|
off_t pos;
|
||||||
|
sds buf;
|
||||||
|
} fdset;
|
||||||
} io;
|
} io;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -109,8 +120,13 @@ static inline off_t rioTell(rio *r) {
|
|||||||
return r->tell(r);
|
return r->tell(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int rioFlush(rio *r) {
|
||||||
|
return r->flush(r);
|
||||||
|
}
|
||||||
|
|
||||||
void rioInitWithFile(rio *r, FILE *fp);
|
void rioInitWithFile(rio *r, FILE *fp);
|
||||||
void rioInitWithBuffer(rio *r, sds s);
|
void rioInitWithBuffer(rio *r, sds s);
|
||||||
|
void rioInitWithFdset(rio *r, int *fds, int numfds);
|
||||||
|
|
||||||
size_t rioWriteBulkCount(rio *r, char prefix, int count);
|
size_t rioWriteBulkCount(rio *r, char prefix, int count);
|
||||||
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
|
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
|
||||||
|
@ -139,6 +139,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
|
|||||||
*ptr = '\0';
|
*ptr = '\0';
|
||||||
nread++;
|
nread++;
|
||||||
}
|
}
|
||||||
|
size--;
|
||||||
}
|
}
|
||||||
return nread;
|
return nread;
|
||||||
}
|
}
|
||||||
|
@ -94,79 +94,82 @@ start_server {tags {"repl"}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
start_server {tags {"repl"}} {
|
foreach dl {no yes} {
|
||||||
set master [srv 0 client]
|
start_server {tags {"repl"}} {
|
||||||
set master_host [srv 0 host]
|
set master [srv 0 client]
|
||||||
set master_port [srv 0 port]
|
$master config set repl-diskless-sync $dl
|
||||||
set slaves {}
|
set master_host [srv 0 host]
|
||||||
set load_handle0 [start_write_load $master_host $master_port 3]
|
set master_port [srv 0 port]
|
||||||
set load_handle1 [start_write_load $master_host $master_port 5]
|
set slaves {}
|
||||||
set load_handle2 [start_write_load $master_host $master_port 20]
|
set load_handle0 [start_write_load $master_host $master_port 3]
|
||||||
set load_handle3 [start_write_load $master_host $master_port 8]
|
set load_handle1 [start_write_load $master_host $master_port 5]
|
||||||
set load_handle4 [start_write_load $master_host $master_port 4]
|
set load_handle2 [start_write_load $master_host $master_port 20]
|
||||||
start_server {} {
|
set load_handle3 [start_write_load $master_host $master_port 8]
|
||||||
lappend slaves [srv 0 client]
|
set load_handle4 [start_write_load $master_host $master_port 4]
|
||||||
start_server {} {
|
start_server {} {
|
||||||
lappend slaves [srv 0 client]
|
lappend slaves [srv 0 client]
|
||||||
start_server {} {
|
start_server {} {
|
||||||
lappend slaves [srv 0 client]
|
lappend slaves [srv 0 client]
|
||||||
test "Connect multiple slaves at the same time (issue #141)" {
|
start_server {} {
|
||||||
# Send SLAVEOF commands to slaves
|
lappend slaves [srv 0 client]
|
||||||
[lindex $slaves 0] slaveof $master_host $master_port
|
test "Connect multiple slaves at the same time (issue #141), diskless=$dl" {
|
||||||
[lindex $slaves 1] slaveof $master_host $master_port
|
# Send SLAVEOF commands to slaves
|
||||||
[lindex $slaves 2] slaveof $master_host $master_port
|
[lindex $slaves 0] slaveof $master_host $master_port
|
||||||
|
[lindex $slaves 1] slaveof $master_host $master_port
|
||||||
|
[lindex $slaves 2] slaveof $master_host $master_port
|
||||||
|
|
||||||
# Wait for all the three slaves to reach the "online" state
|
# Wait for all the three slaves to reach the "online" state
|
||||||
set retry 500
|
set retry 500
|
||||||
while {$retry} {
|
while {$retry} {
|
||||||
set info [r -3 info]
|
set info [r -3 info]
|
||||||
if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
|
if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
incr retry -1
|
incr retry -1
|
||||||
after 100
|
after 100
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if {$retry == 0} {
|
||||||
|
error "assertion:Slaves not correctly synchronized"
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if {$retry == 0} {
|
|
||||||
error "assertion:Slaves not correctly synchronized"
|
|
||||||
}
|
|
||||||
|
|
||||||
# Stop the write load
|
# Stop the write load
|
||||||
stop_write_load $load_handle0
|
stop_write_load $load_handle0
|
||||||
stop_write_load $load_handle1
|
stop_write_load $load_handle1
|
||||||
stop_write_load $load_handle2
|
stop_write_load $load_handle2
|
||||||
stop_write_load $load_handle3
|
stop_write_load $load_handle3
|
||||||
stop_write_load $load_handle4
|
stop_write_load $load_handle4
|
||||||
|
|
||||||
# Wait that slaves exit the "loading" state
|
# Wait that slaves exit the "loading" state
|
||||||
wait_for_condition 500 100 {
|
wait_for_condition 500 100 {
|
||||||
![string match {*loading:1*} [[lindex $slaves 0] info]] &&
|
![string match {*loading:1*} [[lindex $slaves 0] info]] &&
|
||||||
![string match {*loading:1*} [[lindex $slaves 1] info]] &&
|
![string match {*loading:1*} [[lindex $slaves 1] info]] &&
|
||||||
![string match {*loading:1*} [[lindex $slaves 2] info]]
|
![string match {*loading:1*} [[lindex $slaves 2] info]]
|
||||||
} else {
|
} else {
|
||||||
fail "Slaves still loading data after too much time"
|
fail "Slaves still loading data after too much time"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Make sure that slaves and master have same number of keys
|
||||||
|
wait_for_condition 500 100 {
|
||||||
|
[$master dbsize] == [[lindex $slaves 0] dbsize] &&
|
||||||
|
[$master dbsize] == [[lindex $slaves 1] dbsize] &&
|
||||||
|
[$master dbsize] == [[lindex $slaves 2] dbsize]
|
||||||
|
} else {
|
||||||
|
fail "Different number of keys between masted and slave after too long time."
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check digests
|
||||||
|
set digest [$master debug digest]
|
||||||
|
set digest0 [[lindex $slaves 0] debug digest]
|
||||||
|
set digest1 [[lindex $slaves 1] debug digest]
|
||||||
|
set digest2 [[lindex $slaves 2] debug digest]
|
||||||
|
assert {$digest ne 0000000000000000000000000000000000000000}
|
||||||
|
assert {$digest eq $digest0}
|
||||||
|
assert {$digest eq $digest1}
|
||||||
|
assert {$digest eq $digest2}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
# Make sure that slaves and master have same number of keys
|
}
|
||||||
wait_for_condition 500 100 {
|
|
||||||
[$master dbsize] == [[lindex $slaves 0] dbsize] &&
|
|
||||||
[$master dbsize] == [[lindex $slaves 1] dbsize] &&
|
|
||||||
[$master dbsize] == [[lindex $slaves 2] dbsize]
|
|
||||||
} else {
|
|
||||||
fail "Different number of keys between masted and slave after too long time."
|
|
||||||
}
|
|
||||||
|
|
||||||
# Check digests
|
|
||||||
set digest [$master debug digest]
|
|
||||||
set digest0 [[lindex $slaves 0] debug digest]
|
|
||||||
set digest1 [[lindex $slaves 1] debug digest]
|
|
||||||
set digest2 [[lindex $slaves 2] debug digest]
|
|
||||||
assert {$digest ne 0000000000000000000000000000000000000000}
|
|
||||||
assert {$digest eq $digest0}
|
|
||||||
assert {$digest eq $digest1}
|
|
||||||
assert {$digest eq $digest2}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user