mirror of
https://github.com/fluencelabs/redis
synced 2025-04-03 16:21:03 +00:00
PSYNC2: stop sending newlines to sub-slaves when master is down.
This actually includes two changes: 1) No newlines to take the master-slave link up when the upstream master is down. Doing this is dangerous because the sub-slave often is received replication protocol for an half-command, so can't receive newlines without desyncing the replication link, even with the code in order to cancel out the bytes that PSYNC2 was using. Moreover this is probably also not needed/sane, because anyway the slave can keep serving requests, and because if it's configured to don't serve stale data, it's a good idea, actually, to break the link. 2) When a +CONTINUE with a different ID is received, we now break connection with the sub-slaves: they need to be notified as well. This was part of the original specification but for some reason it was not implemented in the code, and was alter found as a PSYNC2 bug in the integration testing.
This commit is contained in:
parent
16559a02fc
commit
eab865a0a1
@ -1031,7 +1031,7 @@ int processInlineBuffer(client *c) {
|
|||||||
char *newline;
|
char *newline;
|
||||||
int argc, j;
|
int argc, j;
|
||||||
sds *argv, aux;
|
sds *argv, aux;
|
||||||
size_t querylen, protolen;
|
size_t querylen;
|
||||||
|
|
||||||
/* Search for end of line */
|
/* Search for end of line */
|
||||||
newline = strchr(c->querybuf,'\n');
|
newline = strchr(c->querybuf,'\n');
|
||||||
@ -1044,7 +1044,6 @@ int processInlineBuffer(client *c) {
|
|||||||
}
|
}
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
protolen = (newline - c->querybuf)+1; /* Total protocol bytes of command. */
|
|
||||||
|
|
||||||
/* Handle the \r\n case. */
|
/* Handle the \r\n case. */
|
||||||
if (newline && newline != c->querybuf && *(newline-1) == '\r')
|
if (newline && newline != c->querybuf && *(newline-1) == '\r')
|
||||||
@ -1067,15 +1066,6 @@ int processInlineBuffer(client *c) {
|
|||||||
if (querylen == 0 && c->flags & CLIENT_SLAVE)
|
if (querylen == 0 && c->flags & CLIENT_SLAVE)
|
||||||
c->repl_ack_time = server.unixtime;
|
c->repl_ack_time = server.unixtime;
|
||||||
|
|
||||||
/* Newline from masters can be used to prevent timeouts, but should
|
|
||||||
* not affect the replication offset since they are always sent
|
|
||||||
* "out of band" directly writing to the socket and without passing
|
|
||||||
* from the output buffers. */
|
|
||||||
if (querylen == 0 && c->flags & CLIENT_MASTER) {
|
|
||||||
c->reploff -= protolen;
|
|
||||||
while (protolen--) chopReplicationBacklog();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Leave data after the first line of the query in the buffer */
|
/* Leave data after the first line of the query in the buffer */
|
||||||
sdsrange(c->querybuf,querylen+2,-1);
|
sdsrange(c->querybuf,querylen+2,-1);
|
||||||
|
|
||||||
|
@ -148,22 +148,6 @@ void feedReplicationBacklog(void *ptr, size_t len) {
|
|||||||
server.repl_backlog_histlen + 1;
|
server.repl_backlog_histlen + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove the last byte from the replication backlog. This
|
|
||||||
* is useful when we receive an out of band "\n" to keep the connection
|
|
||||||
* alive but don't want to count it as replication stream.
|
|
||||||
*
|
|
||||||
* As a side effect this function adjusts the master replication offset
|
|
||||||
* of this instance to account for the missing byte. */
|
|
||||||
void chopReplicationBacklog(void) {
|
|
||||||
if (!server.repl_backlog || !server.repl_backlog_histlen) return;
|
|
||||||
if (server.repl_backlog_idx == 0)
|
|
||||||
server.repl_backlog_idx = server.repl_backlog_size-1;
|
|
||||||
else
|
|
||||||
server.repl_backlog_idx--;
|
|
||||||
server.master_repl_offset--;
|
|
||||||
server.repl_backlog_histlen--;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
|
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
|
||||||
* as input. */
|
* as input. */
|
||||||
void feedReplicationBacklogWithObject(robj *o) {
|
void feedReplicationBacklogWithObject(robj *o) {
|
||||||
@ -1530,6 +1514,9 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
|
|||||||
* new one. */
|
* new one. */
|
||||||
memcpy(server.replid,new,sizeof(server.replid));
|
memcpy(server.replid,new,sizeof(server.replid));
|
||||||
memcpy(server.cached_master->replid,new,sizeof(server.replid));
|
memcpy(server.cached_master->replid,new,sizeof(server.replid));
|
||||||
|
|
||||||
|
/* Disconnect all the sub-slaves: they need to be notified. */
|
||||||
|
disconnectSlaves();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2553,10 +2540,8 @@ void replicationCron(void) {
|
|||||||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
|
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
|
||||||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
|
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
|
||||||
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
|
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
|
||||||
int is_subslave = server.masterhost && server.master == NULL &&
|
|
||||||
slave->replstate == SLAVE_STATE_ONLINE;
|
|
||||||
|
|
||||||
if (is_presync || is_subslave) {
|
if (is_presync) {
|
||||||
if (write(slave->fd, "\n", 1) == -1) {
|
if (write(slave->fd, "\n", 1) == -1) {
|
||||||
/* Don't worry about socket errors, it's just a ping. */
|
/* Don't worry about socket errors, it's just a ping. */
|
||||||
}
|
}
|
||||||
|
@ -87,6 +87,7 @@ start_server {} {
|
|||||||
set slave_id [randomInt 5]
|
set slave_id [randomInt 5]
|
||||||
if {$disconnect} {
|
if {$disconnect} {
|
||||||
$R($slave_id) client kill type master
|
$R($slave_id) client kill type master
|
||||||
|
puts "+++ Breaking link for slave #$slave_id"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user