mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 17:40:50 +00:00
Diskless replication: EOF:<mark> streaming support slave side.
This commit is contained in:
parent
43ae606430
commit
5ee2ccf48e
@ -818,6 +818,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
REDIS_NOTUSED(privdata);
|
||||
REDIS_NOTUSED(mask);
|
||||
|
||||
/* Static vars used to hold the EOF mark, and the last bytes received
|
||||
* form the server: when they match, we reached the end of the transfer. */
|
||||
static char eofmark[REDIS_RUN_ID_SIZE];
|
||||
static char lastbytes[REDIS_RUN_ID_SIZE];
|
||||
static int usemark = 0;
|
||||
|
||||
/* If repl_transfer_size == -1 we still have to read the bulk length
|
||||
* from the master reply. */
|
||||
if (server.repl_transfer_size == -1) {
|
||||
@ -843,16 +849,41 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
|
||||
goto error;
|
||||
}
|
||||
server.repl_transfer_size = strtol(buf+1,NULL,10);
|
||||
redisLog(REDIS_NOTICE,
|
||||
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
|
||||
(long long) server.repl_transfer_size);
|
||||
|
||||
/* There are two possible forms for the bulk payload. One is the
|
||||
* usual $<count> bulk format. The other is used for diskless transfers
|
||||
* when the master does not know beforehand the size of the file to
|
||||
* transfer. In the latter case, the following format is used:
|
||||
*
|
||||
* $EOF:<40 bytes delimiter>
|
||||
*
|
||||
* At the end of the file the announced delimiter is transmitted. The
|
||||
* delimiter is long and random enough that the probability of a
|
||||
* collision with the actual file content can be ignored. */
|
||||
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
|
||||
usemark = 1;
|
||||
memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
|
||||
memset(lastbytes,0,REDIS_RUN_ID_SIZE);
|
||||
redisLog(REDIS_NOTICE,
|
||||
"MASTER <-> SLAVE sync: receiving streamed RDB from master");
|
||||
} else {
|
||||
usemark = 0;
|
||||
server.repl_transfer_size = strtol(buf+1,NULL,10);
|
||||
redisLog(REDIS_NOTICE,
|
||||
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
|
||||
(long long) server.repl_transfer_size);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Read bulk data */
|
||||
left = server.repl_transfer_size - server.repl_transfer_read;
|
||||
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
|
||||
if (usemark) {
|
||||
left = server.repl_transfer_size - server.repl_transfer_read;
|
||||
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
|
||||
} else {
|
||||
readlen = sizeof(buf);
|
||||
}
|
||||
|
||||
nread = read(fd,buf,readlen);
|
||||
if (nread <= 0) {
|
||||
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
|
||||
@ -860,6 +891,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
replicationAbortSyncTransfer();
|
||||
return;
|
||||
}
|
||||
|
||||
/* When a mark is used, we want to detect EOF asap in order to avoid
|
||||
* writing the EOF mark into the file... */
|
||||
int eof_reached = 0;
|
||||
|
||||
if (usemark) {
|
||||
/* Update the last bytes array, and check if it matches our delimiter. */
|
||||
if (nread >= REDIS_RUN_ID_SIZE) {
|
||||
memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
|
||||
} else {
|
||||
int rem = REDIS_RUN_ID_SIZE-nread;
|
||||
memmove(lastbytes,lastbytes+nread,rem);
|
||||
memcpy(lastbytes+rem,buf,nread);
|
||||
}
|
||||
if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
|
||||
}
|
||||
|
||||
server.repl_transfer_lastio = server.unixtime;
|
||||
if (write(server.repl_transfer_fd,buf,nread) != nread) {
|
||||
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
||||
@ -881,7 +929,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
}
|
||||
|
||||
/* Check if the transfer is now complete */
|
||||
if (server.repl_transfer_read == server.repl_transfer_size) {
|
||||
if (!usemark) {
|
||||
if (server.repl_transfer_read == server.repl_transfer_size)
|
||||
eof_reached = 1;
|
||||
}
|
||||
|
||||
if (eof_reached) {
|
||||
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
|
||||
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
|
||||
replicationAbortSyncTransfer();
|
||||
|
Loading…
x
Reference in New Issue
Block a user