From 36babc1e31f434e95fc49a6a1f611a75b3827ade Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 30 Aug 2010 11:14:54 +0200 Subject: [PATCH 01/14] Refactor reply parsing code in redis-benchmark for efficiency --- src/redis-benchmark.c | 162 +++++++++++++++++++++++------------------- 1 file changed, 90 insertions(+), 72 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 123d8118..ceeab2b9 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -206,16 +206,27 @@ static void clientDone(client c) { } } +/* Read a length from the buffer pointed to by *p, store the length in *len, + * and return the number of bytes that the cursor advanced. */ +static int readLen(char *p, int *len) { + char *tail = strstr(p,"\r\n"); + if (tail == NULL) + return 0; + *tail = '\0'; + *len = atoi(p+1); + return tail+2-p; +} + static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - char buf[1024]; - int nread; + char buf[1024], *p; + int nread, pos=0, len=0; client c = privdata; REDIS_NOTUSED(el); REDIS_NOTUSED(fd); REDIS_NOTUSED(mask); - nread = read(c->fd, buf, 1024); + nread = read(c->fd,buf,sizeof(buf)); if (nread == -1) { fprintf(stderr, "Reading from socket: %s\n", strerror(errno)); freeClient(c); @@ -228,82 +239,89 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) } c->totreceived += nread; c->ibuf = sdscatlen(c->ibuf,buf,nread); + len = sdslen(c->ibuf); -processdata: - /* Are we waiting for the first line of the command of for sdf - * count in bulk or multi bulk operations? */ if (c->replytype == REPLY_INT || - c->replytype == REPLY_RETCODE || - (c->replytype == REPLY_BULK && c->readlen == -1) || - (c->replytype == REPLY_MBULK && c->readlen == -1) || - (c->replytype == REPLY_MBULK && c->mbulk == -1)) { - char *p; - - /* Check if the first line is complete. This is only true if - * there is a newline inside the buffer. */ - if ((p = strchr(c->ibuf,'\n')) != NULL) { - if (c->replytype == REPLY_BULK || - (c->replytype == REPLY_MBULK && c->mbulk != -1)) - { - /* Read the count of a bulk reply (being it a single bulk or - * a multi bulk reply). "$" for the protocol spec. */ - *p = '\0'; - *(p-1) = '\0'; - c->readlen = atoi(c->ibuf+1)+2; - // printf("BULK ATOI: %s\n", c->ibuf+1); - /* Handle null bulk reply "$-1" */ - if (c->readlen-2 == -1) { - clientDone(c); - return; - } - /* Leave all the rest in the input buffer */ - c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); - /* fall through to reach the point where the code will try - * to check if the bulk reply is complete. */ - } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) { - /* Read the count of a multi bulk reply. That is, how many - * bulk replies we have to read next. "*" protocol. */ - *p = '\0'; - *(p-1) = '\0'; - c->mbulk = atoi(c->ibuf+1); - /* Handle null bulk reply "*-1" */ - if (c->mbulk == -1) { - clientDone(c); - return; - } - // printf("%p) %d elements list\n", c, c->mbulk); - /* Leave all the rest in the input buffer */ - c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); - goto processdata; - } else { - c->ibuf = sdstrim(c->ibuf,"\r\n"); - clientDone(c); - return; - } - } - } - /* bulk read, did we read everything? */ - if (((c->replytype == REPLY_MBULK && c->mbulk != -1) || - (c->replytype == REPLY_BULK)) && c->readlen != -1 && - (unsigned)c->readlen <= sdslen(c->ibuf)) + c->replytype == REPLY_RETCODE) { - // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n", - // c->mbulk,c->readlen,sdslen(c->ibuf)); - if (c->replytype == REPLY_BULK) { - clientDone(c); - } else if (c->replytype == REPLY_MBULK) { - // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen); - // fwrite(c->ibuf,c->readlen,1,stdout); - // printf("\n"); - if (--c->mbulk == 0) { - clientDone(c); + /* Check if the first line is complete. This is everything we need + * when waiting for an integer or status code reply.*/ + if ((p = strstr(c->ibuf,"\r\n")) != NULL) + goto done; + } else if (c->replytype == REPLY_BULK) { + int advance = 0; + if (c->readlen < 0) { + advance = readLen(c->ibuf+pos,&c->readlen); + if (advance) { + pos += advance; + if (c->readlen == -1) { + goto done; + } else { + /* include the trailing \r\n */ + c->readlen += 2; + } } else { - c->ibuf = sdsrange(c->ibuf,c->readlen,-1); - c->readlen = -1; - goto processdata; + goto skip; } } + + int canconsume; + if (c->readlen > 0) { + canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen; + c->readlen -= canconsume; + pos += canconsume; + } + + if (c->readlen == 0) + goto done; + } else if (c->replytype == REPLY_MBULK) { + int advance = 0; + if (c->mbulk == -1) { + advance = readLen(c->ibuf+pos,&c->mbulk); + if (advance) { + pos += advance; + if (c->mbulk == -1) + goto done; + } else { + goto skip; + } + } + + int canconsume; + while(c->mbulk > 0 && pos < len) { + if (c->readlen > 0) { + canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen; + c->readlen -= canconsume; + pos += canconsume; + if (c->readlen == 0) + c->mbulk--; + } else { + advance = readLen(c->ibuf+pos,&c->readlen); + if (advance) { + pos += advance; + if (c->readlen == -1) { + c->mbulk--; + continue; + } else { + /* include the trailing \r\n */ + c->readlen += 2; + } + } else { + goto skip; + } + } + } + + if (c->mbulk == 0) + goto done; } + +skip: + c->ibuf = sdsrange(c->ibuf,pos,-1); + return; +done: + clientDone(c); + return; } static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) From ed0dd55402710d5bb21ef66d81a7dff694737c22 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 30 Aug 2010 11:25:02 +0200 Subject: [PATCH 02/14] Show the current throughput while benchmarking --- src/redis-benchmark.c | 86 +++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index ceeab2b9..297ecc6c 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -75,6 +75,7 @@ static struct config { long long start; long long totlatency; int *latency; + char *title; list *clients; int quiet; int loop; @@ -389,13 +390,13 @@ static void createMissingClients(client c) { } } -static void showLatencyReport(char *title) { +static void showLatencyReport(void) { int j, seen = 0; float perc, reqpersec; reqpersec = (float)config.donerequests/((float)config.totlatency/1000); if (!config.quiet) { - printf("====== %s ======\n", title); + printf("====== %s ======\n", config.title); printf(" %d requests completed in %.2f seconds\n", config.donerequests, (float)config.totlatency/1000); printf(" %d parallel clients\n", config.numclients); @@ -411,20 +412,20 @@ static void showLatencyReport(char *title) { } printf("%.2f requests per second\n\n", reqpersec); } else { - printf("%s: %.2f requests per second\n", title, reqpersec); + printf("%s: %.2f requests per second\n", config.title, reqpersec); } } -static void prepareForBenchmark(void) -{ +static void prepareForBenchmark(char *title) { memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1)); + config.title = title; config.start = mstime(); config.donerequests = 0; } -static void endBenchmark(char *title) { +static void endBenchmark(void) { config.totlatency = mstime()-config.start; - showLatencyReport(title); + showLatencyReport(); freeAllClients(); } @@ -498,6 +499,18 @@ void parseOptions(int argc, char **argv) { } } +int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) { + REDIS_NOTUSED(eventLoop); + REDIS_NOTUSED(id); + REDIS_NOTUSED(clientData); + + float dt = (float)(mstime()-config.start)/1000.0; + float rps = (float)config.donerequests/dt; + printf("%s: %.2f\r", config.title, rps); + fflush(stdout); + return 250; /* every 250ms */ +} + int main(int argc, char **argv) { client c; @@ -509,6 +522,7 @@ int main(int argc, char **argv) { config.requests = 10000; config.liveclients = 0; config.el = aeCreateEventLoop(); + aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL); config.keepalive = 1; config.donerequests = 0; config.datasize = 3; @@ -532,7 +546,7 @@ int main(int argc, char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - prepareForBenchmark(); + prepareForBenchmark("IDLE"); c = createClient(); if (!c) exit(1); c->obuf = sdsempty(); @@ -543,25 +557,25 @@ int main(int argc, char **argv) { } do { - prepareForBenchmark(); + prepareForBenchmark("PING"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"PING\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("PING"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("PING (multi bulk)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("PING (multi bulk)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SET"); c = createClient(); if (!c) exit(1); c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize); @@ -575,106 +589,106 @@ int main(int argc, char **argv) { prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("SET"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("GET"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("GET"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("INCR"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); - endBenchmark("INCR"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPUSH"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); - endBenchmark("LPUSH"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPOP"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPOP mylist\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LPOP"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SADD"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("SADD"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SPOP"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"SPOP myset\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("SPOP"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPUSH (again, in order to bench LRANGE)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("LPUSH (again, in order to bench LRANGE)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 100 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 100 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 300 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 300 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 450 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 450 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 600 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 600 elements)"); + endBenchmark(); printf("\n"); } while(config.loop); From 834ef78e27a8690a91d727259aaece611664a368 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 30 Aug 2010 14:44:34 +0200 Subject: [PATCH 03/14] Refactor reply buildup for speed on large multi bulk replies --- src/networking.c | 239 +++++++++++++++++++++++++++-------------------- src/object.c | 1 + src/redis.h | 15 +++ 3 files changed, 156 insertions(+), 99 deletions(-) diff --git a/src/networking.c b/src/networking.c index a39be7c4..da0cd0a1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1,5 +1,4 @@ #include "redis.h" - #include void *dupClientReplyValue(void *o) { @@ -12,7 +11,16 @@ int listMatchObjects(void *a, void *b) { } redisClient *createClient(int fd) { - redisClient *c = zmalloc(sizeof(*c)); + redisClient *c; + + /* Make sure to allocate a multiple of the page size to prevent wasting + * memory. A page size of 4096 is assumed here. We need to compensate + * for the zmalloc overhead of sizeof(size_t) bytes. */ + size_t size = 8192-sizeof(size_t); + redisAssert(size > sizeof(redisClient)); + c = zmalloc(size); + c->buflen = size-sizeof(redisClient); + c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); @@ -53,70 +61,118 @@ redisClient *createClient(int fd) { return c; } -void addReply(redisClient *c, robj *obj) { - if (listLength(c->reply) == 0 && +int _ensureFileEvent(redisClient *c) { + if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, - sendReplyToClient, c) == AE_ERR) return; + sendReplyToClient, c) == AE_ERR) return REDIS_ERR; + return REDIS_OK; +} - if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { - obj = dupStringObject(obj); - obj->refcount = 0; /* getDecodedObject() will increment the refcount */ +void _addReplyObjectToList(redisClient *c, robj *obj) { + redisAssert(obj->type == REDIS_STRING && + obj->encoding == REDIS_ENCODING_RAW); + listAddNodeTail(c->reply,obj); +} + +void _ensureBufferInReplyList(redisClient *c) { + sds buffer = sdsnewlen(NULL,REDIS_REPLY_CHUNK_SIZE); + sdsupdatelen(buffer); /* sdsnewlen expects non-empty string */ + listAddNodeTail(c->reply,createObject(REDIS_REPLY_NODE,buffer)); +} + +void _addReplyStringToBuffer(redisClient *c, char *s, size_t len) { + size_t available = 0; + redisAssert(len < REDIS_REPLY_CHUNK_THRESHOLD); + if (listLength(c->reply) > 0) { + robj *o = listNodeValue(listLast(c->reply)); + + /* Make sure to append to a reply node with enough bytes available. */ + if (o->type == REDIS_REPLY_NODE) available = sdsavail(o->ptr); + if (o->type != REDIS_REPLY_NODE || len > available) { + _ensureBufferInReplyList(c); + _addReplyStringToBuffer(c,s,len); + } else { + o->ptr = sdscatlen(o->ptr,s,len); + } + } else { + available = c->buflen-c->bufpos; + if (len > available) { + _ensureBufferInReplyList(c); + _addReplyStringToBuffer(c,s,len); + } else { + memcpy(c->buf+c->bufpos,s,len); + c->bufpos += len; + } + } +} + +void addReply(redisClient *c, robj *obj) { + if (_ensureFileEvent(c) != REDIS_OK) return; + if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { + /* Returns a new object with refcount 1 */ + obj = dupStringObject(obj); + } else { + /* This increments the refcount. */ + obj = getDecodedObject(obj); + } + + if (sdslen(obj->ptr) < REDIS_REPLY_CHUNK_THRESHOLD) { + _addReplyStringToBuffer(c,obj->ptr,sdslen(obj->ptr)); + decrRefCount(obj); + } else { + _addReplyObjectToList(c,obj); } - listAddNodeTail(c->reply,getDecodedObject(obj)); } void addReplySds(redisClient *c, sds s) { - robj *o = createObject(REDIS_STRING,s); - addReply(c,o); - decrRefCount(o); + if (_ensureFileEvent(c) != REDIS_OK) return; + if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) { + _addReplyStringToBuffer(c,s,sdslen(s)); + sdsfree(s); + } else { + _addReplyObjectToList(c,createObject(REDIS_STRING,s)); + } +} + +void addReplyString(redisClient *c, char *s, size_t len) { + if (_ensureFileEvent(c) != REDIS_OK) return; + if (len < REDIS_REPLY_CHUNK_THRESHOLD) { + _addReplyStringToBuffer(c,s,len); + } else { + _addReplyObjectToList(c,createStringObject(s,len)); + } } void addReplyDouble(redisClient *c, double d) { - char buf[128]; - - snprintf(buf,sizeof(buf),"%.17g",d); - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n", - (unsigned long) strlen(buf),buf)); + char dbuf[128], sbuf[128]; + int dlen, slen; + dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); + slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); + addReplyString(c,sbuf,slen); } -void addReplyLongLong(redisClient *c, long long ll) { +void _addReplyLongLong(redisClient *c, long long ll, char prefix) { char buf[128]; - size_t len; - - if (ll == 0) { - addReply(c,shared.czero); - return; - } else if (ll == 1) { - addReply(c,shared.cone); - return; - } - buf[0] = ':'; + int len; + buf[0] = prefix; len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; - addReplySds(c,sdsnewlen(buf,len+3)); + addReplyString(c,buf,len+3); +} + +void addReplyLongLong(redisClient *c, long long ll) { + _addReplyLongLong(c,ll,':'); } void addReplyUlong(redisClient *c, unsigned long ul) { - char buf[128]; - size_t len; - - if (ul == 0) { - addReply(c,shared.czero); - return; - } else if (ul == 1) { - addReply(c,shared.cone); - return; - } - len = snprintf(buf,sizeof(buf),":%lu\r\n",ul); - addReplySds(c,sdsnewlen(buf,len)); + _addReplyLongLong(c,(long long)ul,':'); } void addReplyBulkLen(redisClient *c, robj *obj) { - size_t len, intlen; - char buf[128]; + size_t len; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@ -133,11 +189,7 @@ void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - buf[0] = '$'; - intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); - buf[intlen+1] = '\r'; - buf[intlen+2] = '\n'; - addReplySds(c,sdsnewlen(buf,intlen+3)); + _addReplyLongLong(c,len,'$'); } void addReplyBulk(redisClient *c, robj *obj) { @@ -287,34 +339,6 @@ void freeClient(redisClient *c) { zfree(c); } -#define GLUEREPLY_UP_TO (1024) -static void glueReplyBuffersIfNeeded(redisClient *c) { - int copylen = 0; - char buf[GLUEREPLY_UP_TO]; - listNode *ln; - listIter li; - robj *o; - - listRewind(c->reply,&li); - while((ln = listNext(&li))) { - int objlen; - - o = ln->value; - objlen = sdslen(o->ptr); - if (copylen + objlen <= GLUEREPLY_UP_TO) { - memcpy(buf+copylen,o->ptr,objlen); - copylen += objlen; - listDelNode(c->reply,ln); - } else { - if (copylen == 0) return; - break; - } - } - /* Now the output buffer is empty, add the new single element */ - o = createObject(REDIS_STRING,sdsnewlen(buf,copylen)); - listAddNodeHead(c->reply,o); -} - void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; @@ -331,31 +355,48 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } - while(listLength(c->reply)) { - if (server.glueoutputbuf && listLength(c->reply) > 1) - glueReplyBuffersIfNeeded(c); + while(c->bufpos > 0 || listLength(c->reply)) { + if (c->bufpos > 0) { + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = c->bufpos - c->sentlen; + } else { + nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; - o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o->ptr); - - if (objlen == 0) { - listDelNode(c->reply,listFirst(c->reply)); - continue; - } - - if (c->flags & REDIS_MASTER) { - /* Don't reply to a master */ - nwritten = objlen - c->sentlen; + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if (c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } } else { - nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); - if (nwritten <= 0) break; - } - c->sentlen += nwritten; - totwritten += nwritten; - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; + o = listNodeValue(listFirst(c->reply)); + objlen = sdslen(o->ptr); + + if (objlen == 0) { + listDelNode(c->reply,listFirst(c->reply)); + continue; + } + + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = objlen - c->sentlen; + } else { + nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; + + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + } } /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve diff --git a/src/object.c b/src/object.c index 92af1d6a..5e8dbfa2 100644 --- a/src/object.c +++ b/src/object.c @@ -196,6 +196,7 @@ void decrRefCount(void *obj) { case REDIS_SET: freeSetObject(o); break; case REDIS_ZSET: freeZsetObject(o); break; case REDIS_HASH: freeHashObject(o); break; + case REDIS_REPLY_NODE: freeStringObject(o); break; default: redisPanic("Unknown object type"); break; } o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */ diff --git a/src/redis.h b/src/redis.h index 9e27d724..e2f69454 100644 --- a/src/redis.h +++ b/src/redis.h @@ -48,6 +48,15 @@ #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */ #define REDIS_SHARED_INTEGERS 10000 +/* Size of a reply chunk, configured to exactly allocate 4k bytes */ +#define REDIS_REPLY_CHUNK_BYTES (4*1024) +#define REDIS_REPLY_CHUNK_SIZE (REDIS_REPLY_CHUNK_BYTES-sizeof(struct sdshdr)-1-sizeof(size_t)) +/* It doesn't make sense to memcpy objects to a chunk when the net result is + * not being able to glue other objects. We want to make sure it can be glued + * to at least a bulk length or \r\n, so set the threshold to be a couple + * of bytes less than the size of the buffer. */ +#define REDIS_REPLY_CHUNK_THRESHOLD (REDIS_REPLY_CHUNK_SIZE-16) + /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ #define REDIS_WRITEV_THRESHOLD 3 /* Max number of iovecs used for each writev call */ @@ -72,6 +81,7 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 +#define REDIS_REPLY_NODE 5 #define REDIS_VMPOINTER 8 /* Objects encoding. Some kind of objects like Strings and Hashes can be @@ -309,6 +319,11 @@ typedef struct redisClient { list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ + + /* Response buffer */ + int bufpos; + int buflen; + char buf[]; } redisClient; struct saveparam { From b301c1fc2bbf977a7d9fd4718cd9914113541c75 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 30 Aug 2010 16:02:06 +0200 Subject: [PATCH 04/14] Wrapper for adding unknown multi bulk length to reply list --- src/config.c | 7 ++----- src/db.c | 6 ++---- src/networking.c | 28 ++++++++++++++++++++++++++++ src/redis.h | 2 ++ src/t_hash.c | 11 ++++------- src/t_set.c | 9 ++++----- src/t_zset.c | 12 +++++------- 7 files changed, 47 insertions(+), 28 deletions(-) diff --git a/src/config.c b/src/config.c index e1b743db..5c449886 100644 --- a/src/config.c +++ b/src/config.c @@ -332,13 +332,10 @@ badfmt: /* Bad format errors */ void configGetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[2]); - robj *lenobj = createObject(REDIS_STRING,NULL); + void *replylen = addDeferredMultiBulkLength(c); char *pattern = o->ptr; int matches = 0; - addReply(c,lenobj); - decrRefCount(lenobj); - if (stringmatch(pattern,"dbfilename",0)) { addReplyBulkCString(c,"dbfilename"); addReplyBulkCString(c,server.dbfilename); @@ -410,7 +407,7 @@ void configGetCommand(redisClient *c) { matches++; } decrRefCount(o); - lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2); + setDeferredMultiBulkLength(c,replylen,matches*2); } void configCommand(redisClient *c) { diff --git a/src/db.c b/src/db.c index 6d287d72..8c6c6bc8 100644 --- a/src/db.c +++ b/src/db.c @@ -223,11 +223,9 @@ void keysCommand(redisClient *c) { sds pattern = c->argv[1]->ptr; int plen = sdslen(pattern); unsigned long numkeys = 0; - robj *lenobj = createObject(REDIS_STRING,NULL); + void *replylen = addDeferredMultiBulkLength(c); di = dictGetIterator(c->db->dict); - addReply(c,lenobj); - decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { sds key = dictGetEntryKey(de); robj *keyobj; @@ -243,7 +241,7 @@ void keysCommand(redisClient *c) { } } dictReleaseIterator(di); - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",numkeys); + setDeferredMultiBulkLength(c,replylen,numkeys); } void dbsizeCommand(redisClient *c) { diff --git a/src/networking.c b/src/networking.c index da0cd0a1..464d5e02 100644 --- a/src/networking.c +++ b/src/networking.c @@ -145,6 +145,34 @@ void addReplyString(redisClient *c, char *s, size_t len) { } } +/* Adds an empty object to the reply list that will contain the multi bulk + * length, which is not known when this function is called. */ +void *addDeferredMultiBulkLength(redisClient *c) { + if (_ensureFileEvent(c) != REDIS_OK) return NULL; + _addReplyObjectToList(c,createObject(REDIS_STRING,NULL)); + return listLast(c->reply); +} + +/* Populate the length object and try glueing it to the next chunk. */ +void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { + listNode *ln = (listNode*)node; + robj *len, *next; + + /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ + if (node == NULL) return; + + len = listNodeValue(ln); + len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); + if (ln->next != NULL) { + next = listNodeValue(ln->next); + /* Only glue when the next node is a reply chunk. */ + if (next->type == REDIS_REPLY_NODE) { + len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); + listDelNode(c->reply,ln->next); + } + } +} + void addReplyDouble(redisClient *c, double d) { char dbuf[128], sbuf[128]; int dlen, slen; diff --git a/src/redis.h b/src/redis.h index e2f69454..752d56c3 100644 --- a/src/redis.h +++ b/src/redis.h @@ -603,6 +603,8 @@ void resetClient(redisClient *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); +void *addDeferredMultiBulkLength(redisClient *c); +void setDeferredMultiBulkLength(redisClient *c, void *node, long length); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); diff --git a/src/t_hash.c b/src/t_hash.c index b6be284f..c8be72f2 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -350,17 +350,15 @@ void hlenCommand(redisClient *c) { } void genericHgetallCommand(redisClient *c, int flags) { - robj *o, *lenobj, *obj; + robj *o, *obj; unsigned long count = 0; hashTypeIterator *hi; + void *replylen = NULL; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_HASH)) return; - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - + replylen = addDeferredMultiBulkLength(c); hi = hashTypeInitIterator(o); while (hashTypeNext(hi) != REDIS_ERR) { if (flags & REDIS_HASH_KEY) { @@ -377,8 +375,7 @@ void genericHgetallCommand(redisClient *c, int flags) { } } hashTypeReleaseIterator(hi); - - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count); + setDeferredMultiBulkLength(c,replylen,count); } void hkeysCommand(redisClient *c) { diff --git a/src/t_set.c b/src/t_set.c index 68e13227..d6041e72 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -320,7 +320,8 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *lenobj = NULL, *dstset = NULL; + robj *ele, *dstset = NULL; + void *replylen = NULL; unsigned long j, cardinality = 0; for (j = 0; j < setnum; j++) { @@ -356,9 +357,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * to the output list and save the pointer to later modify it with the * right length */ if (!dstkey) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); + replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ @@ -400,7 +399,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, touchWatchedKey(c->db,dstkey); server.dirty++; } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); + setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); } diff --git a/src/t_zset.c b/src/t_zset.c index e93e5c40..d25b1a66 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -866,7 +866,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { zset *zsetobj = o->ptr; zskiplist *zsl = zsetobj->zsl; zskiplistNode *ln; - robj *ele, *lenobj = NULL; + robj *ele; + void *replylen = NULL; unsigned long rangelen = 0; /* Get the first node with the score >= min, or with @@ -884,11 +885,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { * are in the list, so we push this object that will represent * the multi-bulk length in the output buffer, and will "fix" * it later */ - if (!justcount) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - } + if (!justcount) + replylen = addDeferredMultiBulkLength(c); while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) { if (offset) { @@ -910,7 +908,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (justcount) { addReplyLongLong(c,(long)rangelen); } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", + setDeferredMultiBulkLength(c,replylen, withscores ? (rangelen*2) : rangelen); } } From 57b0738011007e47ebe25d5c81acfe333c561e02 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 30 Aug 2010 16:51:39 +0200 Subject: [PATCH 05/14] Don't build a reply when replaying the AOF --- src/aof.c | 8 +++++--- src/networking.c | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/aof.c b/src/aof.c index 8f2dc96f..1ed2363a 100644 --- a/src/aof.c +++ b/src/aof.c @@ -272,12 +272,14 @@ int loadAppendOnlyFile(char *filename) { fakeClient->argc = argc; fakeClient->argv = argv; cmd->proc(fakeClient); - /* Discard the reply objects list from the fake client */ - while(listLength(fakeClient->reply)) - listDelNode(fakeClient->reply,listFirst(fakeClient->reply)); + + /* The fake client should not have a reply */ + redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); + /* Clean up, ready for the next command */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); + /* Handle swapping while loading big datasets when VM is on */ force_swapout = 0; if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) diff --git a/src/networking.c b/src/networking.c index 464d5e02..971cbfc1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -62,6 +62,7 @@ redisClient *createClient(int fd) { } int _ensureFileEvent(redisClient *c) { + if (c->fd <= 0) return REDIS_ERR; if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && From 0537e7bf8042cf9954d3b0abab567edf3b5c0516 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 12:38:34 +0200 Subject: [PATCH 06/14] Use specialized function to add multi bulk reply length --- src/multi.c | 2 +- src/networking.c | 4 ++++ src/redis.h | 1 + src/sort.c | 2 +- src/t_hash.c | 2 +- src/t_list.c | 6 +++--- src/t_set.c | 2 +- src/t_string.c | 2 +- src/t_zset.c | 3 +-- 9 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/multi.c b/src/multi.c index def1dd67..c85516df 100644 --- a/src/multi.c +++ b/src/multi.c @@ -107,7 +107,7 @@ void execCommand(redisClient *c) { unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); + addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; diff --git a/src/networking.c b/src/networking.c index 971cbfc1..d2a4e231 100644 --- a/src/networking.c +++ b/src/networking.c @@ -200,6 +200,10 @@ void addReplyUlong(redisClient *c, unsigned long ul) { _addReplyLongLong(c,(long long)ul,':'); } +void addReplyMultiBulkLen(redisClient *c, long length) { + _addReplyLongLong(c,length,'*'); +} + void addReplyBulkLen(redisClient *c, robj *obj) { size_t len; diff --git a/src/redis.h b/src/redis.h index 752d56c3..6ee1d2e3 100644 --- a/src/redis.h +++ b/src/redis.h @@ -617,6 +617,7 @@ void addReplySds(redisClient *c, sds s); void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); void addReplyUlong(redisClient *c, unsigned long ul); +void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); /* List data type */ diff --git a/src/sort.c b/src/sort.c index aa1ce929..f53ad486 100644 --- a/src/sort.c +++ b/src/sort.c @@ -307,7 +307,7 @@ void sortCommand(redisClient *c) { outputlen = getop ? getop*(end-start+1) : end-start+1; if (storekey == NULL) { /* STORE option not specified, sent the sorting result to client */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); + addReplyMultiBulkLen(c,outputlen); for (j = start; j <= end; j++) { listNode *ln; listIter li; diff --git a/src/t_hash.c b/src/t_hash.c index c8be72f2..ad5d3e1e 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -315,7 +315,7 @@ void hmgetCommand(redisClient *c) { /* Note the check for o != NULL happens inside the loop. This is * done because objects that cannot be found are considered to be * an empty hash. The reply should then be a series of NULLs. */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2)); + addReplyMultiBulkLen(c,c->argc-2); for (i = 2; i < c->argc; i++) { if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) { addReplyBulk(c,value); diff --git a/src/t_list.c b/src/t_list.c index 2a981033..db9ca18e 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -494,7 +494,7 @@ void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + addReplyMultiBulkLen(c,rangelen); listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { redisAssert(listTypeNext(li,&entry)); @@ -772,7 +772,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplySds(receiver,sdsnew("*2\r\n")); + addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); unblockClientWaitingData(receiver); @@ -811,7 +811,7 @@ void blockingPopGenericCommand(redisClient *c, int where) { * "real" command will add the last element (the value) * for us. If this souds like an hack to you it's just * because it is... */ - addReplySds(c,sdsnew("*2\r\n")); + addReplyMultiBulkLen(c,2); addReplyBulk(c,argv[1]); popGenericCommand(c,where); diff --git a/src/t_set.c b/src/t_set.c index d6041e72..17cac934 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -469,7 +469,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); + addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); while((ele = setTypeNext(si)) != NULL) { addReplyBulk(c,ele); diff --git a/src/t_string.c b/src/t_string.c index 3b8a39bb..411687a5 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -79,7 +79,7 @@ void getsetCommand(redisClient *c) { void mgetCommand(redisClient *c) { int j; - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); + addReplyMultiBulkLen(c,c->argc-1); for (j = 1; j < c->argc; j++) { robj *o = lookupKeyRead(c->db,c->argv[j]); if (o == NULL) { diff --git a/src/t_zset.c b/src/t_zset.c index d25b1a66..7de63158 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -782,8 +782,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { } /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n", - withscores ? (rangelen*2) : rangelen)); + addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen); for (j = 0; j < rangelen; j++) { ele = ln->obj; addReplyBulk(c,ele); From 2403fc9fdec6113f10aa54770714e550eaab1b69 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 14:17:53 +0200 Subject: [PATCH 07/14] Intialize bufpos in the fake client --- src/aof.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/aof.c b/src/aof.c index 1ed2363a..58dd5538 100644 --- a/src/aof.c +++ b/src/aof.c @@ -189,6 +189,7 @@ struct redisClient *createFakeClient(void) { c->querybuf = sdsempty(); c->argc = 0; c->argv = NULL; + c->bufpos = 0; c->flags = 0; /* We set the fake client as a slave waiting for the synchronization * so that Redis will not try to send replies to this client. */ From cd76bb651ddc9168451e6729fdf7793eb628f57c Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 14:19:15 +0200 Subject: [PATCH 08/14] Free the sds in addReplySds when it cannot be added to the reply --- src/networking.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index d2a4e231..89613ced 100644 --- a/src/networking.c +++ b/src/networking.c @@ -128,7 +128,11 @@ void addReply(redisClient *c, robj *obj) { } void addReplySds(redisClient *c, sds s) { - if (_ensureFileEvent(c) != REDIS_OK) return; + if (_ensureFileEvent(c) != REDIS_OK) { + /* The caller expects the sds to be free'd. */ + sdsfree(s); + return; + } if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) { _addReplyStringToBuffer(c,s,sdslen(s)); sdsfree(s); From b70d355521fd02737c4de2a1583025699f1554f8 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 14:30:56 +0200 Subject: [PATCH 09/14] Use existing reply functions where possible --- src/db.c | 6 ++---- src/sort.c | 2 +- src/t_hash.c | 2 +- src/t_list.c | 6 +++--- src/t_set.c | 2 +- src/t_string.c | 2 +- src/t_zset.c | 2 +- 7 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/db.c b/src/db.c index 8c6c6bc8..4d119cf2 100644 --- a/src/db.c +++ b/src/db.c @@ -245,13 +245,11 @@ void keysCommand(redisClient *c) { } void dbsizeCommand(redisClient *c) { - addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict))); + addReplyLongLong(c,dictSize(c->db->dict)); } void lastsaveCommand(redisClient *c) { - addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave)); + addReplyLongLong(c,server.lastsave); } void typeCommand(redisClient *c) { diff --git a/src/sort.c b/src/sort.c index f53ad486..79f79010 100644 --- a/src/sort.c +++ b/src/sort.c @@ -369,7 +369,7 @@ void sortCommand(redisClient *c) { * replaced. */ server.dirty += 1+outputlen; touchWatchedKey(c->db,storekey); - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen)); + addReplyLongLong(c,outputlen); } /* Cleanup */ diff --git a/src/t_hash.c b/src/t_hash.c index ad5d3e1e..5745f88c 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -346,7 +346,7 @@ void hlenCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - addReplyUlong(c,hashTypeLength(o)); + addReplyLongLong(c,hashTypeLength(o)); } void genericHgetallCommand(redisClient *c, int flags) { diff --git a/src/t_list.c b/src/t_list.c index db9ca18e..4d948294 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -342,7 +342,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) { server.dirty++; } - addReplyUlong(c,listTypeLength(subject)); + addReplyLongLong(c,listTypeLength(subject)); } void lpushxCommand(redisClient *c) { @@ -366,7 +366,7 @@ void linsertCommand(redisClient *c) { void llenCommand(redisClient *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); if (o == NULL || checkType(c,o,REDIS_LIST)) return; - addReplyUlong(c,listTypeLength(o)); + addReplyLongLong(c,listTypeLength(o)); } void lindexCommand(redisClient *c) { @@ -594,7 +594,7 @@ void lremCommand(redisClient *c) { decrRefCount(obj); if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); + addReplyLongLong(c,removed); if (removed) touchWatchedKey(c->db,c->argv[1]); } diff --git a/src/t_set.c b/src/t_set.c index 17cac934..e2ac5ae5 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -276,7 +276,7 @@ void scardCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - addReplyUlong(c,setTypeSize(o)); + addReplyLongLong(c,setTypeSize(o)); } void spopCommand(redisClient *c) { diff --git a/src/t_string.c b/src/t_string.c index 411687a5..276f4dab 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -211,7 +211,7 @@ void appendCommand(redisClient *c) { } touchWatchedKey(c->db,c->argv[1]); server.dirty++; - addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)totlen)); + addReplyLongLong(c,totlen); } void substrCommand(redisClient *c) { diff --git a/src/t_zset.c b/src/t_zset.c index 7de63158..6a332c6a 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -930,7 +930,7 @@ void zcardCommand(redisClient *c) { checkType(c,o,REDIS_ZSET)) return; zs = o->ptr; - addReplyUlong(c,zs->zsl->length); + addReplyLongLong(c,zs->zsl->length); } void zscoreCommand(redisClient *c) { From 4a7893ca9ce334f2a144faa96ef02113bef4b2b2 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 14:31:25 +0200 Subject: [PATCH 10/14] Removed unneeded function --- src/networking.c | 4 ---- src/redis.h | 1 - 2 files changed, 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 89613ced..f37ecac1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -200,10 +200,6 @@ void addReplyLongLong(redisClient *c, long long ll) { _addReplyLongLong(c,ll,':'); } -void addReplyUlong(redisClient *c, unsigned long ul) { - _addReplyLongLong(c,(long long)ul,':'); -} - void addReplyMultiBulkLen(redisClient *c, long length) { _addReplyLongLong(c,length,'*'); } diff --git a/src/redis.h b/src/redis.h index 6ee1d2e3..ea05fcd0 100644 --- a/src/redis.h +++ b/src/redis.h @@ -616,7 +616,6 @@ void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); -void addReplyUlong(redisClient *c, unsigned long ul); void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); From 36c19d03e08b94ea1bc246918cbd71ea810d38aa Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 19:18:55 +0200 Subject: [PATCH 11/14] Changed reply buildup internals --- src/networking.c | 151 ++++++++++++++++++++++++++++++----------------- src/object.c | 1 - src/redis.h | 11 +--- 3 files changed, 99 insertions(+), 64 deletions(-) diff --git a/src/networking.c b/src/networking.c index f37ecac1..dd005335 100644 --- a/src/networking.c +++ b/src/networking.c @@ -13,13 +13,9 @@ int listMatchObjects(void *a, void *b) { redisClient *createClient(int fd) { redisClient *c; - /* Make sure to allocate a multiple of the page size to prevent wasting - * memory. A page size of 4096 is assumed here. We need to compensate - * for the zmalloc overhead of sizeof(size_t) bytes. */ - size_t size = 8192-sizeof(size_t); - redisAssert(size > sizeof(redisClient)); - c = zmalloc(size); - c->buflen = size-sizeof(redisClient); + /* Allocate more space to hold a static write buffer. */ + c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES); + c->buflen = REDIS_REPLY_CHUNK_BYTES; c->bufpos = 0; anetNonBlock(NULL,fd); @@ -71,40 +67,95 @@ int _ensureFileEvent(redisClient *c) { return REDIS_OK; } -void _addReplyObjectToList(redisClient *c, robj *obj) { - redisAssert(obj->type == REDIS_STRING && - obj->encoding == REDIS_ENCODING_RAW); - listAddNodeTail(c->reply,obj); +/* Create a duplicate of the last object in the reply list when + * it is not exclusively owned by the reply list. */ +robj *dupLastObjectIfNeeded(list *reply) { + robj *new, *cur; + listNode *ln; + redisAssert(listLength(reply) > 0); + ln = listLast(reply); + cur = listNodeValue(ln); + if (cur->refcount > 1) { + new = dupStringObject(cur); + decrRefCount(cur); + listNodeValue(ln) = new; + } + return listNodeValue(ln); } -void _ensureBufferInReplyList(redisClient *c) { - sds buffer = sdsnewlen(NULL,REDIS_REPLY_CHUNK_SIZE); - sdsupdatelen(buffer); /* sdsnewlen expects non-empty string */ - listAddNodeTail(c->reply,createObject(REDIS_REPLY_NODE,buffer)); +int _addReplyToBuffer(redisClient *c, char *s, size_t len) { + size_t available = c->buflen-c->bufpos; + + /* If there already are entries in the reply list, we cannot + * add anything more to the static buffer. */ + if (listLength(c->reply) > 0) return REDIS_ERR; + + /* Check that the buffer has enough space available for this string. */ + if (len > available) return REDIS_ERR; + + memcpy(c->buf+c->bufpos,s,len); + c->bufpos+=len; + return REDIS_OK; } -void _addReplyStringToBuffer(redisClient *c, char *s, size_t len) { - size_t available = 0; - redisAssert(len < REDIS_REPLY_CHUNK_THRESHOLD); - if (listLength(c->reply) > 0) { - robj *o = listNodeValue(listLast(c->reply)); - - /* Make sure to append to a reply node with enough bytes available. */ - if (o->type == REDIS_REPLY_NODE) available = sdsavail(o->ptr); - if (o->type != REDIS_REPLY_NODE || len > available) { - _ensureBufferInReplyList(c); - _addReplyStringToBuffer(c,s,len); - } else { - o->ptr = sdscatlen(o->ptr,s,len); - } +void _addReplyObjectToList(redisClient *c, robj *o) { + robj *tail; + if (listLength(c->reply) == 0) { + incrRefCount(o); + listAddNodeTail(c->reply,o); } else { - available = c->buflen-c->bufpos; - if (len > available) { - _ensureBufferInReplyList(c); - _addReplyStringToBuffer(c,s,len); + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); } else { - memcpy(c->buf+c->bufpos,s,len); - c->bufpos += len; + incrRefCount(o); + listAddNodeTail(c->reply,o); + } + } +} + +/* This method takes responsibility over the sds. When it is no longer + * needed it will be free'd, otherwise it ends up in a robj. */ +void _addReplySdsToList(redisClient *c, sds s) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,sdslen(s)); + sdsfree(s); + } else { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } + } +} + +void _addReplyStringToList(redisClient *c, char *s, size_t len) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createStringObject(s,len)); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,len); + } else { + listAddNodeTail(c->reply,createStringObject(s,len)); } } } @@ -118,13 +169,9 @@ void addReply(redisClient *c, robj *obj) { /* This increments the refcount. */ obj = getDecodedObject(obj); } - - if (sdslen(obj->ptr) < REDIS_REPLY_CHUNK_THRESHOLD) { - _addReplyStringToBuffer(c,obj->ptr,sdslen(obj->ptr)); - decrRefCount(obj); - } else { + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj); - } + decrRefCount(obj); } void addReplySds(redisClient *c, sds s) { @@ -133,28 +180,25 @@ void addReplySds(redisClient *c, sds s) { sdsfree(s); return; } - if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) { - _addReplyStringToBuffer(c,s,sdslen(s)); + if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { sdsfree(s); } else { - _addReplyObjectToList(c,createObject(REDIS_STRING,s)); + /* This method free's the sds when it is no longer needed. */ + _addReplySdsToList(c,s); } } void addReplyString(redisClient *c, char *s, size_t len) { if (_ensureFileEvent(c) != REDIS_OK) return; - if (len < REDIS_REPLY_CHUNK_THRESHOLD) { - _addReplyStringToBuffer(c,s,len); - } else { - _addReplyObjectToList(c,createStringObject(s,len)); - } + if (_addReplyToBuffer(c,s,len) != REDIS_OK) + _addReplyStringToList(c,s,len); } /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addDeferredMultiBulkLength(redisClient *c) { if (_ensureFileEvent(c) != REDIS_OK) return NULL; - _addReplyObjectToList(c,createObject(REDIS_STRING,NULL)); + listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL)); return listLast(c->reply); } @@ -170,9 +214,10 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); if (ln->next != NULL) { next = listNodeValue(ln->next); - /* Only glue when the next node is a reply chunk. */ - if (next->type == REDIS_REPLY_NODE) { - len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); + + /* Only glue when the next node is an sds */ + if (next->ptr != NULL) { + len->ptr = sdscat(len->ptr,next->ptr); listDelNode(c->reply,ln->next); } } diff --git a/src/object.c b/src/object.c index 5e8dbfa2..92af1d6a 100644 --- a/src/object.c +++ b/src/object.c @@ -196,7 +196,6 @@ void decrRefCount(void *obj) { case REDIS_SET: freeSetObject(o); break; case REDIS_ZSET: freeZsetObject(o); break; case REDIS_HASH: freeHashObject(o); break; - case REDIS_REPLY_NODE: freeStringObject(o); break; default: redisPanic("Unknown object type"); break; } o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */ diff --git a/src/redis.h b/src/redis.h index ea05fcd0..328df08d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -47,15 +47,7 @@ #define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */ #define REDIS_SHARED_INTEGERS 10000 - -/* Size of a reply chunk, configured to exactly allocate 4k bytes */ -#define REDIS_REPLY_CHUNK_BYTES (4*1024) -#define REDIS_REPLY_CHUNK_SIZE (REDIS_REPLY_CHUNK_BYTES-sizeof(struct sdshdr)-1-sizeof(size_t)) -/* It doesn't make sense to memcpy objects to a chunk when the net result is - * not being able to glue other objects. We want to make sure it can be glued - * to at least a bulk length or \r\n, so set the threshold to be a couple - * of bytes less than the size of the buffer. */ -#define REDIS_REPLY_CHUNK_THRESHOLD (REDIS_REPLY_CHUNK_SIZE-16) +#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ #define REDIS_WRITEV_THRESHOLD 3 @@ -81,7 +73,6 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 -#define REDIS_REPLY_NODE 5 #define REDIS_VMPOINTER 8 /* Objects encoding. Some kind of objects like Strings and Hashes can be From 60361e5aac5b06ab06f4a63439ce84cd58c87f3d Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 19:35:07 +0200 Subject: [PATCH 12/14] Add sds function that can be called with va_list --- src/sds.c | 19 +++++++++++++------ src/sds.h | 2 ++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/sds.c b/src/sds.c index a0ebb059..2f3ffedc 100644 --- a/src/sds.c +++ b/src/sds.c @@ -33,7 +33,6 @@ #include "sds.h" #include #include -#include #include #include #include "zmalloc.h" @@ -156,8 +155,8 @@ sds sdscpy(sds s, char *t) { return sdscpylen(s, t, strlen(t)); } -sds sdscatprintf(sds s, const char *fmt, ...) { - va_list ap; +sds sdscatvprintf(sds s, const char *fmt, va_list ap) { + va_list cpy; char *buf, *t; size_t buflen = 16; @@ -169,9 +168,8 @@ sds sdscatprintf(sds s, const char *fmt, ...) { if (buf == NULL) return NULL; #endif buf[buflen-2] = '\0'; - va_start(ap, fmt); - vsnprintf(buf, buflen, fmt, ap); - va_end(ap); + va_copy(cpy,ap); + vsnprintf(buf, buflen, fmt, cpy); if (buf[buflen-2] != '\0') { zfree(buf); buflen *= 2; @@ -184,6 +182,15 @@ sds sdscatprintf(sds s, const char *fmt, ...) { return t; } +sds sdscatprintf(sds s, const char *fmt, ...) { + va_list ap; + char *t; + va_start(ap, fmt); + t = sdscatvprintf(s,fmt,ap); + va_end(ap); + return t; +} + sds sdstrim(sds s, const char *cset) { struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); char *start, *end, *sp, *ep; diff --git a/src/sds.h b/src/sds.h index a0e224f5..ae0f84fb 100644 --- a/src/sds.h +++ b/src/sds.h @@ -32,6 +32,7 @@ #define __SDS_H #include +#include typedef char *sds; @@ -53,6 +54,7 @@ sds sdscat(sds s, char *t); sds sdscpylen(sds s, char *t, size_t len); sds sdscpy(sds s, char *t); +sds sdscatvprintf(sds s, const char *fmt, va_list ap); #ifdef __GNUC__ sds sdscatprintf(sds s, const char *fmt, ...) __attribute__((format(printf, 2, 3))); From 3ab203762f28ffec4036dc4f5a188d637cf78ff1 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 19:52:24 +0200 Subject: [PATCH 13/14] Use specialized function to add status and error replies --- src/aof.c | 5 ++--- src/config.c | 23 ++++++++++------------- src/db.c | 28 +++++++++++++--------------- src/debug.c | 33 ++++++++++++++++----------------- src/multi.c | 8 ++++---- src/networking.c | 38 ++++++++++++++++++++++++++++++++++++++ src/object.c | 12 ++++++------ src/redis.c | 25 +++++++++++-------------- src/redis.h | 12 ++++++++++++ src/replication.c | 6 +++--- src/t_hash.c | 2 +- src/t_string.c | 4 ++-- src/t_zset.c | 9 ++++----- 13 files changed, 122 insertions(+), 83 deletions(-) diff --git a/src/aof.c b/src/aof.c index 58dd5538..b639eb52 100644 --- a/src/aof.c +++ b/src/aof.c @@ -632,12 +632,11 @@ int rewriteAppendOnlyFileBackground(void) { void bgrewriteaofCommand(redisClient *c) { if (server.bgrewritechildpid != -1) { - addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n")); + addReplyError(c,"Background append only file rewriting already in progress"); return; } if (rewriteAppendOnlyFileBackground() == REDIS_OK) { - char *status = "+Background append only file rewriting started\r\n"; - addReplySds(c,sdsnew(status)); + addReplyStatus(c,"Background append only file rewriting started"); } else { addReply(c,shared.err); } diff --git a/src/config.c b/src/config.c index 5c449886..8a5ad6c2 100644 --- a/src/config.c +++ b/src/config.c @@ -270,8 +270,8 @@ void configSetCommand(redisClient *c) { stopAppendOnly(); } else { if (startAppendOnly() == REDIS_ERR) { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR Unable to turn on AOF. Check server logs.\r\n")); + addReplyError(c, + "Unable to turn on AOF. Check server logs."); decrRefCount(o); return; } @@ -312,9 +312,8 @@ void configSetCommand(redisClient *c) { } sdsfreesplitres(v,vlen); } else { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR not supported CONFIG parameter %s\r\n", - (char*)c->argv[2]->ptr)); + addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s", + (char*)c->argv[2]->ptr); decrRefCount(o); return; } @@ -323,10 +322,9 @@ void configSetCommand(redisClient *c) { return; badfmt: /* Bad format errors */ - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n", + addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'", (char*)o->ptr, - (char*)c->argv[2]->ptr)); + (char*)c->argv[2]->ptr); decrRefCount(o); } @@ -425,13 +423,12 @@ void configCommand(redisClient *c) { server.stat_starttime = time(NULL); addReply(c,shared.ok); } else { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n")); + addReplyError(c, + "CONFIG subcommand must be one of GET, SET, RESETSTAT"); } return; badarity: - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR Wrong number of arguments for CONFIG %s\r\n", - (char*) c->argv[1]->ptr)); + addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s", + (char*) c->argv[1]->ptr); } diff --git a/src/db.c b/src/db.c index 4d119cf2..4f1572a6 100644 --- a/src/db.c +++ b/src/db.c @@ -199,7 +199,7 @@ void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); if (selectDb(c,id) == REDIS_ERR) { - addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); + addReplyError(c,"invalid DB index"); } else { addReply(c,shared.ok); } @@ -258,24 +258,23 @@ void typeCommand(redisClient *c) { o = lookupKeyRead(c->db,c->argv[1]); if (o == NULL) { - type = "+none"; + type = "none"; } else { switch(o->type) { - case REDIS_STRING: type = "+string"; break; - case REDIS_LIST: type = "+list"; break; - case REDIS_SET: type = "+set"; break; - case REDIS_ZSET: type = "+zset"; break; - case REDIS_HASH: type = "+hash"; break; - default: type = "+unknown"; break; + case REDIS_STRING: type = "string"; break; + case REDIS_LIST: type = "list"; break; + case REDIS_SET: type = "set"; break; + case REDIS_ZSET: type = "zset"; break; + case REDIS_HASH: type = "hash"; break; + default: type = "unknown"; break; } } - addReplySds(c,sdsnew(type)); - addReply(c,shared.crlf); + addReplyStatus(c,type); } void saveCommand(redisClient *c) { if (server.bgsavechildpid != -1) { - addReplySds(c,sdsnew("-ERR background save in progress\r\n")); + addReplyError(c,"Background save already in progress"); return; } if (rdbSave(server.dbfilename) == REDIS_OK) { @@ -287,12 +286,11 @@ void saveCommand(redisClient *c) { void bgsaveCommand(redisClient *c) { if (server.bgsavechildpid != -1) { - addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); + addReplyError(c,"Background save already in progress"); return; } if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { - char *status = "+Background saving started\r\n"; - addReplySds(c,sdsnew(status)); + addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); } @@ -301,7 +299,7 @@ void bgsaveCommand(redisClient *c) { void shutdownCommand(redisClient *c) { if (prepareForShutdown() == REDIS_OK) exit(0); - addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n")); + addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); } void renameGenericCommand(redisClient *c, int nx) { diff --git a/src/debug.c b/src/debug.c index 76d18b21..2f7ab58f 100644 --- a/src/debug.c +++ b/src/debug.c @@ -211,18 +211,18 @@ void debugCommand(redisClient *c) { char *strenc; strenc = strEncoding(val->encoding); - addReplySds(c,sdscatprintf(sdsempty(), - "+Value at:%p refcount:%d " - "encoding:%s serializedlength:%lld\r\n", + addReplyStatusFormat(c, + "Value at:%p refcount:%d " + "encoding:%s serializedlength:%lld", (void*)val, val->refcount, - strenc, (long long) rdbSavedObjectLen(val,NULL))); + strenc, (long long) rdbSavedObjectLen(val,NULL)); } else { vmpointer *vp = (vmpointer*) val; - addReplySds(c,sdscatprintf(sdsempty(), - "+Value swapped at: page %llu " - "using %llu pages\r\n", + addReplyStatusFormat(c, + "Value swapped at: page %llu " + "using %llu pages", (unsigned long long) vp->page, - (unsigned long long) vp->usedpages)); + (unsigned long long) vp->usedpages); } } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { lookupKeyRead(c->db,c->argv[2]); @@ -233,7 +233,7 @@ void debugCommand(redisClient *c) { vmpointer *vp; if (!server.vm_enabled) { - addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n")); + addReplyError(c,"Virtual Memory is disabled"); return; } if (!de) { @@ -243,9 +243,9 @@ void debugCommand(redisClient *c) { val = dictGetEntryVal(de); /* Swap it */ if (val->storage != REDIS_VM_MEMORY) { - addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); + addReplyError(c,"This key is not in memory"); } else if (val->refcount != 1) { - addReplySds(c,sdsnew("-ERR Object is shared\r\n")); + addReplyError(c,"Object is shared"); } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { dictGetEntryVal(de) = vp; addReply(c,shared.ok); @@ -274,18 +274,17 @@ void debugCommand(redisClient *c) { addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { unsigned char digest[20]; - sds d = sdsnew("+"); + sds d = sdsempty(); int j; computeDatasetDigest(digest); for (j = 0; j < 20; j++) d = sdscatprintf(d, "%02x",digest[j]); - - d = sdscatlen(d,"\r\n",2); - addReplySds(c,d); + addReplyStatus(c,d); + sdsfree(d); } else { - addReplySds(c,sdsnew( - "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]\r\n")); + addReplyError(c, + "Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]"); } } diff --git a/src/multi.c b/src/multi.c index c85516df..47615eb0 100644 --- a/src/multi.c +++ b/src/multi.c @@ -42,7 +42,7 @@ void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { void multiCommand(redisClient *c) { if (c->flags & REDIS_MULTI) { - addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + addReplyError(c,"MULTI calls can not be nested"); return; } c->flags |= REDIS_MULTI; @@ -51,7 +51,7 @@ void multiCommand(redisClient *c) { void discardCommand(redisClient *c) { if (!(c->flags & REDIS_MULTI)) { - addReplySds(c,sdsnew("-ERR DISCARD without MULTI\r\n")); + addReplyError(c,"DISCARD without MULTI"); return; } @@ -82,7 +82,7 @@ void execCommand(redisClient *c) { int orig_argc; if (!(c->flags & REDIS_MULTI)) { - addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n")); + addReplyError(c,"EXEC without MULTI"); return; } @@ -251,7 +251,7 @@ void watchCommand(redisClient *c) { int j; if (c->flags & REDIS_MULTI) { - addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + addReplyError(c,"WATCH inside MULTI is not allowed"); return; } for (j = 1; j < c->argc; j++) diff --git a/src/networking.c b/src/networking.c index dd005335..d62456a3 100644 --- a/src/networking.c +++ b/src/networking.c @@ -194,6 +194,44 @@ void addReplyString(redisClient *c, char *s, size_t len) { _addReplyStringToList(c,s,len); } +void _addReplyError(redisClient *c, char *s, size_t len) { + addReplyString(c,"-ERR ",5); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyError(redisClient *c, char *err) { + _addReplyError(c,err,strlen(err)); +} + +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyError(c,s,sdslen(s)); + sdsfree(s); +} + +void _addReplyStatus(redisClient *c, char *s, size_t len) { + addReplyString(c,"+",1); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyStatus(redisClient *c, char *status) { + _addReplyStatus(c,status,strlen(status)); +} + +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyStatus(c,s,sdslen(s)); + sdsfree(s); +} + /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addDeferredMultiBulkLength(redisClient *c) { diff --git a/src/object.c b/src/object.c index 92af1d6a..c1a08245 100644 --- a/src/object.c +++ b/src/object.c @@ -354,9 +354,9 @@ int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const ch double value; if (getDoubleFromObject(o, &value) != REDIS_OK) { if (msg != NULL) { - addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + addReplyError(c,(char*)msg); } else { - addReplySds(c, sdsnew("-ERR value is not a double\r\n")); + addReplyError(c,"value is not a double"); } return REDIS_ERR; } @@ -393,9 +393,9 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con long long value; if (getLongLongFromObject(o, &value) != REDIS_OK) { if (msg != NULL) { - addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + addReplyError(c,(char*)msg); } else { - addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n")); + addReplyError(c,"value is not an integer or out of range"); } return REDIS_ERR; } @@ -410,9 +410,9 @@ int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char * if (getLongLongFromObjectOrReply(c, o, &value, msg) != REDIS_OK) return REDIS_ERR; if (value < LONG_MIN || value > LONG_MAX) { if (msg != NULL) { - addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + addReplyError(c,(char*)msg); } else { - addReplySds(c, sdsnew("-ERR value is out of range\r\n")); + addReplyError(c,"value is out of range"); } return REDIS_ERR; } diff --git a/src/redis.c b/src/redis.c index 77e67c58..5af9b235 100644 --- a/src/redis.c +++ b/src/redis.c @@ -909,7 +909,7 @@ int processCommand(redisClient *c) { } else if (c->multibulk) { if (c->bulklen == -1) { if (((char*)c->argv[0]->ptr)[0] != '$') { - addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n")); + addReplyError(c,"multi bulk protocol error"); resetClient(c); return 1; } else { @@ -922,7 +922,7 @@ int processCommand(redisClient *c) { bulklen < 0 || bulklen > 1024*1024*1024) { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@ -975,17 +975,14 @@ int processCommand(redisClient *c) { * such wrong arity, bad command name and so forth. */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - addReplySds(c, - sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n", - (char*)c->argv[0]->ptr)); + addReplyErrorFormat(c,"unknown command '%s'", + (char*)c->argv[0]->ptr); resetClient(c); return 1; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { - addReplySds(c, - sdscatprintf(sdsempty(), - "-ERR wrong number of arguments for '%s' command\r\n", - cmd->name)); + addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + cmd->name); resetClient(c); return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { @@ -999,7 +996,7 @@ int processCommand(redisClient *c) { bulklen < 0 || bulklen > 1024*1024*1024) { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@ -1026,7 +1023,7 @@ int processCommand(redisClient *c) { /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { - addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + addReplyError(c,"operation not permitted"); resetClient(c); return 1; } @@ -1035,7 +1032,7 @@ int processCommand(redisClient *c) { if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) && zmalloc_used_memory() > server.maxmemory) { - addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); + addReplyError(c,"command not allowed when used memory > 'maxmemory'"); resetClient(c); return 1; } @@ -1045,7 +1042,7 @@ int processCommand(redisClient *c) { && cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { - addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n")); + addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); resetClient(c); return 1; } @@ -1109,7 +1106,7 @@ void authCommand(redisClient *c) { addReply(c,shared.ok); } else { c->authenticated = 0; - addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); + addReplyError(c,"invalid password"); } } diff --git a/src/redis.h b/src/redis.h index 328df08d..1ef56288 100644 --- a/src/redis.h +++ b/src/redis.h @@ -605,11 +605,23 @@ void addReplyBulkCString(redisClient *c, char *s); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); +void addReplyError(redisClient *c, char *err); +void addReplyStatus(redisClient *c, char *status); void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); +#ifdef __GNUC__ +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); +#else +void addReplyErrorFormat(redisClient *c, const char *fmt, ...); +void addReplyStatusFormat(redisClient *c, const char *fmt, ...); +#endif + /* List data type */ void listTypeTryConversion(robj *subject, robj *value); void listTypePush(robj *subject, robj *value, int where); diff --git a/src/replication.c b/src/replication.c index c2846088..8c629006 100644 --- a/src/replication.c +++ b/src/replication.c @@ -179,7 +179,7 @@ void syncCommand(redisClient *c) { /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) { - addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n")); + addReplyError(c,"Can't SYNC while not connected with my master"); return; } @@ -188,7 +188,7 @@ void syncCommand(redisClient *c) { * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (listLength(c->reply) != 0) { - addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n")); + addReplyError(c,"SYNC is invalid with pending input"); return; } @@ -226,7 +226,7 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); - addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n")); + addReplyError(c,"Unable to perform background save"); return; } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; diff --git a/src/t_hash.c b/src/t_hash.c index 5745f88c..5cef1cab 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -249,7 +249,7 @@ void hmsetCommand(redisClient *c) { robj *o; if ((c->argc % 2) == 1) { - addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n")); + addReplyError(c,"wrong number of arguments for HMSET"); return; } diff --git a/src/t_string.c b/src/t_string.c index 276f4dab..509c630a 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -12,7 +12,7 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) return; if (seconds <= 0) { - addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n")); + addReplyError(c,"invalid expire time in SETEX"); return; } } @@ -98,7 +98,7 @@ void msetGenericCommand(redisClient *c, int nx) { int j, busykeys = 0; if ((c->argc % 2) == 0) { - addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n")); + addReplyError(c,"wrong number of arguments for MSET"); return; } /* Handle the NX flag. The MSETNX semantic is to return zero and don't diff --git a/src/t_zset.c b/src/t_zset.c index 6a332c6a..d944e923 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -355,8 +355,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, i *score = scoreval; } if (isnan(*score)) { - addReplySds(c, - sdsnew("-ERR resulting score is not a number (NaN)\r\n")); + addReplyError(c,"resulting score is not a number (NaN)"); zfree(score); /* Note that we don't need to check if the zset may be empty and * should be removed here, as we can only obtain Nan as score if @@ -561,7 +560,8 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* expect setnum input keys to be given */ setnum = atoi(c->argv[2]->ptr); if (setnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); + addReplyError(c, + "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); return; } @@ -839,8 +839,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (c->argc != (4 + withscores) && c->argc != (7 + withscores)) badsyntax = 1; if (badsyntax) { - addReplySds(c, - sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); + addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE"); return; } From 49128f0b9da725de992e427fa341a837bcc2991b Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 2 Sep 2010 23:34:32 +0200 Subject: [PATCH 14/14] Fix bug in gluing a deferred multi bulk length to the next reply chunk --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index d62456a3..55b7475b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -253,9 +253,9 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { if (ln->next != NULL) { next = listNodeValue(ln->next); - /* Only glue when the next node is an sds */ + /* Only glue when the next node is non-NULL (an sds in this case) */ if (next->ptr != NULL) { - len->ptr = sdscat(len->ptr,next->ptr); + len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); listDelNode(c->reply,ln->next); } }