mirror of
https://github.com/fluencelabs/redis
synced 2025-03-21 10:00:51 +00:00
Diskless replication: rio fdset target new supports buffering.
To perform a socket write() for each RDB rio API write call was extremely unefficient, so now rio has minimal buffering capabilities. Writes are accumulated into a buffer and only when a given limit is reacehd are actually wrote to the N slaves FDs. Trivia: rio lacked support for buffering since our targets were: 1) Memory buffers. 2) C standard I/O. Both were buffered already.
This commit is contained in:
parent
b1337b15b6
commit
10aafdad56
@ -1425,6 +1425,9 @@ int rdbSaveToSlavesSockets(void) {
|
|||||||
redisSetProcTitle("redis-rdb-to-slaves");
|
redisSetProcTitle("redis-rdb-to-slaves");
|
||||||
|
|
||||||
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
|
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
|
||||||
|
if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0)
|
||||||
|
retval = REDIS_ERR;
|
||||||
|
|
||||||
if (retval == REDIS_OK) {
|
if (retval == REDIS_OK) {
|
||||||
size_t private_dirty = zmalloc_get_private_dirty();
|
size_t private_dirty = zmalloc_get_private_dirty();
|
||||||
|
|
||||||
|
48
src/rio.c
48
src/rio.c
@ -78,10 +78,18 @@ static off_t rioBufferTell(rio *r) {
|
|||||||
return r->io.buffer.pos;
|
return r->io.buffer.pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||||
|
* and 0 on failures. */
|
||||||
|
static int rioBufferFlush(rio *r) {
|
||||||
|
REDIS_NOTUSED(r);
|
||||||
|
return 1; /* Nothing to do, our write just appends to the buffer. */
|
||||||
|
}
|
||||||
|
|
||||||
static const rio rioBufferIO = {
|
static const rio rioBufferIO = {
|
||||||
rioBufferRead,
|
rioBufferRead,
|
||||||
rioBufferWrite,
|
rioBufferWrite,
|
||||||
rioBufferTell,
|
rioBufferTell,
|
||||||
|
rioBufferFlush,
|
||||||
NULL, /* update_checksum */
|
NULL, /* update_checksum */
|
||||||
0, /* current checksum */
|
0, /* current checksum */
|
||||||
0, /* bytes read or written */
|
0, /* bytes read or written */
|
||||||
@ -124,10 +132,17 @@ static off_t rioFileTell(rio *r) {
|
|||||||
return ftello(r->io.file.fp);
|
return ftello(r->io.file.fp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||||
|
* and 0 on failures. */
|
||||||
|
static int rioFileFlush(rio *r) {
|
||||||
|
return (fflush(r->io.file.fp) == 0) ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
static const rio rioFileIO = {
|
static const rio rioFileIO = {
|
||||||
rioFileRead,
|
rioFileRead,
|
||||||
rioFileWrite,
|
rioFileWrite,
|
||||||
rioFileTell,
|
rioFileTell,
|
||||||
|
rioFileFlush,
|
||||||
NULL, /* update_checksum */
|
NULL, /* update_checksum */
|
||||||
0, /* current checksum */
|
0, /* current checksum */
|
||||||
0, /* bytes read or written */
|
0, /* bytes read or written */
|
||||||
@ -146,11 +161,29 @@ void rioInitWithFile(rio *r, FILE *fp) {
|
|||||||
|
|
||||||
/* Returns 1 or 0 for success/failure.
|
/* Returns 1 or 0 for success/failure.
|
||||||
* The function returns success as long as we are able to correctly write
|
* The function returns success as long as we are able to correctly write
|
||||||
* to at least one file descriptor. */
|
* to at least one file descriptor.
|
||||||
|
*
|
||||||
|
* When buf is NULL adn len is 0, the function performs a flush operation
|
||||||
|
* if there is some pending buffer, so this function is also used in order
|
||||||
|
* to implement rioFdsetFlush(). */
|
||||||
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
||||||
size_t retval;
|
size_t retval;
|
||||||
int j;
|
int j;
|
||||||
unsigned char *p = (unsigned char*) buf;
|
unsigned char *p = (unsigned char*) buf;
|
||||||
|
int doflush = (buf == NULL && len == 0);
|
||||||
|
|
||||||
|
/* To start we always append to our buffer. If it gets larger than
|
||||||
|
* a given size, we actually write to the sockets. */
|
||||||
|
if (len) {
|
||||||
|
r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
|
||||||
|
len = 0; /* Prevent entering the while belove if we don't flush. */
|
||||||
|
if (sdslen(r->io.fdset.buf) > REDIS_IOBUF_LEN) doflush = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doflush) {
|
||||||
|
p = (unsigned char*) r->io.fdset.buf;
|
||||||
|
len = sdslen(r->io.fdset.buf);
|
||||||
|
}
|
||||||
|
|
||||||
/* Write in little chunchs so that when there are big writes we
|
/* Write in little chunchs so that when there are big writes we
|
||||||
* parallelize while the kernel is sending data in background to
|
* parallelize while the kernel is sending data in background to
|
||||||
@ -176,6 +209,8 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
|||||||
len -= count;
|
len -= count;
|
||||||
r->io.fdset.pos += count;
|
r->io.fdset.pos += count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (doflush) sdsclear(r->io.fdset.buf);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,10 +227,19 @@ static off_t rioFdsetTell(rio *r) {
|
|||||||
return r->io.fdset.pos;
|
return r->io.fdset.pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||||
|
* and 0 on failures. */
|
||||||
|
static int rioFdsetFlush(rio *r) {
|
||||||
|
/* Our flush is implemented by the write method, that recognizes a
|
||||||
|
* buffer set to NULL with a count of zero as a flush request. */
|
||||||
|
return rioFdsetWrite(r,NULL,0);
|
||||||
|
}
|
||||||
|
|
||||||
static const rio rioFdsetIO = {
|
static const rio rioFdsetIO = {
|
||||||
rioFdsetRead,
|
rioFdsetRead,
|
||||||
rioFdsetWrite,
|
rioFdsetWrite,
|
||||||
rioFdsetTell,
|
rioFdsetTell,
|
||||||
|
rioFdsetFlush,
|
||||||
NULL, /* update_checksum */
|
NULL, /* update_checksum */
|
||||||
0, /* current checksum */
|
0, /* current checksum */
|
||||||
0, /* bytes read or written */
|
0, /* bytes read or written */
|
||||||
@ -213,11 +257,13 @@ void rioInitWithFdset(rio *r, int *fds, int numfds) {
|
|||||||
for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
|
for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
|
||||||
r->io.fdset.numfds = numfds;
|
r->io.fdset.numfds = numfds;
|
||||||
r->io.fdset.pos = 0;
|
r->io.fdset.pos = 0;
|
||||||
|
r->io.fdset.buf = sdsempty();
|
||||||
}
|
}
|
||||||
|
|
||||||
void rioFreeFdset(rio *r) {
|
void rioFreeFdset(rio *r) {
|
||||||
zfree(r->io.fdset.fds);
|
zfree(r->io.fdset.fds);
|
||||||
zfree(r->io.fdset.state);
|
zfree(r->io.fdset.state);
|
||||||
|
sdsfree(r->io.fdset.buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ---------------------------- Generic functions ---------------------------- */
|
/* ---------------------------- Generic functions ---------------------------- */
|
||||||
|
@ -43,6 +43,7 @@ struct _rio {
|
|||||||
size_t (*read)(struct _rio *, void *buf, size_t len);
|
size_t (*read)(struct _rio *, void *buf, size_t len);
|
||||||
size_t (*write)(struct _rio *, const void *buf, size_t len);
|
size_t (*write)(struct _rio *, const void *buf, size_t len);
|
||||||
off_t (*tell)(struct _rio *);
|
off_t (*tell)(struct _rio *);
|
||||||
|
int (*flush)(struct _rio *);
|
||||||
/* The update_cksum method if not NULL is used to compute the checksum of
|
/* The update_cksum method if not NULL is used to compute the checksum of
|
||||||
* all the data that was read or written so far. The method should be
|
* all the data that was read or written so far. The method should be
|
||||||
* designed so that can be called with the current checksum, and the buf
|
* designed so that can be called with the current checksum, and the buf
|
||||||
@ -78,6 +79,7 @@ struct _rio {
|
|||||||
int *state; /* Error state of each fd. 0 (if ok) or errno. */
|
int *state; /* Error state of each fd. 0 (if ok) or errno. */
|
||||||
int numfds;
|
int numfds;
|
||||||
off_t pos;
|
off_t pos;
|
||||||
|
sds buf;
|
||||||
} fdset;
|
} fdset;
|
||||||
} io;
|
} io;
|
||||||
};
|
};
|
||||||
@ -118,6 +120,10 @@ static inline off_t rioTell(rio *r) {
|
|||||||
return r->tell(r);
|
return r->tell(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int rioFlush(rio *r) {
|
||||||
|
return r->flush(r);
|
||||||
|
}
|
||||||
|
|
||||||
void rioInitWithFile(rio *r, FILE *fp);
|
void rioInitWithFile(rio *r, FILE *fp);
|
||||||
void rioInitWithBuffer(rio *r, sds s);
|
void rioInitWithBuffer(rio *r, sds s);
|
||||||
void rioInitWithFdset(rio *r, int *fds, int numfds);
|
void rioInitWithFdset(rio *r, int *fds, int numfds);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user