diff --git a/src/rdb.c b/src/rdb.c
index f2eae023..bcabd0ae 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1274,14 +1274,15 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
 
 /* A background saving child (BGSAVE) terminated its work. Handle this.
  * This function covers the case of RDB -> Salves socket transfers for
- * diskless replication.
- *
- * TODO:
- * 1) Read from the pipe the set of IDs which are fine. This should be
- *    just an uint32_t with the len, and N pid_t elements.
- * 2) Close all the slaves in state REDIS_REPL_WAIT_BGSAVE_END but are
- *    not reported as "transfer ok" by the child. */
+ * diskless replication.
+ *
+ * When the child exits cleanly, the parent reads from the result pipe a
+ * uint64_t count followed by that many uint64_t slave client IDs. Slaves
+ * in state REDIS_REPL_WAIT_BGSAVE_END whose ID is not in that list are
+ * closed; on a failed exit the list is treated as empty. */
 void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
+    uint64_t *ok_slaves;
+
     if (!bysignal && exitcode == 0) {
         redisLog(REDIS_NOTICE,
             "Background RDB transfer terminated with success");
@@ -1294,17 +1295,66 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
     server.rdb_child_pid = -1;
     server.rdb_child_type = REDIS_RDB_CHILD_TYPE_NONE;
     server.rdb_save_time_start = -1;
-    /* Read the set of slave client IDs that currently received the full
-     * RDB payload, closing all the slaves which are not among the ones
-     * listed. */
 
-    /* TODO ... */
+    /* If the child returns an OK exit code, read the set of slave client
+     * IDs that received the full RDB payload, closing all the slaves
+     * which are not among the ones listed.
+     *
+     * If the process returned an error, consider the list of slaves that
+     * can continue to be empty, so that it's just a special case of the
+     * normal code path. */
+    ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
+    ok_slaves[0] = 0;
+    if (!bysignal && exitcode == 0) {
+        int readlen = sizeof(uint64_t);
 
-    /* Close the pipe FDs. */
+        if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
+                 readlen)
+        {
+            readlen = ok_slaves[0]*sizeof(uint64_t);
 
-    /* TODO ... */
+            /* Make space for enough elements as specified by the first
+             * uint64_t element in the array. */
+            ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
+            if (readlen &&
+                read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
+                     readlen) != readlen)
+            {
+                ok_slaves[0] = 0;
+            }
+        } else {
+            ok_slaves[0] = 0; /* Short or failed read: trust no ID. */
+        }
+    }
+
+    close(server.rdb_pipe_read_result_from_child);
+    close(server.rdb_pipe_write_result_to_parent);
 
     /* We can continue the replication process with all the slaves that
-     * correctly received the full payload. */
+     * correctly received the full payload. Others are terminated. */
+    listNode *ln;
+    listIter li;
+
+    listRewind(server.slaves,&li);
+    while((ln = listNext(&li))) {
+        redisClient *slave = ln->value;
+
+        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
+            uint64_t j;
+
+            /* ok_slaves[0] is the count, IDs follow at ok_slaves[1..]. */
+            for (j = 0; j < ok_slaves[0]; j++) {
+                if (slave->id == ok_slaves[j+1]) break; /* Found in the OK list. */
+            }
+            if (j == ok_slaves[0]) {
+                redisLog(REDIS_WARNING,
+                "Closing slave %llu: child->slave RDB transfer failed.",
+                    (unsigned long long) slave->id);
+                freeClient(slave);
+            }
+        }
+    }
+    zfree(ok_slaves);
+
     updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? REDIS_OK : REDIS_ERR, REDIS_RDB_CHILD_TYPE_SOCKET);
 }