mirror of
https://github.com/fluencelabs/redis
synced 2025-03-20 17:40:50 +00:00
rio.c: draft implementation of fdset target implemented.
This commit is contained in:
parent
29db3227ab
commit
850ea57c37
60
src/rio.c
60
src/rio.c
@ -142,6 +142,66 @@ void rioInitWithFile(rio *r, FILE *fp) {
|
|||||||
r->io.file.autosync = 0;
|
r->io.file.autosync = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ------------------- File descriptors set implementation ------------------- */
|
||||||
|
|
||||||
|
/* Returns 1 or 0 for success/failure. */
|
||||||
|
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
|
||||||
|
size_t retval;
|
||||||
|
int j;
|
||||||
|
unsigned char *p = (unsigned char*) buf;
|
||||||
|
|
||||||
|
/* Write in little chunchs so that when there are big writes we
|
||||||
|
* parallelize while the kernel is sending data in background to
|
||||||
|
* the TCP socket. */
|
||||||
|
while(len) {
|
||||||
|
size_t count = len < 1024 ? len : 1024;
|
||||||
|
for (j = 0; j < r->io.fdset.numfds; j++) {
|
||||||
|
retval = write(r->io.fdset.fds[j],p,count);
|
||||||
|
if (retval != count) return 0;
|
||||||
|
}
|
||||||
|
p += count;
|
||||||
|
len -= count;
|
||||||
|
r->io.fdset.pos += count;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns 1 or 0 for success/failure. */
|
||||||
|
static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
|
||||||
|
REDIS_NOTUSED(r);
|
||||||
|
REDIS_NOTUSED(buf);
|
||||||
|
REDIS_NOTUSED(len);
|
||||||
|
return 0; /* Error, this target does not support reading. */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns read/write position in file. */
|
||||||
|
static off_t rioFdsetTell(rio *r) {
|
||||||
|
return r->io.fdset.pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const rio rioFdsetIO = {
|
||||||
|
rioFdsetRead,
|
||||||
|
rioFdsetWrite,
|
||||||
|
rioFdsetTell,
|
||||||
|
NULL, /* update_checksum */
|
||||||
|
0, /* current checksum */
|
||||||
|
0, /* bytes read or written */
|
||||||
|
0, /* read/write chunk size */
|
||||||
|
{ { NULL, 0 } } /* union for io-specific vars */
|
||||||
|
};
|
||||||
|
|
||||||
|
void rioInitWithFdset(rio *r, int *fds, int numfds) {
|
||||||
|
*r = rioFdsetIO;
|
||||||
|
r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
|
||||||
|
memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
|
||||||
|
r->io.fdset.numfds = numfds;
|
||||||
|
r->io.fdset.pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void rioFreeFdset(rio *r) {
|
||||||
|
zfree(r->io.fdset.fds);
|
||||||
|
}
|
||||||
|
|
||||||
/* ---------------------------- Generic functions ---------------------------- */
|
/* ---------------------------- Generic functions ---------------------------- */
|
||||||
|
|
||||||
/* This function can be installed both in memory and file streams when checksum
|
/* This function can be installed both in memory and file streams when checksum
|
||||||
|
@ -61,15 +61,23 @@ struct _rio {
|
|||||||
|
|
||||||
/* Backend-specific vars. */
|
/* Backend-specific vars. */
|
||||||
union {
|
union {
|
||||||
|
/* In-memory buffer target. */
|
||||||
struct {
|
struct {
|
||||||
sds ptr;
|
sds ptr;
|
||||||
off_t pos;
|
off_t pos;
|
||||||
} buffer;
|
} buffer;
|
||||||
|
/* Stdio file pointer target. */
|
||||||
struct {
|
struct {
|
||||||
FILE *fp;
|
FILE *fp;
|
||||||
off_t buffered; /* Bytes written since last fsync. */
|
off_t buffered; /* Bytes written since last fsync. */
|
||||||
off_t autosync; /* fsync after 'autosync' bytes written. */
|
off_t autosync; /* fsync after 'autosync' bytes written. */
|
||||||
} file;
|
} file;
|
||||||
|
/* Multiple FDs target (used to write to N sockets). */
|
||||||
|
struct {
|
||||||
|
int *fds;
|
||||||
|
int numfds;
|
||||||
|
off_t pos;
|
||||||
|
} fdset;
|
||||||
} io;
|
} io;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -111,6 +119,7 @@ static inline off_t rioTell(rio *r) {
|
|||||||
|
|
||||||
void rioInitWithFile(rio *r, FILE *fp);
|
void rioInitWithFile(rio *r, FILE *fp);
|
||||||
void rioInitWithBuffer(rio *r, sds s);
|
void rioInitWithBuffer(rio *r, sds s);
|
||||||
|
void rioInitWithFdset(rio *r, int *fds, int numfds);
|
||||||
|
|
||||||
size_t rioWriteBulkCount(rio *r, char prefix, int count);
|
size_t rioWriteBulkCount(rio *r, char prefix, int count);
|
||||||
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
|
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user