mirror of
https://github.com/fluencelabs/redis
synced 2025-04-03 16:21:03 +00:00
Better syncio.c with millisecond resolution.
This commit is contained in:
parent
c2672a06cd
commit
04d360fdcd
@ -945,9 +945,9 @@ int equalStringObjects(robj *a, robj *b);
|
|||||||
unsigned long estimateObjectIdleTime(robj *o);
|
unsigned long estimateObjectIdleTime(robj *o);
|
||||||
|
|
||||||
/* Synchronous I/O with timeout */
|
/* Synchronous I/O with timeout */
|
||||||
int syncWrite(int fd, char *ptr, ssize_t size, int timeout);
|
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout);
|
||||||
int syncRead(int fd, char *ptr, ssize_t size, int timeout);
|
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
|
||||||
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout);
|
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
|
||||||
|
|
||||||
/* Replication */
|
/* Replication */
|
||||||
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
||||||
|
61
src/syncio.c
61
src/syncio.c
@ -36,50 +36,79 @@
|
|||||||
* of the SYNC command where the slave does it in a blocking way, and
|
* of the SYNC command where the slave does it in a blocking way, and
|
||||||
* the MIGRATE command that must be blocking in order to be atomic from the
|
* the MIGRATE command that must be blocking in order to be atomic from the
|
||||||
* point of view of the two instances (one migrating the key and one receiving
|
* point of view of the two instances (one migrating the key and one receiving
|
||||||
* the key). This is why need the following blocking I/O functions. */
|
* the key). This is why need the following blocking I/O functions.
|
||||||
|
*
|
||||||
|
* All the functions take the timeout in milliseconds. */
|
||||||
|
|
||||||
int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
|
#define REDIS_SYNCIO_RESOLUTION 10 /* Resolution in milliseconds */
|
||||||
|
|
||||||
|
/* Write the specified payload to 'fd'. If writing the whole payload will be done
|
||||||
|
* within 'timeout' milliseconds the operation succeeds and 'size' is returned.
|
||||||
|
* Otherwise the operation fails, -1 is returned, and an unspecified partial write
|
||||||
|
* could be performed against the file descriptor. */
|
||||||
|
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
|
||||||
ssize_t nwritten, ret = size;
|
ssize_t nwritten, ret = size;
|
||||||
time_t start = time(NULL);
|
long long start = mstime();
|
||||||
|
long long remaining = timeout;
|
||||||
|
|
||||||
timeout++;
|
while(1) {
|
||||||
while(size) {
|
long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
|
||||||
if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
|
remaining : REDIS_SYNCIO_RESOLUTION;
|
||||||
|
long long elapsed;
|
||||||
|
|
||||||
|
if (aeWait(fd,AE_WRITABLE,wait) & AE_WRITABLE) {
|
||||||
nwritten = write(fd,ptr,size);
|
nwritten = write(fd,ptr,size);
|
||||||
if (nwritten == -1) return -1;
|
if (nwritten == -1) return -1;
|
||||||
ptr += nwritten;
|
ptr += nwritten;
|
||||||
size -= nwritten;
|
size -= nwritten;
|
||||||
|
if (size == 0) return ret;
|
||||||
}
|
}
|
||||||
if ((time(NULL)-start) > timeout) {
|
elapsed = mstime() - start;
|
||||||
|
if (elapsed >= timeout) {
|
||||||
errno = ETIMEDOUT;
|
errno = ETIMEDOUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
remaining = timeout - elapsed;
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
|
/* Read the specified amount of bytes from 'fd'. If all the bytes are read within
|
||||||
|
* 'timeout' milliseconds the operation succeed and 'size' is returned.
|
||||||
|
* Otherwise the operation fails, -1 is returned, and an unspecified amount of
|
||||||
|
* data could be read from the file descriptor. */
|
||||||
|
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
|
||||||
ssize_t nread, totread = 0;
|
ssize_t nread, totread = 0;
|
||||||
time_t start = time(NULL);
|
long long start = mstime();
|
||||||
|
long long remaining = timeout;
|
||||||
|
|
||||||
timeout++;
|
while(1) {
|
||||||
while(size) {
|
long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
|
||||||
if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
|
remaining : REDIS_SYNCIO_RESOLUTION;
|
||||||
|
long long elapsed;
|
||||||
|
|
||||||
|
if (aeWait(fd,AE_READABLE,wait) & AE_READABLE) {
|
||||||
nread = read(fd,ptr,size);
|
nread = read(fd,ptr,size);
|
||||||
if (nread <= 0) return -1;
|
if (nread <= 0) return -1;
|
||||||
ptr += nread;
|
ptr += nread;
|
||||||
size -= nread;
|
size -= nread;
|
||||||
totread += nread;
|
totread += nread;
|
||||||
|
if (size == 0) return totread;
|
||||||
}
|
}
|
||||||
if ((time(NULL)-start) > timeout) {
|
elapsed = mstime() - start;
|
||||||
|
if (elapsed >= timeout) {
|
||||||
errno = ETIMEDOUT;
|
errno = ETIMEDOUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
remaining = timeout - elapsed;
|
||||||
}
|
}
|
||||||
return totread;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
|
/* Read a line making sure that every char will not require more than 'timeout'
|
||||||
|
* milliseconds to be read.
|
||||||
|
*
|
||||||
|
* On success the number of bytes read is returned, otherwise -1.
|
||||||
|
* On success the string is always correctly terminated with a 0 byte. */
|
||||||
|
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
|
||||||
ssize_t nread = 0;
|
ssize_t nread = 0;
|
||||||
|
|
||||||
size--;
|
size--;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user