RESP3: Aggregate deferred lengths functions.

This commit is contained in:
antirez 2018-11-08 12:28:56 +01:00
parent 914ee43108
commit 57c5a766a2
2 changed files with 39 additions and 10 deletions

View File

@ -416,28 +416,28 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) {
/* Adds an empty object to the reply list that will contain the multi bulk
* length, which is not known when this function is called. */
void *addDeferredMultiBulkLength(client *c) {
void *addReplyDeferredLen(client *c) {
/* Note that we install the write event here even if the object is not
* ready to be sent, since we are sure that before returning to the
* event loop setDeferredMultiBulkLength() will be called. */
* event loop setDeferredAggregateLen() will be called. */
if (prepareClientToWrite(c) != C_OK) return NULL;
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
return listLast(c->reply);
}
/* Populate the length object and try gluing it to the next chunk. */
void setDeferredMultiBulkLength(client *c, void *node, long length) {
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
listNode *ln = (listNode*)node;
clientReplyBlock *next;
char lenstr[128];
size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length);
size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
/* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addDeferredMultiBulkLength() */
* we return NULL in addReplyDeferredLen() */
if (node == NULL) return;
serverAssert(!listNodeValue(ln));
/* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(),
/* Normally we fill this dummy NULL node, added by addReplyDeferredLen(),
* with a new buffer structure containing the protocol needed to specify
* the length of the array following. However sometimes when there is
* little memory to move, we may instead remove this NULL node, and prefix
@ -467,6 +467,30 @@ void setDeferredMultiBulkLength(client *c, void *node, long length) {
asyncCloseClientOnOutputBufferLimitReached(c);
}
void setDeferredArrayLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'*');
}
void setDeferredMapLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'%');
}
void setDeferredSetLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'~');
}
void setDeferredAttributeLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'|');
}
void setDeferredPushLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'>');
}
void setDeferredHelloLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'@');
}
/* Add a double as a bulk reply */
void addReplyDouble(client *c, double d) {
if (isinf(d)) {
@ -627,7 +651,7 @@ void addReplyBulkLongLong(client *c, long long ll) {
* is terminated by NULL sentinel. */
void addReplyHelp(client *c, const char **help) {
sds cmd = sdsnew((char*) c->argv[0]->ptr);
void *blenp = addDeferredMultiBulkLength(c);
void *blenp = addReplyDeferredLen(c);
int blen = 0;
sdstoupper(cmd);
@ -638,7 +662,7 @@ void addReplyHelp(client *c, const char **help) {
while (help[blen]) addReplyStatus(c,help[blen++]);
blen++; /* Account for the header line(s). */
setDeferredMultiBulkLength(c,blenp,blen);
setDeferredArrayLen(c,blenp,blen);
}
/* Add a suggestive error reply.

View File

@ -1424,8 +1424,13 @@ void freeClient(client *c);
void freeClientAsync(client *c);
void resetClient(client *c);
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void *addDeferredMultiBulkLength(client *c);
void setDeferredMultiBulkLength(client *c, void *node, long length);
void *addReplyDeferredLen(client *c);
void setDeferredArrayLen(client *c, void *node, long length);
void setDeferredMapLen(client *c, void *node, long length);
void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length);
void setDeferredHelloLen(client *c, void *node, long length);
void processInputBuffer(client *c);
void processInputBufferAndReplicate(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);