Diskless replication: read report from child.

This commit is contained in:
antirez 2014-10-15 11:35:00 +02:00
parent fbe7545545
commit 7a1e0d9898

View File

@ -1274,14 +1274,10 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
/* A background saving child (BGSAVE) terminated its work. Handle this.
* This function covers the case of RDB -> Salves socket transfers for
* diskless replication.
*
* TODO:
* 1) Read from the pipe the set of IDs which are fine. This should be
* just an uint32_t with the len, and N pid_t elements.
* 2) Close all the slaves in state REDIS_REPL_WAIT_BGSAVE_END but are
* not reported as "transfer ok" by the child. */
* diskless replication. */
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
uint64_t *ok_slaves;
if (!bysignal && exitcode == 0) {
redisLog(REDIS_NOTICE,
"Background RDB transfer terminated with success");
@ -1294,18 +1290,64 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
server.rdb_child_pid = -1;
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
server.rdb_save_time_start = -1;
/* Read the set of slave client IDs that currently received the full
* RDB payload, closing all the slaves which are not among the ones
* listed. */
/* TODO ... */
/* If the child returns an OK exit code, read the set of slave client
* IDs that received the full RDB payload, closing all the slaves
* which are not among the ones listed.
*
* If the process returned an error, consider the list of slaves that
* can continue to be emtpy, so that it's just a speical case of the
* normal code path. */
ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
ok_slaves = 0;
if (!bysignal && exitcode == 0) {
int readlen = sizeof(uint64_t);
/* Close the pipe FDs. */
if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) !=
readlen)
{
readlen = ok_slaves[0]*sizeof(uint64_t);
/* TODO ... */
/* Make space for enough elements as specified by the first
* uint64_t element in the array. */
ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
if (readlen &&
read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
readlen) != readlen)
{
ok_slaves[0] = 0;
}
}
}
close(server.rdb_pipe_read_result_from_child);
close(server.rdb_pipe_write_result_to_parent);
/* We can continue the replication process with all the slaves that
* correctly received the full payload. */
* correctly received the full payload. Others are terminated. */
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
uint64_t j;
for (j = 1; j < ok_slaves[0]; j++) {
if (slave->id == ok_slaves[j]) break; /* Found in the OK list. */
}
if (j == ok_slaves[0]) {
redisLog(REDIS_WARNING,
"Closing slave %llu: child->slave RDB transfer failed.",
slave->id);
freeClient(slave);
}
}
}
zfree(ok_slaves);
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET);
}