From 19a438e2c09363d161ed1cfae415222d3d16bfb4 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 7 Jun 2018 14:24:45 +0200 Subject: [PATCH] Streams: use non static macro node limits. Also add the concept of size/items limit, instead of just having as limit the number of bytes. --- redis.conf | 8 ++++++++ src/server.c | 2 ++ src/server.h | 6 +++++- src/t_stream.c | 14 +++++++++++++- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/redis.conf b/redis.conf index f5b7d5fe..e7a227ab 100644 --- a/redis.conf +++ b/redis.conf @@ -1106,6 +1106,14 @@ zset-max-ziplist-value 64 # composed of many HyperLogLogs with cardinality in the 0 - 15000 range. hll-sparse-max-bytes 3000 +# Streams macro node max size / items. The stream data structure is a radix +# tree of big nodes that encode multiple items inside. Using this configuration +# it is possible to configure how big a single node can be in bytes, and the +# maximum number of items it may contain before switching to a new node when +# appending new stream entries. +stream-node-max-bytes 4096 +stream-node-max-entries 100 + # Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in # order to help rehashing the main Redis hash table (the one mapping top-level # keys to values). 
The hash table implementation Redis uses (see dict.c) diff --git a/src/server.c b/src/server.c index 647aee24..375c6477 100644 --- a/src/server.c +++ b/src/server.c @@ -1485,6 +1485,8 @@ void initServerConfig(void) { server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES; server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE; server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES; + server.stream_node_max_bytes = OBJ_STREAM_NODE_MAX_BYTES; + server.stream_node_max_entries = OBJ_STREAM_NODE_MAX_ENTRIES; server.shutdown_asap = 0; server.cluster_enabled = 0; server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT; diff --git a/src/server.h b/src/server.h index d9c512c5..c34cdcfb 100644 --- a/src/server.h +++ b/src/server.h @@ -348,12 +348,14 @@ typedef long long mstime_t; /* millisecond time type. */ #define AOF_FSYNC_EVERYSEC 2 #define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC -/* Zip structure related defaults */ +/* Zipped structures related defaults */ #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512 #define OBJ_HASH_MAX_ZIPLIST_VALUE 64 #define OBJ_SET_MAX_INTSET_ENTRIES 512 #define OBJ_ZSET_MAX_ZIPLIST_ENTRIES 128 #define OBJ_ZSET_MAX_ZIPLIST_VALUE 64 +#define OBJ_STREAM_NODE_MAX_BYTES 4096 +#define OBJ_STREAM_NODE_MAX_ENTRIES 100 /* List defaults */ #define OBJ_LIST_MAX_ZIPLIST_SIZE -2 @@ -1177,6 +1179,8 @@ struct redisServer { size_t zset_max_ziplist_entries; size_t zset_max_ziplist_value; size_t hll_sparse_max_bytes; + size_t stream_node_max_bytes; + int64_t stream_node_max_entries; /* List parameters */ int list_max_ziplist_size; int list_compress_depth; diff --git a/src/t_stream.c b/src/t_stream.c index 07af3ff8..e5d29764 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -237,8 +237,20 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, * regular stream entries (see below), and marks the fact that there are * no more entires, when we scan the stream from right to left. 
*/ + /* First of all, check if we can append to the current macro node or + * if we need to switch to the next one. 'lp' will be set to NULL if + * the current node is full. */ + if (lp != NULL) { + if (lp_bytes > server.stream_node_max_bytes) { + lp = NULL; + } else { + int64_t count = lpGetInteger(lpFirst(lp)); + if (count > server.stream_node_max_entries) lp = NULL; + } + } + int flags = STREAM_ITEM_FLAG_NONE; - if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) { + if (lp == NULL || lp_bytes > server.stream_node_max_bytes) { master_id = id; streamEncodeID(rax_key,&id); /* Create the listpack having the master entry ID and fields. */