1
0
mirror of https://github.com/fluencelabs/redis synced 2025-03-18 00:20:50 +00:00

slave buffers were wasteful and incorrectly counted causing eviction

A) slave buffers didn't count internal fragmentation and sds unused space,
   this caused them to induce eviction although we didn't mean for it.

B) slave buffers were consuming about twice the memory of what they actually needed.
- this was mainly due to sdsMakeRoomFor growing to twice as much as needed each time
  but networking.c not storing more than 16k (partially fixed recently in 237a38737).
- besides it wasn't able to store half of the new string into one buffer and the
  other half into the next (so the above mentioned fix helped mainly for small items).
- lastly, the sds buffers had up to 30% internal fragmentation that was wasted,
  consumed but not used.

C) inefficient performance due to starting from a small string and reallocing many times.

what i changed:
- creating dedicated buffers for reply list, counting their size with zmalloc_size
- when creating a new reply node from, preallocate it to at least 16k.
- when appending a new reply to the buffer, first fill all the unused space of the
  previous node before starting a new one.

other changes:
- expose mem_not_counted_for_evict info field for the benefit of the test suite
- add a test to make sure slave buffers are counted correctly and that they don't cause eviction
This commit is contained in:
Oran Agra 2018-02-21 20:18:34 +02:00
parent ab33bcd346
commit bf680b6f8c
10 changed files with 180 additions and 48 deletions

@ -645,7 +645,7 @@ struct client *createFakeClient(void) {
c->obuf_soft_limit_reached_time = 0; c->obuf_soft_limit_reached_time = 0;
c->watched_keys = listCreate(); c->watched_keys = listCreate();
c->peerid = NULL; c->peerid = NULL;
listSetFreeMethod(c->reply,decrRefCountVoid); listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
initClientMultiState(c); initClientMultiState(c);
return c; return c;

@ -2712,9 +2712,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
sds proto = sdsnewlen(c->buf,c->bufpos); sds proto = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0; c->bufpos = 0;
while(listLength(c->reply)) { while(listLength(c->reply)) {
sds o = listNodeValue(listFirst(c->reply)); clientReplyBlock *o = listNodeValue(listFirst(c->reply));
proto = sdscatsds(proto,o); proto = sdscatlen(proto,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
} }
reply = moduleCreateCallReplyFromProto(ctx,proto); reply = moduleCreateCallReplyFromProto(ctx,proto);

@ -56,11 +56,14 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
/* Client.reply list dup and free methods. */ /* Client.reply list dup and free methods. */
void *dupClientReplyValue(void *o) { void *dupClientReplyValue(void *o) {
return sdsdup(o); clientReplyBlock *old = o;
clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
return buf;
} }
void freeClientReplyValue(void *o) { void freeClientReplyValue(void *o) {
sdsfree(o); zfree(o);
} }
int listMatchObjects(void *a, void *b) { int listMatchObjects(void *a, void *b) {
@ -240,25 +243,31 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
void _addReplyStringToList(client *c, const char *s, size_t len) { void _addReplyStringToList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) { listNode *ln = listLast(c->reply);
sds node = sdsnewlen(s,len); clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
listAddNodeTail(c->reply,node); /* It is possible that we have a tail list node, but no tail buffer.
c->reply_bytes += len; * if addDeferredMultiBulkLength() was used. */
} else {
listNode *ln = listLast(c->reply);
sds tail = listNodeValue(ln);
/* Append to this object when possible. If tail == NULL it was /* Append to tail string when possible. */
* set via addDeferredMultiBulkLength(). */ if (tail) {
if (tail && (sdsavail(tail) >= len || sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES)) { /* Copy the part we can fit into the tail, and leave the rest for a new node */
tail = sdscatlen(tail,s,len); size_t avail = tail->size - tail->used;
listNodeValue(ln) = tail; size_t copy = avail >= len? len: avail;
c->reply_bytes += len; memcpy(tail->buf + tail->used, s, copy);
} else { tail->used += copy;
sds node = sdsnewlen(s,len); s += copy;
listAddNodeTail(c->reply,node); len -= copy;
c->reply_bytes += len; }
} if (len) {
/* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES */
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
tail = zmalloc(size + sizeof(clientReplyBlock));
/* take over the allocation's internal fragmentation */
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
tail->used = len;
memcpy(tail->buf, s, len);
listAddNodeTail(c->reply, tail);
c->reply_bytes += tail->size;
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
} }
@ -390,26 +399,35 @@ void *addDeferredMultiBulkLength(client *c) {
/* Populate the length object and try gluing it to the next chunk. */ /* Populate the length object and try gluing it to the next chunk. */
void setDeferredMultiBulkLength(client *c, void *node, long length) { void setDeferredMultiBulkLength(client *c, void *node, long length) {
listNode *ln = (listNode*)node; listNode *ln = (listNode*)node;
sds len, next; clientReplyBlock *next;
char lenstr[128];
size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length);
/* Abort when *node is NULL: when the client should not accept writes /* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addDeferredMultiBulkLength() */ * we return NULL in addDeferredMultiBulkLength() */
if (node == NULL) return; if (node == NULL) return;
serverAssert(!listNodeValue(ln));
len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length); /* Glue into next node when:
listNodeValue(ln) = len; * - the next node is non-NULL,
c->reply_bytes += sdslen(len); * - it has enough room already allocated
if (ln->next != NULL) { * - and not too large (avoid large memmove) */
next = listNodeValue(ln->next); if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
next->size - next->used >= lenstr_len &&
/* Only glue when the next node is non-NULL (an sds in this case) */ next->used < PROTO_REPLY_CHUNK_BYTES * 4) {
if (next != NULL) { memmove(next->buf + lenstr_len, next->buf, next->used);
len = sdscatsds(len,next); memcpy(next->buf, lenstr, lenstr_len);
listDelNode(c->reply,ln->next); next->used += lenstr_len;
listNodeValue(ln) = len; listDelNode(c->reply,ln);
/* No need to update c->reply_bytes: we are just moving the same } else {
* amount of bytes from one node to another. */ /* Create a new node */
} clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock));
/* take over the allocation's internal fragmentation */
buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock);
buf->used = lenstr_len;
memcpy(buf->buf, lenstr, lenstr_len);
listNodeValue(ln) = buf;
c->reply_bytes += buf->size;
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
} }
@ -895,7 +913,7 @@ client *lookupClientByID(uint64_t id) {
int writeToClient(int fd, client *c, int handler_installed) { int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0; ssize_t nwritten = 0, totwritten = 0;
size_t objlen; size_t objlen;
sds o; clientReplyBlock *o;
while(clientHasPendingReplies(c)) { while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) { if (c->bufpos > 0) {
@ -912,23 +930,24 @@ int writeToClient(int fd, client *c, int handler_installed) {
} }
} else { } else {
o = listNodeValue(listFirst(c->reply)); o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o); objlen = o->used;
if (objlen == 0) { if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
continue; continue;
} }
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen); nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break; if (nwritten <= 0) break;
c->sentlen += nwritten; c->sentlen += nwritten;
totwritten += nwritten; totwritten += nwritten;
/* If we fully sent the object on head go to the next one */ /* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) { if (c->sentlen == objlen) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0; c->sentlen = 0;
c->reply_bytes -= objlen;
/* If there are no longer objects in the list, we expect /* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */ * the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0) if (listLength(c->reply) == 0)
@ -1899,10 +1918,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* the caller wishes. The main usage of this function currently is * the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */ * enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long getClientOutputBufferMemoryUsage(client *c) {
unsigned long list_item_size = sizeof(listNode)+5; unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
/* The +5 above means we assume an sds16 hdr, may not be true
* but is not going to be a problem. */
return c->reply_bytes + (list_item_size*listLength(c->reply)); return c->reply_bytes + (list_item_size*listLength(c->reply));
} }

@ -2148,6 +2148,7 @@ void replicationCacheMaster(client *c) {
server.master->read_reploff = server.master->reploff; server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c); if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply); listEmpty(c->reply);
c->reply_bytes = 0;
c->bufpos = 0; c->bufpos = 0;
resetClient(c); resetClient(c);

@ -575,9 +575,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
reply = sdsnewlen(c->buf,c->bufpos); reply = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0; c->bufpos = 0;
while(listLength(c->reply)) { while(listLength(c->reply)) {
sds o = listNodeValue(listFirst(c->reply)); clientReplyBlock *o = listNodeValue(listFirst(c->reply));
reply = sdscatsds(reply,o); reply = sdscatlen(reply,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
} }
} }

@ -3101,6 +3101,11 @@ sds genRedisInfoString(char *section) {
"rss_overhead_bytes:%zu\r\n" "rss_overhead_bytes:%zu\r\n"
"mem_fragmentation_ratio:%.2f\r\n" "mem_fragmentation_ratio:%.2f\r\n"
"mem_fragmentation_bytes:%zu\r\n" "mem_fragmentation_bytes:%zu\r\n"
"mem_not_counted_for_evict:%zu\r\n"
"mem_replication_backlog:%zu\r\n"
"mem_clients_slaves:%zu\r\n"
"mem_clients_normal:%zu\r\n"
"mem_aof_buffer:%zu\r\n"
"mem_allocator:%s\r\n" "mem_allocator:%s\r\n"
"active_defrag_running:%d\r\n" "active_defrag_running:%d\r\n"
"lazyfree_pending_objects:%zu\r\n", "lazyfree_pending_objects:%zu\r\n",
@ -3133,6 +3138,11 @@ sds genRedisInfoString(char *section) {
mh->rss_extra_bytes, mh->rss_extra_bytes,
mh->total_frag, /* this is the total RSS overhead, including fragmentation, */ mh->total_frag, /* this is the total RSS overhead, including fragmentation, */
mh->total_frag_bytes, /* named so for backwards compatibility */ mh->total_frag_bytes, /* named so for backwards compatibility */
freeMemoryGetNotCountedMemory(),
mh->repl_backlog,
mh->clients_slaves,
mh->clients_normal,
mh->aof_buffer,
ZMALLOC_LIB, ZMALLOC_LIB,
server.active_defrag_running, server.active_defrag_running,
lazyfreeGetPendingObjectsCount() lazyfreeGetPendingObjectsCount()

@ -619,6 +619,11 @@ typedef struct redisObject {
struct evictionPoolEntry; /* Defined in evict.c */ struct evictionPoolEntry; /* Defined in evict.c */
typedef struct clientReplyBlock {
size_t size, used;
char buf[];
} clientReplyBlock;
/* Redis database representation. There are multiple databases identified /* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured * by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */ * database. The database number is the 'id' field in the structure. */
@ -1423,6 +1428,7 @@ void addReplySubcommandSyntaxError(client *c);
void copyClientOutputBuffer(client *dst, client *src); void copyClientOutputBuffer(client *dst, client *src);
size_t sdsZmallocSize(sds s); size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o); size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);
void *dupClientReplyValue(void *o); void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list, void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer); unsigned long *biggest_input_buffer);
@ -1664,6 +1670,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec);
/* Core functions */ /* Core functions */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level); int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(void); int freeMemoryIfNeeded(void);
int processCommand(client *c); int processCommand(client *c);
void setupSignalHandlers(void); void setupSignalHandlers(void);

@ -182,6 +182,9 @@ size_t zmalloc_size(void *ptr) {
if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1)); if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1));
return size+PREFIX_SIZE; return size+PREFIX_SIZE;
} }
size_t zmalloc_usable(void *ptr) {
return zmalloc_usable(ptr)-PREFIX_SIZE;
}
#endif #endif
void zfree(void *ptr) { void zfree(void *ptr) {

@ -98,6 +98,9 @@ void *zmalloc_no_tcache(size_t size);
#ifndef HAVE_MALLOC_SIZE #ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr); size_t zmalloc_size(void *ptr);
size_t zmalloc_usable(void *ptr);
#else
#define zmalloc_usable(p) zmalloc_size(p)
#endif #endif
#endif /* __ZMALLOC_H */ #endif /* __ZMALLOC_H */

@ -142,3 +142,95 @@ start_server {tags {"maxmemory"}} {
} }
} }
} }
proc test_slave_buffers {cmd_count payload_len limit_memory pipeline} {
start_server {tags {"maxmemory"}} {
start_server {} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
# add 100 keys of 100k (10MB total)
for {set j 0} {$j < 100} {incr j} {
$master setrange "key:$j" 100000 asdf
}
$master config set maxmemory-policy allkeys-random
$master config set client-output-buffer-limit "slave 100000000 100000000 60"
$master config set repl-backlog-size [expr {10*1024}]
$slave slaveof $master_host $master_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replication not started."
}
# measure used memory after the slave connected and set maxmemory
set orig_used [s -1 used_memory]
set orig_client_buf [s -1 mem_clients_normal]
set orig_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
set orig_used_no_repl [expr {$orig_used - $orig_mem_not_counted_for_evict}]
set limit [expr {$orig_used - $orig_mem_not_counted_for_evict + 20*1024}]
if {$limit_memory==1} {
$master config set maxmemory $limit
}
# put the slave to sleep
set rd_slave [redis_deferring_client]
$rd_slave debug sleep 60
# send some 10mb woth of commands that don't increase the memory usage
if {$pipeline == 1} {
set rd_master [redis_deferring_client -1]
for {set k 0} {$k < $cmd_count} {incr k} {
$rd_master setrange key:0 0 [string repeat A $payload_len]
}
for {set k 0} {$k < $cmd_count} {incr k} {
#$rd_master read
}
} else {
for {set k 0} {$k < $cmd_count} {incr k} {
$master setrange key:0 0 [string repeat A $payload_len]
}
}
set new_used [s -1 used_memory]
set slave_buf [s -1 mem_clients_slaves]
set client_buf [s -1 mem_clients_normal]
set mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
set used_no_repl [expr {$new_used - $mem_not_counted_for_evict}]
set delta [expr {($used_no_repl - $client_buf) - ($orig_used_no_repl - $orig_client_buf)}]
assert {[$master dbsize] == 100}
assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers
assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
$master client kill type slave
set killed_used [s -1 used_memory]
set killed_slave_buf [s -1 mem_clients_slaves]
set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}]
set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
assert {$killed_slave_buf == 0}
assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
}
}
}
test {slave buffer are counted correctly} {
# we wanna use many small commands, and we don't wanna wait long
# so we need to use a pipeline (redis_deferring_client)
# that may cause query buffer to fill and induce eviction, so we disable it
test_slave_buffers 1000000 10 0 1
}
test {slave buffer don't induce eviction} {
# test again with fewer (and bigger) commands without pipeline, but with eviction
test_slave_buffers 100000 100 1 0
}