diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index c7c4dfd3..8da5fc93 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -40,16 +40,24 @@ #include #include #include +#include #include /* Use hiredis sds. */ #include "ae.h" #include "hiredis.h" #include "adlist.h" #include "zmalloc.h" +#include "atomicvar.h" #define UNUSED(V) ((void) V) #define RANDPTR_INITIAL_SIZE 8 #define MAX_LATENCY_PRECISION 3 +#define MAX_THREADS 16 + +#define CLIENT_GET_EVENTLOOP(c) \ + (c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el) + +struct benchmarkThread; static struct config { aeEventLoop *el; @@ -82,6 +90,12 @@ static struct config { char *tests; char *auth; int precision; + int num_threads; + struct benchmarkThread **threads; + /* Thread mutexes to be used as fallbacks by atomicvar.h */ + pthread_mutex_t requests_issued_mutex; + pthread_mutex_t requests_finished_mutex; + pthread_mutex_t liveclients_mutex; } config; typedef struct _client { @@ -98,11 +112,27 @@ typedef struct _client { such as auth and select are prefixed to the pipeline of benchmark commands and discarded after the first send. */ int prefixlen; /* Size in bytes of the pending prefix commands */ + int thread_id; } *client; +/* Threads. */ + +typedef struct benchmarkThread { + int index; + pthread_t thread; + aeEventLoop *el; +} benchmarkThread; + + /* Prototypes */ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); static void createMissingClients(client c); +static benchmarkThread *createBenchmarkThread(int index); +static void freeBenchmarkThread(benchmarkThread *thread); +static void freeBenchmarkThreads(); +static void *execBenchmarkThread(void *ptr); +int showThroughput(struct aeEventLoop *eventLoop, long long id, + void *clientData); /* Implementation */ static long long ustime(void) { @@ -126,17 +156,20 @@ static long long mstime(void) { } static void freeClient(client c) { + aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); listNode *ln; - aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE); - aeDeleteFileEvent(config.el,c->context->fd,AE_READABLE); + aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); + aeDeleteFileEvent(el,c->context->fd,AE_READABLE); redisFree(c->context); sdsfree(c->obuf); zfree(c->randptr); zfree(c); + if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); config.liveclients--; ln = listSearchKey(config.clients,c); assert(ln != NULL); listDelNode(config.clients,ln); + if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex)); } static void freeAllClients(void) { @@ -150,9 +183,10 @@ static void freeAllClients(void) { } static void resetClient(client c) { - aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE); - aeDeleteFileEvent(config.el,c->context->fd,AE_READABLE); - aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c); + aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); + aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); + aeDeleteFileEvent(el,c->context->fd,AE_READABLE); + aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); c->written = 0; c->pending = config.pipeline; } @@ -174,17 +208,30 @@ static void randomizeClientKey(client c) { } static void clientDone(client c) { - if (config.requests_finished == config.requests) { + int requests_finished = 0; + if (!config.num_threads) requests_finished = config.requests_finished; + else atomicGet(config.requests_finished, requests_finished); + if (requests_finished == config.requests) { + aeStop(CLIENT_GET_EVENTLOOP(c)); freeClient(c); - aeStop(config.el); + if (config.num_threads) { + int i = 0; + for (;i < config.num_threads; i++) { + benchmarkThread *t = config.threads[i]; + if (t && t->el) aeStop(t->el); + } + } return; } if (config.keepalive) { resetClient(c); } else { + if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); config.liveclients--; createMissingClients(c); config.liveclients++; + if (config.num_threads) + pthread_mutex_unlock(&(config.liveclients_mutex)); freeClient(c); } } @@ -243,9 +290,14 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } continue; } - - if (config.requests_finished < config.requests) - config.latency[config.requests_finished++] = c->latency; + if (config.num_threads) + pthread_mutex_lock(&(config.requests_finished_mutex)); + if (config.requests_finished < config.requests) { + config.requests_finished++; + config.latency[config.requests_finished] = c->latency; + } + if (config.num_threads) + pthread_mutex_unlock(&(config.requests_finished_mutex)); c->pending--; if (c->pending == 0) { clientDone(c); @@ -267,7 +319,10 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Initialize request when nothing was written. */ if (c->written == 0) { /* Enforce upper bound to number of requests. */ - if (config.requests_issued++ >= config.requests) { + int requests_issued = 0; + if (!config.num_threads) requests_issued = config.requests_issued++; + else atomicGetIncr(config.requests_issued, requests_issued, 1); + if (requests_issued >= config.requests) { freeClient(c); return; } @@ -289,8 +344,8 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } c->written += nwritten; if (sdslen(c->obuf) == c->written) { - aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE); - aeCreateFileEvent(config.el,c->context->fd,AE_READABLE,readHandler,c); + aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); + aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c); } } } @@ -316,7 +371,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * for arguments randomization. * * Even when cloning another client, prefix commands are applied if needed.*/ -static client createClient(char *cmd, size_t len, client from) { +static client createClient(char *cmd, size_t len, client from, int thread_id) { int j; client c = zmalloc(sizeof(struct _client)); @@ -333,6 +388,7 @@ static client createClient(char *cmd, size_t len, client from) { fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr); exit(1); } + c->thread_id = thread_id; /* Suppress hiredis cleanup of unused buffers for max speed. */ c->context->reader->maxbuf = 0; @@ -406,8 +462,14 @@ static client createClient(char *cmd, size_t len, client from) { } } } + aeEventLoop *el = NULL; + if (thread_id < 0) el = config.el; + else { + benchmarkThread *thread = config.threads[thread_id]; + el = thread->el; + } if (config.idlemode == 0) - aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c); + aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); listAddNodeTail(config.clients,c); config.liveclients++; return c; @@ -415,9 +477,11 @@ static client createClient(char *cmd, size_t len, client from) { static void createMissingClients(client c) { int n = 0; - while(config.liveclients < config.numclients) { - createClient(NULL,0,c); + int thread_id = -1; + if (config.num_threads) + thread_id = config.liveclients % config.num_threads; + createClient(NULL,0,c,thread_id); /* Listen backlog is quite limited on most systems */ if (++n > 64) { @@ -454,6 +518,9 @@ static void showLatencyReport(void) { printf(" %d parallel clients\n", config.numclients); printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); + printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no")); + if (config.num_threads) + printf(" threads: %d\n", config.num_threads); printf("\n"); qsort(config.latency,config.requests,sizeof(long long),compareLatency); @@ -484,21 +551,74 @@ static void showLatencyReport(void) { } static void benchmark(char *title, char *cmd, int len) { + int i; client c; config.title = title; config.requests_issued = 0; config.requests_finished = 0; - c = createClient(cmd,len,NULL); + if (config.num_threads) { + if (config.threads) freeBenchmarkThreads(); + config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*)); + for (i = 0; i < config.num_threads; i++) { + benchmarkThread *thread = createBenchmarkThread(i); + config.threads[i] = thread; + } + } + + int thread_id = config.num_threads > 0 ? 0 : -1; + c = createClient(cmd,len,NULL,thread_id); createMissingClients(c); config.start = mstime(); - aeMain(config.el); + if (!config.num_threads) aeMain(config.el); + else { + for (i = 0; i < config.num_threads; i++) { + benchmarkThread *t = config.threads[i]; + if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ + fprintf(stderr, "FATAL: Failed to start thread %d.\n", i); + exit(1); + } + } + for (i = 0; i < config.num_threads; i++) + pthread_join(config.threads[i]->thread, NULL); + } config.totlatency = mstime()-config.start; showLatencyReport(); freeAllClients(); + if (config.threads) freeBenchmarkThreads(); +} + +static benchmarkThread *createBenchmarkThread(int index) { + benchmarkThread *thread = zmalloc(sizeof(*thread)); + if (thread == NULL) return NULL; + thread->index = index; + thread->el = aeCreateEventLoop(1024*10); + aeCreateTimeEvent(thread->el,1,showThroughput,NULL,NULL); + return thread; +} + +static void freeBenchmarkThread(benchmarkThread *thread) { + if (thread->el) aeDeleteEventLoop(thread->el); + zfree(thread); +} + +static void freeBenchmarkThreads() { + int i = 0; + for (; i < config.num_threads; i++) { + benchmarkThread *thread = config.threads[i]; + if (thread) freeBenchmarkThread(thread); + } + zfree(config.threads); + config.threads = NULL; +} + +static void *execBenchmarkThread(void *ptr) { + benchmarkThread *thread = (benchmarkThread *) ptr; + aeMain(thread->el); + return NULL; } /* Returns number of consumed options. */ @@ -576,6 +696,14 @@ int parseOptions(int argc, const char **argv) { config.precision = atoi(argv[++i]); if (config.precision < 0) config.precision = 0; if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION; + } else if (!strcmp(argv[i],"--threads")) { + if (lastarg) goto invalid; + config.num_threads = atoi(argv[++i]); + if (config.num_threads > MAX_THREADS) { + printf("WARNING: too many threads, limiting threads to %d.\n", + MAX_THREADS); + config.num_threads = MAX_THREADS; + } else if (config.num_threads < 0) config.num_threads = 0; } else if (!strcmp(argv[i],"--help")) { exit_status = 0; goto usage; @@ -644,8 +772,17 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(eventLoop); UNUSED(id); UNUSED(clientData); + int liveclients = 0; + int requests_finished = 0; + if (!config.num_threads) { + liveclients = config.liveclients; + requests_finished = config.requests_finished; + } else { + atomicGet(config.liveclients, liveclients); + atomicGet(config.requests_finished, requests_finished); + } - if (config.liveclients == 0 && config.requests_finished != config.requests) { + if (liveclients == 0 && requests_finished != config.requests) { fprintf(stderr,"All clients disconnected... aborting.\n"); exit(1); } @@ -656,7 +793,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData return 250; } float dt = (float)(mstime()-config.start)/1000.0; - float rps = (float)config.requests_finished/dt; + float rps = (float)requests_finished/dt; printf("%s: %.2f\r", config.title, rps); fflush(stdout); return 250; /* every 250ms */ @@ -711,12 +848,19 @@ int main(int argc, const char **argv) { config.dbnum = 0; config.auth = NULL; config.precision = 1; + config.num_threads = 0; + config.threads = NULL; i = parseOptions(argc,argv); argc -= i; argv += i; config.latency = zmalloc(sizeof(long long)*config.requests); + if (config.num_threads > 0) { + pthread_mutex_init(&(config.requests_issued_mutex), NULL); + pthread_mutex_init(&(config.requests_finished_mutex), NULL); + pthread_mutex_init(&(config.liveclients_mutex), NULL); + } if (config.keepalive == 0) { printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n"); @@ -724,7 +868,7 @@ int main(int argc, const char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - c = createClient("",0,NULL); /* will never receive a reply */ + c = createClient("",0,NULL,-1); /* will never receive a reply */ createMissingClients(c); aeMain(config.el); /* and will wait for every */