From 14c4ddb5a62a52e30ee169d36b516a78a410a5b4 Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 14 Aug 2018 00:43:36 +0800
Subject: [PATCH 1/5] pipeline: do not sdsrange querybuf unless all commands
 processed

This is an optimization for processing pipeline, we discussed a
problem in issue #5229: clients may be paused if we apply `CLIENT
PAUSE` command, and then querybuf may grow too large, the cost of
memmove in sdsrange after parsing a completed command will be
horrible. The optimization is that parsing all commands in queyrbuf
, after that we can just call sdsrange only once.
---
 src/networking.c | 87 ++++++++++++++++++++++++++----------------------
 src/server.h     |  1 +
 2 files changed, 48 insertions(+), 40 deletions(-)

diff --git a/src/networking.c b/src/networking.c
index af742217..945fdd96 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -110,6 +110,7 @@ client *createClient(int fd) {
     c->fd = fd;
     c->name = NULL;
     c->bufpos = 0;
+    c->qb_pos = 0;
     c->querybuf = sdsempty();
     c->pending_querybuf = sdsempty();
     c->querybuf_peak = 0;
@@ -1119,11 +1120,11 @@ int processInlineBuffer(client *c) {
     size_t querylen;
 
     /* Search for end of line */
-    newline = strchr(c->querybuf,'\n');
+    newline = strchr(c->querybuf+c->qb_pos,'\n');
 
     /* Nothing to do without a \r\n */
     if (newline == NULL) {
-        if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
+        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
             addReplyError(c,"Protocol error: too big inline request");
             setProtocolError("too big inline request",c,0);
         }
@@ -1131,12 +1132,12 @@ int processInlineBuffer(client *c) {
     }
 
     /* Handle the \r\n case. */
-    if (newline && newline != c->querybuf && *(newline-1) == '\r')
+    if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
         newline--, linefeed_chars++;
 
     /* Split the input buffer up to the \r\n */
-    querylen = newline-(c->querybuf);
-    aux = sdsnewlen(c->querybuf,querylen);
+    querylen = newline-(c->querybuf+c->qb_pos);
+    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
     argv = sdssplitargs(aux,&argc);
     sdsfree(aux);
     if (argv == NULL) {
@@ -1152,7 +1153,8 @@ int processInlineBuffer(client *c) {
         c->repl_ack_time = server.unixtime;
 
     /* Leave data after the first line of the query in the buffer */
-    sdsrange(c->querybuf,querylen+linefeed_chars,-1);
+    sdsrange(c->querybuf,c->qb_pos+querylen+linefeed_chars,-1);
+    c->qb_pos = 0;
 
     /* Setup argv array on client structure */
     if (argc) {
@@ -1182,10 +1184,10 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
 
         /* Sample some protocol to given an idea about what was inside. */
         char buf[256];
-        if (sdslen(c->querybuf) < PROTO_DUMP_LEN) {
-            snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf);
+        if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
+            snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
         } else {
-            snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
+            snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
         }
 
         /* Remove non printable chars. */
@@ -1202,6 +1204,7 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
     }
     c->flags |= CLIENT_CLOSE_AFTER_REPLY;
     sdsrange(c->querybuf,pos,-1);
+    c->qb_pos -= pos;
 }
 
 /* Process the query buffer for client 'c', setting up the client argument
@@ -1217,7 +1220,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
  * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
 int processMultibulkBuffer(client *c) {
     char *newline = NULL;
-    long pos = 0;
     int ok;
     long long ll;
 
@@ -1226,9 +1228,9 @@ int processMultibulkBuffer(client *c) {
         serverAssertWithInfo(c,NULL,c->argc == 0);
 
         /* Multi bulk length cannot be read without a \r\n */
-        newline = strchr(c->querybuf,'\r');
+        newline = strchr(c->querybuf+c->qb_pos,'\r');
         if (newline == NULL) {
-            if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
+            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                 addReplyError(c,"Protocol error: too big mbulk count string");
                 setProtocolError("too big mbulk count string",c,0);
             }
@@ -1236,22 +1238,23 @@ int processMultibulkBuffer(client *c) {
         }
 
         /* Buffer should also contain \n */
-        if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
+        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
             return C_ERR;
 
         /* We know for sure there is a whole line since newline != NULL,
          * so go ahead and find out the multi bulk length. */
-        serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
-        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
+        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
+        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
         if (!ok || ll > 1024*1024) {
             addReplyError(c,"Protocol error: invalid multibulk length");
-            setProtocolError("invalid mbulk count",c,pos);
+            setProtocolError("invalid mbulk count",c,c->qb_pos);
             return C_ERR;
         }
 
-        pos = (newline-c->querybuf)+2;
+        c->qb_pos = (newline-c->querybuf)+2;
         if (ll <= 0) {
-            sdsrange(c->querybuf,pos,-1);
+            sdsrange(c->querybuf,c->qb_pos,-1);
+            c->qb_pos = 0;
             return C_OK;
         }
 
@@ -1266,9 +1269,9 @@ int processMultibulkBuffer(client *c) {
     while(c->multibulklen) {
         /* Read bulk length if unknown */
         if (c->bulklen == -1) {
-            newline = strchr(c->querybuf+pos,'\r');
+            newline = strchr(c->querybuf+c->qb_pos,'\r');
             if (newline == NULL) {
-                if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
+                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                     addReplyError(c,
                         "Protocol error: too big bulk count string");
                     setProtocolError("too big bulk count string",c,0);
@@ -1278,25 +1281,25 @@ int processMultibulkBuffer(client *c) {
             }
 
             /* Buffer should also contain \n */
-            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
+            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                 break;
 
-            if (c->querybuf[pos] != '$') {
+            if (c->querybuf[c->qb_pos] != '$') {
                 addReplyErrorFormat(c,
                     "Protocol error: expected '$', got '%c'",
-                    c->querybuf[pos]);
-                setProtocolError("expected $ but got something else",c,pos);
+                    c->querybuf[c->qb_pos]);
+                setProtocolError("expected $ but got something else",c,c->qb_pos);
                 return C_ERR;
             }
 
-            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
+            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
             if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
                 addReplyError(c,"Protocol error: invalid bulk length");
-                setProtocolError("invalid bulk length",c,pos);
+                setProtocolError("invalid bulk length",c,c->qb_pos);
                 return C_ERR;
             }
 
-            pos += newline-(c->querybuf+pos)+2;
+            c->qb_pos = newline-c->querybuf+2;
             if (ll >= PROTO_MBULK_BIG_ARG) {
                 size_t qblen;
 
@@ -1304,8 +1307,8 @@ int processMultibulkBuffer(client *c) {
                  * try to make it likely that it will start at c->querybuf
                  * boundary so that we can optimize object creation
                  * avoiding a large copy of data. */
-                sdsrange(c->querybuf,pos,-1);
-                pos = 0;
+                sdsrange(c->querybuf,c->qb_pos,-1);
+                c->qb_pos = 0;
                 qblen = sdslen(c->querybuf);
                 /* Hint the sds library about the amount of bytes this string is
                  * going to contain. */
@@ -1316,14 +1319,14 @@ int processMultibulkBuffer(client *c) {
         }
 
         /* Read bulk argument */
-        if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) {
+        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
             /* Not enough data (+2 == trailing \r\n) */
             break;
         } else {
             /* Optimization: if the buffer contains JUST our bulk element
              * instead of creating a new object by *copying* the sds we
              * just use the current sds string. */
-            if (pos == 0 &&
+            if (c->qb_pos == 0 &&
                 c->bulklen >= PROTO_MBULK_BIG_ARG &&
                 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
             {
@@ -1333,20 +1336,16 @@ int processMultibulkBuffer(client *c) {
                  * likely... */
                 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                 sdsclear(c->querybuf);
-                pos = 0;
             } else {
                 c->argv[c->argc++] =
-                    createStringObject(c->querybuf+pos,c->bulklen);
-                pos += c->bulklen+2;
+                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
+                c->qb_pos += c->bulklen+2;
             }
             c->bulklen = -1;
             c->multibulklen--;
         }
     }
 
-    /* Trim to pos */
-    if (pos) sdsrange(c->querybuf,pos,-1);
-
     /* We're done when c->multibulk == 0 */
     if (c->multibulklen == 0) return C_OK;
 
@@ -1360,8 +1359,9 @@ int processMultibulkBuffer(client *c) {
  * pending query buffer, already representing a full command, to process. */
 void processInputBuffer(client *c) {
     server.current_client = c;
+
     /* Keep processing while there is something in the input buffer */
-    while(sdslen(c->querybuf)) {
+    while(c->qb_pos < sdslen(c->querybuf)) {
         /* Return if clients are paused. */
         if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
 
@@ -1377,7 +1377,7 @@ void processInputBuffer(client *c) {
 
         /* Determine request type when unknown. */
         if (!c->reqtype) {
-            if (c->querybuf[0] == '*') {
+            if (c->querybuf[c->qb_pos] == '*') {
                 c->reqtype = PROTO_REQ_MULTIBULK;
             } else {
                 c->reqtype = PROTO_REQ_INLINE;
@@ -1400,7 +1400,7 @@ void processInputBuffer(client *c) {
             if (processCommand(c) == C_OK) {
                 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                     /* Update the applied replication offset of our master. */
-                    c->reploff = c->read_reploff - sdslen(c->querybuf);
+                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                 }
 
                 /* Don't reset the client structure for clients blocked in a
@@ -1416,6 +1416,13 @@ void processInputBuffer(client *c) {
             if (server.current_client == NULL) break;
         }
     }
+
+    /* Trim to pos */
+    if (c->qb_pos) {
+        sdsrange(c->querybuf,c->qb_pos,-1);
+        c->qb_pos = 0;
+    }
+
     server.current_client = NULL;
 }
 
diff --git a/src/server.h b/src/server.h
index 186d0825..ce127b58 100644
--- a/src/server.h
+++ b/src/server.h
@@ -710,6 +710,7 @@ typedef struct client {
     redisDb *db;            /* Pointer to currently SELECTed DB. */
     robj *name;             /* As set by CLIENT SETNAME. */
     sds querybuf;           /* Buffer we use to accumulate client queries. */
+    size_t qb_pos;          /* The position we have read in querybuf. */
     sds pending_querybuf;   /* If this client is flagged as master, this buffer
                                represents the yet not applied portion of the
                                replication stream that we are receiving from

From b89302c462bfa410977ed99e1e6fdc64a0fbd031 Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 14 Aug 2018 00:57:22 +0800
Subject: [PATCH 2/5] adjust qbuf to 26 in test case for client list

---
 tests/unit/introspection.tcl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl
index f6477d9c..2581eb83 100644
--- a/tests/unit/introspection.tcl
+++ b/tests/unit/introspection.tcl
@@ -1,7 +1,7 @@
 start_server {tags {"introspection"}} {
     test {CLIENT LIST} {
         r client list
-    } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*}
+    } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*}
 
     test {MONITOR can log executed commands} {
         set rd [redis_deferring_client]

From e623bd22bab45d8ce20d82bb2de5e8ed82f9e7bf Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 14 Aug 2018 13:55:30 +0800
Subject: [PATCH 3/5] networking: just return C_OK if multibulk processing saw
 a <= 0 length.

---
 src/networking.c | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/networking.c b/src/networking.c
index 945fdd96..9e1a3a9e 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1252,11 +1252,8 @@ int processMultibulkBuffer(client *c) {
         }
 
         c->qb_pos = (newline-c->querybuf)+2;
-        if (ll <= 0) {
-            sdsrange(c->querybuf,c->qb_pos,-1);
-            c->qb_pos = 0;
-            return C_OK;
-        }
+
+        if (ll <= 0) return C_OK;
 
         c->multibulklen = ll;
 

From ef2a95c46125a5e5402a2ee1e433f78959596d0c Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Tue, 14 Aug 2018 14:50:37 +0800
Subject: [PATCH 4/5] networking: just move qb_pos instead of sdsrange in
 processInlineBuffer

---
 src/networking.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/networking.c b/src/networking.c
index 9e1a3a9e..24432339 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1152,9 +1152,8 @@ int processInlineBuffer(client *c) {
     if (querylen == 0 && c->flags & CLIENT_SLAVE)
         c->repl_ack_time = server.unixtime;
 
-    /* Leave data after the first line of the query in the buffer */
-    sdsrange(c->querybuf,c->qb_pos+querylen+linefeed_chars,-1);
-    c->qb_pos = 0;
+    /* Move querybuffer position to the next query in the buffer. */
+    c->qb_pos += querylen+linefeed_chars;
 
     /* Setup argv array on client structure */
     if (argc) {

From f2ad89a314a0be2ea50a598339ee903ec3f64b65 Mon Sep 17 00:00:00 2001
From: "zhaozhao.zz" <zhaozhao.zz@alibaba-inc.com>
Date: Thu, 23 Aug 2018 12:21:23 +0800
Subject: [PATCH 5/5] networking: make setProtocolError simple and clear

Function setProtocolError just records proctocol error
details in server log, set client as CLIENT_CLOSE_AFTER_REPLY.
It doesn't care about querybuf sdsrange, because we
will do it after procotol parsing.
---
 src/networking.c | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/networking.c b/src/networking.c
index 24432339..58248ced 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -33,7 +33,7 @@
 #include <math.h>
 #include <ctype.h>
 
-static void setProtocolError(const char *errstr, client *c, long pos);
+static void setProtocolError(const char *errstr, client *c);
 
 /* Return the size consumed from the allocator, for the specified SDS string,
  * including internal fragmentation. This function is used in order to compute
@@ -1126,7 +1126,7 @@ int processInlineBuffer(client *c) {
     if (newline == NULL) {
         if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
             addReplyError(c,"Protocol error: too big inline request");
-            setProtocolError("too big inline request",c,0);
+            setProtocolError("too big inline request",c);
         }
         return C_ERR;
     }
@@ -1142,7 +1142,7 @@ int processInlineBuffer(client *c) {
     sdsfree(aux);
     if (argv == NULL) {
         addReplyError(c,"Protocol error: unbalanced quotes in request");
-        setProtocolError("unbalanced quotes in inline request",c,0);
+        setProtocolError("unbalanced quotes in inline request",c);
         return C_ERR;
     }
 
@@ -1174,10 +1174,10 @@ int processInlineBuffer(client *c) {
     return C_OK;
 }
 
-/* Helper function. Trims query buffer to make the function that processes
- * multi bulk requests idempotent. */
+/* Helper function. Record protocol erro details in server log,
+ * and set the client as CLIENT_CLOSE_AFTER_REPLY. */
 #define PROTO_DUMP_LEN 128
-static void setProtocolError(const char *errstr, client *c, long pos) {
+static void setProtocolError(const char *errstr, client *c) {
     if (server.verbosity <= LL_VERBOSE) {
         sds client = catClientInfoString(sdsempty(),c);
 
@@ -1202,8 +1202,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
         sdsfree(client);
     }
     c->flags |= CLIENT_CLOSE_AFTER_REPLY;
-    sdsrange(c->querybuf,pos,-1);
-    c->qb_pos -= pos;
 }
 
 /* Process the query buffer for client 'c', setting up the client argument
@@ -1231,7 +1229,7 @@ int processMultibulkBuffer(client *c) {
         if (newline == NULL) {
             if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                 addReplyError(c,"Protocol error: too big mbulk count string");
-                setProtocolError("too big mbulk count string",c,0);
+                setProtocolError("too big mbulk count string",c);
             }
             return C_ERR;
         }
@@ -1246,7 +1244,7 @@ int processMultibulkBuffer(client *c) {
         ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
         if (!ok || ll > 1024*1024) {
             addReplyError(c,"Protocol error: invalid multibulk length");
-            setProtocolError("invalid mbulk count",c,c->qb_pos);
+            setProtocolError("invalid mbulk count",c);
             return C_ERR;
         }
 
@@ -1270,7 +1268,7 @@ int processMultibulkBuffer(client *c) {
                 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                     addReplyError(c,
                         "Protocol error: too big bulk count string");
-                    setProtocolError("too big bulk count string",c,0);
+                    setProtocolError("too big bulk count string",c);
                     return C_ERR;
                 }
                 break;
@@ -1284,14 +1282,14 @@ int processMultibulkBuffer(client *c) {
                 addReplyErrorFormat(c,
                     "Protocol error: expected '$', got '%c'",
                     c->querybuf[c->qb_pos]);
-                setProtocolError("expected $ but got something else",c,c->qb_pos);
+                setProtocolError("expected $ but got something else",c);
                 return C_ERR;
             }
 
             ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
             if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
                 addReplyError(c,"Protocol error: invalid bulk length");
-                setProtocolError("invalid bulk length",c,c->qb_pos);
+                setProtocolError("invalid bulk length",c);
                 return C_ERR;
             }