Merge pull request #5244 from soloestoy/optimize-pipeline

pipeline: do not sdsrange querybuf unless all commands processed
This commit is contained in:
Salvatore Sanfilippo 2018-08-26 16:30:49 +02:00 committed by GitHub
commit 80e1695652
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 54 deletions

View File

@ -33,7 +33,7 @@
#include <math.h> #include <math.h>
#include <ctype.h> #include <ctype.h>
static void setProtocolError(const char *errstr, client *c, long pos); static void setProtocolError(const char *errstr, client *c);
/* Return the size consumed from the allocator, for the specified SDS string, /* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute * including internal fragmentation. This function is used in order to compute
@ -110,6 +110,7 @@ client *createClient(int fd) {
c->fd = fd; c->fd = fd;
c->name = NULL; c->name = NULL;
c->bufpos = 0; c->bufpos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty(); c->querybuf = sdsempty();
c->pending_querybuf = sdsempty(); c->pending_querybuf = sdsempty();
c->querybuf_peak = 0; c->querybuf_peak = 0;
@ -1119,29 +1120,29 @@ int processInlineBuffer(client *c) {
size_t querylen; size_t querylen;
/* Search for end of line */ /* Search for end of line */
newline = strchr(c->querybuf,'\n'); newline = strchr(c->querybuf+c->qb_pos,'\n');
/* Nothing to do without a \r\n */ /* Nothing to do without a \r\n */
if (newline == NULL) { if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big inline request"); addReplyError(c,"Protocol error: too big inline request");
setProtocolError("too big inline request",c,0); setProtocolError("too big inline request",c);
} }
return C_ERR; return C_ERR;
} }
/* Handle the \r\n case. */ /* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r') if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
newline--, linefeed_chars++; newline--, linefeed_chars++;
/* Split the input buffer up to the \r\n */ /* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf); querylen = newline-(c->querybuf+c->qb_pos);
aux = sdsnewlen(c->querybuf,querylen); aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
argv = sdssplitargs(aux,&argc); argv = sdssplitargs(aux,&argc);
sdsfree(aux); sdsfree(aux);
if (argv == NULL) { if (argv == NULL) {
addReplyError(c,"Protocol error: unbalanced quotes in request"); addReplyError(c,"Protocol error: unbalanced quotes in request");
setProtocolError("unbalanced quotes in inline request",c,0); setProtocolError("unbalanced quotes in inline request",c);
return C_ERR; return C_ERR;
} }
@ -1151,8 +1152,8 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && c->flags & CLIENT_SLAVE) if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = server.unixtime; c->repl_ack_time = server.unixtime;
/* Leave data after the first line of the query in the buffer */ /* Move querybuffer position to the next query in the buffer. */
sdsrange(c->querybuf,querylen+linefeed_chars,-1); c->qb_pos += querylen+linefeed_chars;
/* Setup argv array on client structure */ /* Setup argv array on client structure */
if (argc) { if (argc) {
@ -1173,19 +1174,19 @@ int processInlineBuffer(client *c) {
return C_OK; return C_OK;
} }
/* Helper function. Trims query buffer to make the function that processes /* Helper function. Record protocol erro details in server log,
* multi bulk requests idempotent. */ * and set the client as CLIENT_CLOSE_AFTER_REPLY. */
#define PROTO_DUMP_LEN 128 #define PROTO_DUMP_LEN 128
static void setProtocolError(const char *errstr, client *c, long pos) { static void setProtocolError(const char *errstr, client *c) {
if (server.verbosity <= LL_VERBOSE) { if (server.verbosity <= LL_VERBOSE) {
sds client = catClientInfoString(sdsempty(),c); sds client = catClientInfoString(sdsempty(),c);
/* Sample some protocol to given an idea about what was inside. */ /* Sample some protocol to given an idea about what was inside. */
char buf[256]; char buf[256];
if (sdslen(c->querybuf) < PROTO_DUMP_LEN) { if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf); snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos);
} else { } else {
snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
} }
/* Remove non printable chars. */ /* Remove non printable chars. */
@ -1201,7 +1202,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
sdsfree(client); sdsfree(client);
} }
c->flags |= CLIENT_CLOSE_AFTER_REPLY; c->flags |= CLIENT_CLOSE_AFTER_REPLY;
sdsrange(c->querybuf,pos,-1);
} }
/* Process the query buffer for client 'c', setting up the client argument /* Process the query buffer for client 'c', setting up the client argument
@ -1217,7 +1217,6 @@ static void setProtocolError(const char *errstr, client *c, long pos) {
* to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
int processMultibulkBuffer(client *c) { int processMultibulkBuffer(client *c) {
char *newline = NULL; char *newline = NULL;
long pos = 0;
int ok; int ok;
long long ll; long long ll;
@ -1226,34 +1225,32 @@ int processMultibulkBuffer(client *c) {
serverAssertWithInfo(c,NULL,c->argc == 0); serverAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */ /* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r'); newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) { if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string"); addReplyError(c,"Protocol error: too big mbulk count string");
setProtocolError("too big mbulk count string",c,0); setProtocolError("too big mbulk count string",c);
} }
return C_ERR; return C_ERR;
} }
/* Buffer should also contain \n */ /* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
return C_ERR; return C_ERR;
/* We know for sure there is a whole line since newline != NULL, /* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */ * so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
if (!ok || ll > 1024*1024) { if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length"); addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError("invalid mbulk count",c,pos); setProtocolError("invalid mbulk count",c);
return C_ERR; return C_ERR;
} }
pos = (newline-c->querybuf)+2; c->qb_pos = (newline-c->querybuf)+2;
if (ll <= 0) {
sdsrange(c->querybuf,pos,-1); if (ll <= 0) return C_OK;
return C_OK;
}
c->multibulklen = ll; c->multibulklen = ll;
@ -1266,37 +1263,37 @@ int processMultibulkBuffer(client *c) {
while(c->multibulklen) { while(c->multibulklen) {
/* Read bulk length if unknown */ /* Read bulk length if unknown */
if (c->bulklen == -1) { if (c->bulklen == -1) {
newline = strchr(c->querybuf+pos,'\r'); newline = strchr(c->querybuf+c->qb_pos,'\r');
if (newline == NULL) { if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c, addReplyError(c,
"Protocol error: too big bulk count string"); "Protocol error: too big bulk count string");
setProtocolError("too big bulk count string",c,0); setProtocolError("too big bulk count string",c);
return C_ERR; return C_ERR;
} }
break; break;
} }
/* Buffer should also contain \n */ /* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
break; break;
if (c->querybuf[pos] != '$') { if (c->querybuf[c->qb_pos] != '$') {
addReplyErrorFormat(c, addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'", "Protocol error: expected '$', got '%c'",
c->querybuf[pos]); c->querybuf[c->qb_pos]);
setProtocolError("expected $ but got something else",c,pos); setProtocolError("expected $ but got something else",c);
return C_ERR; return C_ERR;
} }
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
if (!ok || ll < 0 || ll > server.proto_max_bulk_len) { if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
addReplyError(c,"Protocol error: invalid bulk length"); addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError("invalid bulk length",c,pos); setProtocolError("invalid bulk length",c);
return C_ERR; return C_ERR;
} }
pos += newline-(c->querybuf+pos)+2; c->qb_pos = newline-c->querybuf+2;
if (ll >= PROTO_MBULK_BIG_ARG) { if (ll >= PROTO_MBULK_BIG_ARG) {
size_t qblen; size_t qblen;
@ -1304,8 +1301,8 @@ int processMultibulkBuffer(client *c) {
* try to make it likely that it will start at c->querybuf * try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation * boundary so that we can optimize object creation
* avoiding a large copy of data. */ * avoiding a large copy of data. */
sdsrange(c->querybuf,pos,-1); sdsrange(c->querybuf,c->qb_pos,-1);
pos = 0; c->qb_pos = 0;
qblen = sdslen(c->querybuf); qblen = sdslen(c->querybuf);
/* Hint the sds library about the amount of bytes this string is /* Hint the sds library about the amount of bytes this string is
* going to contain. */ * going to contain. */
@ -1316,14 +1313,14 @@ int processMultibulkBuffer(client *c) {
} }
/* Read bulk argument */ /* Read bulk argument */
if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) { if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
/* Not enough data (+2 == trailing \r\n) */ /* Not enough data (+2 == trailing \r\n) */
break; break;
} else { } else {
/* Optimization: if the buffer contains JUST our bulk element /* Optimization: if the buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we * instead of creating a new object by *copying* the sds we
* just use the current sds string. */ * just use the current sds string. */
if (pos == 0 && if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG && c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2)) sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{ {
@ -1333,20 +1330,16 @@ int processMultibulkBuffer(client *c) {
* likely... */ * likely... */
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
sdsclear(c->querybuf); sdsclear(c->querybuf);
pos = 0;
} else { } else {
c->argv[c->argc++] = c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen); createStringObject(c->querybuf+c->qb_pos,c->bulklen);
pos += c->bulklen+2; c->qb_pos += c->bulklen+2;
} }
c->bulklen = -1; c->bulklen = -1;
c->multibulklen--; c->multibulklen--;
} }
} }
/* Trim to pos */
if (pos) sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulk == 0 */ /* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK; if (c->multibulklen == 0) return C_OK;
@ -1360,8 +1353,9 @@ int processMultibulkBuffer(client *c) {
* pending query buffer, already representing a full command, to process. */ * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) { void processInputBuffer(client *c) {
server.current_client = c; server.current_client = c;
/* Keep processing while there is something in the input buffer */ /* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) { while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */ /* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
@ -1377,7 +1371,7 @@ void processInputBuffer(client *c) {
/* Determine request type when unknown. */ /* Determine request type when unknown. */
if (!c->reqtype) { if (!c->reqtype) {
if (c->querybuf[0] == '*') { if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK; c->reqtype = PROTO_REQ_MULTIBULK;
} else { } else {
c->reqtype = PROTO_REQ_INLINE; c->reqtype = PROTO_REQ_INLINE;
@ -1400,7 +1394,7 @@ void processInputBuffer(client *c) {
if (processCommand(c) == C_OK) { if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */ /* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf); c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
} }
/* Don't reset the client structure for clients blocked in a /* Don't reset the client structure for clients blocked in a
@ -1416,6 +1410,13 @@ void processInputBuffer(client *c) {
if (server.current_client == NULL) break; if (server.current_client == NULL) break;
} }
} }
/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
server.current_client = NULL; server.current_client = NULL;
} }

View File

@ -710,6 +710,7 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */ redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */ robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */ sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the represents the yet not applied portion of the
replication stream that we are receiving from replication stream that we are receiving from

View File

@ -1,7 +1,7 @@
start_server {tags {"introspection"}} { start_server {tags {"introspection"}} {
test {CLIENT LIST} { test {CLIENT LIST} {
r client list r client list
} {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*} } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* obl=0 oll=0 omem=0 events=r cmd=client*}
test {MONITOR can log executed commands} { test {MONITOR can log executed commands} {
set rd [redis_deferring_client] set rd [redis_deferring_client]