From f95e01266eb42d7442f3b62242957e9e99e3ac90 Mon Sep 17 00:00:00 2001 From: artix Date: Thu, 17 Jan 2019 17:40:15 +0100 Subject: [PATCH] Redis benchmark: configurable thread count in cluster mode and fixes --- src/redis-benchmark.c | 49 ++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index ab71839a..4e15fa58 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -53,7 +53,7 @@ #define UNUSED(V) ((void) V) #define RANDPTR_INITIAL_SIZE 8 #define MAX_LATENCY_PRECISION 3 -#define MAX_THREADS 16 +#define MAX_THREADS 500 #define CLUSTER_SLOTS 16384 #define CLIENT_GET_EVENTLOOP(c) \ @@ -189,7 +189,13 @@ static void freeClient(client c) { listNode *ln; aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); aeDeleteFileEvent(el,c->context->fd,AE_READABLE); - aeStop(el); + if (c->thread_id >= 0) { + int requests_finished = 0; + atomicGet(config.requests_finished, requests_finished); + if (requests_finished >= config.requests) { + aeStop(el); + } + } redisFree(c->context); sdsfree(c->obuf); zfree(c->randptr); @@ -261,6 +267,7 @@ static void clientDone(client c) { else atomicGet(config.requests_finished, requests_finished); if (requests_finished >= config.requests) { freeClient(c); + if (!config.num_threads && config.el) aeStop(config.el); return; } if (config.keepalive) { @@ -306,6 +313,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int is_err = (r->type == REDIS_REPLY_ERROR); if (is_err && config.showerrors) { + /* TODO: static lasterr_time not thread-safe */ static time_t lasterr_time = 0; time_t now = time(NULL); if (lasterr_time != now) { @@ -326,10 +334,13 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { clusterNode *node = c->cluster_node; assert(node); if (++node->current_slot_index >= node->slots_count) { - fprintf(stderr,"Cluster node %s:%d has no more " - "valid slots, aborting...\n", node->ip, - node->port); - exit(1); + if (config.showerrors) { + fprintf(stderr, "WARN: No more available slots in " + "node %s:%d\n", node->ip, node->port); + } + freeReplyObject(reply); + freeClient(c); + return; } } @@ -352,9 +363,8 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } int requests_finished = 0; atomicGetIncr(config.requests_finished, requests_finished, 1); - requests_finished--; if (requests_finished < config.requests) - config.latency[++requests_finished] = c->latency; + config.latency[requests_finished] = c->latency; c->pending--; if (c->pending == 0) { clientDone(c); @@ -433,15 +443,20 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { int is_cluster_client = (config.cluster_mode && thread_id >= 0); client c = zmalloc(sizeof(struct _client)); - const char *ip; - int port; + const char *ip = NULL; + int port = 0; c->cluster_node = NULL; if (config.hostsocket == NULL || is_cluster_client) { if (!is_cluster_client) { ip = config.hostip; port = config.hostport; } else { - clusterNode *node = config.cluster_nodes[thread_id]; + int node_idx = 0; + if (config.num_threads < config.cluster_node_count) + node_idx = config.liveclients % config.cluster_node_count; + else + node_idx = thread_id % config.cluster_node_count; + clusterNode *node = config.cluster_nodes[node_idx]; if (node == NULL) exit(1); ip = (const char *) node->ip; port = node->port; @@ -556,7 +571,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { while ((p = strstr(p,"{tag}")) != NULL) { if (c->stagfree == 0) { c->stagptr = zrealloc(c->stagptr, - sizeof(char*)*c->staglen*2); + sizeof(char*) * c->staglen*2); c->stagfree += c->staglen; } c->stagptr[c->staglen++] = p; @@ -1112,6 +1127,10 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData fprintf(stderr,"All clients disconnected... aborting.\n"); exit(1); } + if (config.num_threads && requests_finished >= config.requests) { + aeStop(eventLoop); + return AE_NOMORE; + } if (config.csv) return 250; if (config.idlemode == 1) { printf("clients: %d\r", config.liveclients); @@ -1214,8 +1233,10 @@ int main(int argc, const char **argv) { if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); } - /* TODO: allow for more thrads per cluster node. */ - config.num_threads = config.cluster_node_count; + /* Automatically set thread number to node count if not specified + * by the user. */ + if (config.num_threads == 0) + config.num_threads = config.cluster_node_count; } if (config.num_threads > 0) {