diff --git a/src/rdb.c b/src/rdb.c index abb966e0..3f1e2252 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -717,7 +717,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { size_t len; unsigned int i; - redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rdb->tell(rdb)); + redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rdbTell(rdb)); if (rdbtype == REDIS_RDB_TYPE_STRING) { /* Read string value */ if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; @@ -1039,7 +1039,7 @@ int rdbLoad(char *filename) { /* Serve the clients from time to time */ if (!(loops++ % 1000)) { - loadingProgress(rdb.tell(&rdb)); + loadingProgress(rdbTell(&rdb)); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } diff --git a/src/rio.c b/src/rio.c index 5a745840..bb977c74 100644 --- a/src/rio.c +++ b/src/rio.c @@ -58,6 +58,8 @@ static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, + NULL, /* update_checksum */ + 0, /* current checksum */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -65,6 +67,8 @@ static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, + NULL, /* update_checksum */ + 0, /* current checksum */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -79,6 +83,12 @@ void rioInitWithBuffer(rio *r, sds s) { r->io.buffer.pos = 0; } +/* This function can be installed both in memory and file streams when checksum + * computation is needed. */ +void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { + r->checksum = crc64(r->checksum,buf,len); +} + /* ------------------------------ Higher level interface --------------------------- * The following higher level functions use lower level rio.c functions to help * generating the Redis protocol for the Append Only File. */ diff --git a/src/rio.h b/src/rio.h index 2a830eb5..31746303 100644 --- a/src/rio.h +++ b/src/rio.h @@ -2,6 +2,7 @@ #define __REDIS_RIO_H #include +#include #include "sds.h" struct _rio { @@ -11,6 +12,14 @@ struct _rio { size_t (*read)(struct _rio *, void *buf, size_t len); size_t (*write)(struct _rio *, const void *buf, size_t len); off_t (*tell)(struct _rio *); + /* The update_cksum method if not NULL is used to compute the checksum of all the + * data that was read or written so far. The method should be designed so that + * can be called with the current checksum, and the buf and len fields pointing + * to the new block of data to add to the checksum computation. */ + void (*update_cksum)(struct _rio *, void *buf, size_t len); + + /* The current checksum */ + uint64_t cksum; /* Backend-specific vars. */ union { @@ -26,8 +35,26 @@ struct _rio { typedef struct _rio rio; -#define rioWrite(rio,buf,len) ((rio)->write((rio),(buf),(len))) -#define rioRead(rio,buf,len) ((rio)->read((rio),(buf),(len))) +/* The following functions are our interface with the stream. They'll call the + * actual implementation of read / write / tell, and will update the checksum + * if needed. */ + +inline size_t rioWrite(rio *r, const void *buf, size_t len) { + if (r->udpate_cksum) r->update_cksum(r,buf,len); + return r->write(r,buf,len); +} + +inline size_t rioRead(rio *r, void *buf, size_t len) { + if (r->read(r,buf,len) == 1) { + if (r->udpate_cksum) r->update_cksum(r,buf,len); + return 1; + } + return 0; +} + +inline off_t rioTell(rio *r) { + return r->tell(r); +} void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); @@ -37,4 +64,6 @@ size_t rioWriteBulkString(rio *r, const char *buf, size_t len); size_t rioWriteBulkLongLong(rio *r, long long l); size_t rioWriteBulkDouble(rio *r, double d); +void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); + #endif