mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
rio.c fdset target: tolerate (and report) a subset of FDs in error.
Fdset target is used when we want to write an RDB file directly to slave's sockets. In this setup as long as there is a single slave that is still receiving our payload, we want to continue sennding instead of aborting. However rio calls should abort of no FD is ok. Also we want the errors reported so that we can signal the parent who is ok and who is broken, so there is a new set integers with the state of each fd. Zero is ok, non-zero is the errno of the failure, if avaialble, or a generic EIO.
This commit is contained in:
parent
1cd0d26c63
commit
2a436aaeab
22
src/rio.c
22
src/rio.c
@ -144,7 +144,9 @@ void rioInitWithFile(rio *r, FILE *fp) {
|
|||||||
|
|
||||||
/* ------------------- File descriptors set implementation ------------------- */
|
/* ------------------- File descriptors set implementation ------------------- */
|
||||||
|
|
||||||
/* Returns 1 or 0 for success/failure. */
|
/* Returns 1 or 0 for success/failure.
|
||||||
|
* The function returns success as long as we are able to correctly write
|
||||||
|
* to at least one file descriptor. */
|
||||||
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
||||||
size_t retval;
|
size_t retval;
|
||||||
int j;
|
int j;
|
||||||
@ -155,10 +157,21 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
|||||||
* the TCP socket. */
|
* the TCP socket. */
|
||||||
while(len) {
|
while(len) {
|
||||||
size_t count = len < 1024 ? len : 1024;
|
size_t count = len < 1024 ? len : 1024;
|
||||||
|
int broken = 0;
|
||||||
for (j = 0; j < r->io.fdset.numfds; j++) {
|
for (j = 0; j < r->io.fdset.numfds; j++) {
|
||||||
|
if (r->io.fdset.state[j] != 0) {
|
||||||
|
/* Skip FDs alraedy in error. */
|
||||||
|
broken++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
retval = write(r->io.fdset.fds[j],p,count);
|
retval = write(r->io.fdset.fds[j],p,count);
|
||||||
if (retval != count) return 0;
|
if (retval != count) {
|
||||||
|
/* Mark this FD as broken. */
|
||||||
|
r->io.fdset.state[j] = errno;
|
||||||
|
if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
|
||||||
p += count;
|
p += count;
|
||||||
len -= count;
|
len -= count;
|
||||||
r->io.fdset.pos += count;
|
r->io.fdset.pos += count;
|
||||||
@ -191,15 +204,20 @@ static const rio rioFdsetIO = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
void rioInitWithFdset(rio *r, int *fds, int numfds) {
|
void rioInitWithFdset(rio *r, int *fds, int numfds) {
|
||||||
|
int j;
|
||||||
|
|
||||||
*r = rioFdsetIO;
|
*r = rioFdsetIO;
|
||||||
r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
|
r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
|
||||||
|
r->io.fdset.state = zmalloc(sizeof(int)*numfds);
|
||||||
memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
|
memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
|
||||||
|
for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
|
||||||
r->io.fdset.numfds = numfds;
|
r->io.fdset.numfds = numfds;
|
||||||
r->io.fdset.pos = 0;
|
r->io.fdset.pos = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rioFreeFdset(rio *r) {
|
void rioFreeFdset(rio *r) {
|
||||||
zfree(r->io.fdset.fds);
|
zfree(r->io.fdset.fds);
|
||||||
|
zfree(r->io.fdset.state);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ---------------------------- Generic functions ---------------------------- */
|
/* ---------------------------- Generic functions ---------------------------- */
|
||||||
|
@ -74,7 +74,8 @@ struct _rio {
|
|||||||
} file;
|
} file;
|
||||||
/* Multiple FDs target (used to write to N sockets). */
|
/* Multiple FDs target (used to write to N sockets). */
|
||||||
struct {
|
struct {
|
||||||
int *fds;
|
int *fds; /* File descriptors. */
|
||||||
|
int *state; /* Error state of each fd. 0 (if ok) or errno. */
|
||||||
int numfds;
|
int numfds;
|
||||||
off_t pos;
|
off_t pos;
|
||||||
} fdset;
|
} fdset;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user