mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 09:30:55 +00:00
rio fdset target: handle short writes.
While the socket is set in blocking mode, we still can get short writes writing to a socket.
This commit is contained in:
parent
74f90c6123
commit
525c488f63
@ -1347,6 +1347,8 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
|
||||
redisLog(REDIS_WARNING,
|
||||
"Slave %llu correctly received the streamed RDB file.",
|
||||
slave->id);
|
||||
/* Restore the socket as non-blocking. */
|
||||
anetNonBlock(NULL,slave->fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1408,6 +1410,10 @@ int rdbSaveToSlavesSockets(void) {
|
||||
clientids[numfds] = slave->id;
|
||||
fds[numfds++] = slave->fd;
|
||||
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
|
||||
/* Put the socket in non-blocking mode to simplify RDB transfer.
|
||||
* We'll restore it when the children returns (since duped socket
|
||||
* will share the O_NONBLOCK attribute with the parent). */
|
||||
anetBlock(NULL,slave->fd);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -888,6 +888,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
}
|
||||
|
||||
nread = read(fd,buf,readlen);
|
||||
printf("NREAD %d (%d)\n", (int)nread, (int)readlen);
|
||||
if (nread <= 0) {
|
||||
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
|
||||
(nread == -1) ? strerror(errno) : "connection lost");
|
||||
|
13
src/rio.c
13
src/rio.c
@ -197,8 +197,17 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
||||
broken++;
|
||||
continue;
|
||||
}
|
||||
retval = write(r->io.fdset.fds[j],p,count);
|
||||
if (retval != count) {
|
||||
|
||||
/* Make sure to write 'count' bytes to the socket regardless
|
||||
* of short writes. */
|
||||
size_t nwritten = 0;
|
||||
while(nwritten != count) {
|
||||
retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
|
||||
if (retval <= 0) break;
|
||||
nwritten += retval;
|
||||
}
|
||||
|
||||
if (nwritten != count) {
|
||||
/* Mark this FD as broken. */
|
||||
r->io.fdset.state[j] = errno;
|
||||
if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
|
||||
|
Loading…
x
Reference in New Issue
Block a user