From 850ea57c37e517eb0f10d8fc319332ca339d0ba2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 10 Oct 2014 17:44:06 +0200 Subject: [PATCH] rio.c: draft implementation of fdset target implemented. --- src/rio.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/rio.h | 9 +++++++++ 2 files changed, 69 insertions(+) diff --git a/src/rio.c b/src/rio.c index aa5061b7..88f6781e 100644 --- a/src/rio.c +++ b/src/rio.c @@ -142,6 +142,66 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } +/* ------------------- File descriptors set implementation ------------------- */ + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { + size_t retval; + int j; + unsigned char *p = (unsigned char*) buf; + + /* Write in little chunchs so that when there are big writes we + * parallelize while the kernel is sending data in background to + * the TCP socket. */ + while(len) { + size_t count = len < 1024 ? len : 1024; + for (j = 0; j < r->io.fdset.numfds; j++) { + retval = write(r->io.fdset.fds[j],p,count); + if (retval != count) return 0; + } + p += count; + len -= count; + r->io.fdset.pos += count; + } + return 1; +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdsetRead(rio *r, void *buf, size_t len) { + REDIS_NOTUSED(r); + REDIS_NOTUSED(buf); + REDIS_NOTUSED(len); + return 0; /* Error, this target does not support reading. */ +} + +/* Returns read/write position in file. */ +static off_t rioFdsetTell(rio *r) { + return r->io.fdset.pos; +} + +static const rio rioFdsetIO = { + rioFdsetRead, + rioFdsetWrite, + rioFdsetTell, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +void rioInitWithFdset(rio *r, int *fds, int numfds) { + *r = rioFdsetIO; + r->io.fdset.fds = zmalloc(sizeof(int)*numfds); + memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds); + r->io.fdset.numfds = numfds; + r->io.fdset.pos = 0; +} + +void rioFreeFdset(rio *r) { + zfree(r->io.fdset.fds); +} + /* ---------------------------- Generic functions ---------------------------- */ /* This function can be installed both in memory and file streams when checksum diff --git a/src/rio.h b/src/rio.h index 2d12c6cc..0d485d45 100644 --- a/src/rio.h +++ b/src/rio.h @@ -61,15 +61,23 @@ struct _rio { /* Backend-specific vars. */ union { + /* In-memory buffer target. */ struct { sds ptr; off_t pos; } buffer; + /* Stdio file pointer target. */ struct { FILE *fp; off_t buffered; /* Bytes written since last fsync. */ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; + /* Multiple FDs target (used to write to N sockets). */ + struct { + int *fds; + int numfds; + off_t pos; + } fdset; } io; }; @@ -111,6 +119,7 @@ static inline off_t rioTell(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithFdset(rio *r, int *fds, int numfds); size_t rioWriteBulkCount(rio *r, char prefix, int count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len);