diff --git a/src/networking.c b/src/networking.c index af742217..58248ced 100644 --- a/src/networking.c +++ b/src/networking.c @@ -33,7 +33,7 @@ #include #include -static void setProtocolError(const char *errstr, client *c, long pos); +static void setProtocolError(const char *errstr, client *c); /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -110,6 +110,7 @@ client *createClient(int fd) { c->fd = fd; c->name = NULL; c->bufpos = 0; + c->qb_pos = 0; c->querybuf = sdsempty(); c->pending_querybuf = sdsempty(); c->querybuf_peak = 0; @@ -1119,29 +1120,29 @@ int processInlineBuffer(client *c) { size_t querylen; /* Search for end of line */ - newline = strchr(c->querybuf,'\n'); + newline = strchr(c->querybuf+c->qb_pos,'\n'); /* Nothing to do without a \r\n */ if (newline == NULL) { - if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { + if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big inline request"); - setProtocolError("too big inline request",c,0); + setProtocolError("too big inline request",c); } return C_ERR; } /* Handle the \r\n case. */ - if (newline && newline != c->querybuf && *(newline-1) == '\r') + if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r') newline--, linefeed_chars++; /* Split the input buffer up to the \r\n */ - querylen = newline-(c->querybuf); - aux = sdsnewlen(c->querybuf,querylen); + querylen = newline-(c->querybuf+c->qb_pos); + aux = sdsnewlen(c->querybuf+c->qb_pos,querylen); argv = sdssplitargs(aux,&argc); sdsfree(aux); if (argv == NULL) { addReplyError(c,"Protocol error: unbalanced quotes in request"); - setProtocolError("unbalanced quotes in inline request",c,0); + setProtocolError("unbalanced quotes in inline request",c); return C_ERR; } @@ -1151,8 +1152,8 @@ int processInlineBuffer(client *c) { if (querylen == 0 && c->flags & CLIENT_SLAVE) c->repl_ack_time = server.unixtime; - /* Leave data after the first line of the query in the buffer */ - sdsrange(c->querybuf,querylen+linefeed_chars,-1); + /* Move querybuffer position to the next query in the buffer. */ + c->qb_pos += querylen+linefeed_chars; /* Setup argv array on client structure */ if (argc) { @@ -1173,19 +1174,19 @@ int processInlineBuffer(client *c) { return C_OK; } -/* Helper function. Trims query buffer to make the function that processes - * multi bulk requests idempotent. */ +/* Helper function. Record protocol erro details in server log, + * and set the client as CLIENT_CLOSE_AFTER_REPLY. */ #define PROTO_DUMP_LEN 128 -static void setProtocolError(const char *errstr, client *c, long pos) { +static void setProtocolError(const char *errstr, client *c) { if (server.verbosity <= LL_VERBOSE) { sds client = catClientInfoString(sdsempty(),c); /* Sample some protocol to given an idea about what was inside. */ char buf[256]; - if (sdslen(c->querybuf) < PROTO_DUMP_LEN) { - snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf); + if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) { + snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos); } else { - snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); + snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); } /* Remove non printable chars. */ @@ -1201,7 +1202,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) { sdsfree(client); } c->flags |= CLIENT_CLOSE_AFTER_REPLY; - sdsrange(c->querybuf,pos,-1); } /* Process the query buffer for client 'c', setting up the client argument @@ -1217,7 +1217,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) { * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ int processMultibulkBuffer(client *c) { char *newline = NULL; - long pos = 0; int ok; long long ll; @@ -1226,34 +1225,32 @@ int processMultibulkBuffer(client *c) { serverAssertWithInfo(c,NULL,c->argc == 0); /* Multi bulk length cannot be read without a \r\n */ - newline = strchr(c->querybuf,'\r'); + newline = strchr(c->querybuf+c->qb_pos,'\r'); if (newline == NULL) { - if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { + if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big mbulk count string"); - setProtocolError("too big mbulk count string",c,0); + setProtocolError("too big mbulk count string",c); } return C_ERR; } /* Buffer should also contain \n */ - if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) + if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) return C_ERR; /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. */ - serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); - ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); + serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*'); + ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > 1024*1024) { addReplyError(c,"Protocol error: invalid multibulk length"); - setProtocolError("invalid mbulk count",c,pos); + setProtocolError("invalid mbulk count",c); return C_ERR; } - pos = (newline-c->querybuf)+2; - if (ll <= 0) { - sdsrange(c->querybuf,pos,-1); - return C_OK; - } + c->qb_pos = (newline-c->querybuf)+2; + + if (ll <= 0) return C_OK; c->multibulklen = ll; @@ -1266,37 +1263,37 @@ int processMultibulkBuffer(client *c) { while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+pos,'\r'); + newline = strchr(c->querybuf+c->qb_pos,'\r'); if (newline == NULL) { - if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { + if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { addReplyError(c, "Protocol error: too big bulk count string"); - setProtocolError("too big bulk count string",c,0); + setProtocolError("too big bulk count string",c); return C_ERR; } break; } /* Buffer should also contain \n */ - if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) + if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) break; - if (c->querybuf[pos] != '$') { + if (c->querybuf[c->qb_pos] != '$') { addReplyErrorFormat(c, "Protocol error: expected '$', got '%c'", - c->querybuf[pos]); - setProtocolError("expected $ but got something else",c,pos); + c->querybuf[c->qb_pos]); + setProtocolError("expected $ but got something else",c); return C_ERR; } - ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); + ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || ll > server.proto_max_bulk_len) { addReplyError(c,"Protocol error: invalid bulk length"); - setProtocolError("invalid bulk length",c,pos); + setProtocolError("invalid bulk length",c); return C_ERR; } - pos += newline-(c->querybuf+pos)+2; + c->qb_pos = newline-c->querybuf+2; if (ll >= PROTO_MBULK_BIG_ARG) { size_t qblen; @@ -1304,8 +1301,8 @@ int processMultibulkBuffer(client *c) { * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. */ - sdsrange(c->querybuf,pos,-1); - pos = 0; + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; qblen = sdslen(c->querybuf); /* Hint the sds library about the amount of bytes this string is * going to contain. */ @@ -1316,14 +1313,14 @@ int processMultibulkBuffer(client *c) { } /* Read bulk argument */ - if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) { + if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { /* Not enough data (+2 == trailing \r\n) */ break; } else { /* Optimization: if the buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ - if (pos == 0 && + if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) { @@ -1333,20 +1330,16 @@ int processMultibulkBuffer(client *c) { * likely... */ c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); sdsclear(c->querybuf); - pos = 0; } else { c->argv[c->argc++] = - createStringObject(c->querybuf+pos,c->bulklen); - pos += c->bulklen+2; + createStringObject(c->querybuf+c->qb_pos,c->bulklen); + c->qb_pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; } } - /* Trim to pos */ - if (pos) sdsrange(c->querybuf,pos,-1); - /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) return C_OK; @@ -1360,8 +1353,9 @@ int processMultibulkBuffer(client *c) { * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { server.current_client = c; + /* Keep processing while there is something in the input buffer */ - while(sdslen(c->querybuf)) { + while(c->qb_pos < sdslen(c->querybuf)) { /* Return if clients are paused. */ if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; @@ -1377,7 +1371,7 @@ void processInputBuffer(client *c) { /* Determine request type when unknown. */ if (!c->reqtype) { - if (c->querybuf[0] == '*') { + if (c->querybuf[c->qb_pos] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; @@ -1400,7 +1394,7 @@ void processInputBuffer(client *c) { if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf); + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } /* Don't reset the client structure for clients blocked in a @@ -1416,6 +1410,13 @@ void processInputBuffer(client *c) { if (server.current_client == NULL) break; } } + + /* Trim to pos */ + if (c->qb_pos) { + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; + } + server.current_client = NULL; } diff --git a/src/server.h b/src/server.h index 186d0825..ce127b58 100644 --- a/src/server.h +++ b/src/server.h @@ -710,6 +710,7 @@ typedef struct client { redisDb *db; /* Pointer to currently SELECTed DB. */ robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ + size_t qb_pos; /* The position we have read in querybuf. */ sds pending_querybuf; /* If this client is flagged as master, this buffer represents the yet not applied portion of the replication stream that we are receiving from diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index f6477d9c..2581eb83 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -1,7 +1,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*} + } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*} test {MONITOR can log executed commands} { set rd [redis_deferring_client]