Redis benchmark: configurable thread count in cluster mode and fixes

This commit is contained in:
artix 2019-01-17 17:40:15 +01:00
parent 5fd5799cf9
commit f95e01266e

View File

@ -53,7 +53,7 @@
#define UNUSED(V) ((void) V) #define UNUSED(V) ((void) V)
#define RANDPTR_INITIAL_SIZE 8 #define RANDPTR_INITIAL_SIZE 8
#define MAX_LATENCY_PRECISION 3 #define MAX_LATENCY_PRECISION 3
#define MAX_THREADS 16 #define MAX_THREADS 500
#define CLUSTER_SLOTS 16384 #define CLUSTER_SLOTS 16384
#define CLIENT_GET_EVENTLOOP(c) \ #define CLIENT_GET_EVENTLOOP(c) \
@ -189,7 +189,13 @@ static void freeClient(client c) {
listNode *ln; listNode *ln;
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE); aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
aeStop(el); if (c->thread_id >= 0) {
int requests_finished = 0;
atomicGet(config.requests_finished, requests_finished);
if (requests_finished >= config.requests) {
aeStop(el);
}
}
redisFree(c->context); redisFree(c->context);
sdsfree(c->obuf); sdsfree(c->obuf);
zfree(c->randptr); zfree(c->randptr);
@ -261,6 +267,7 @@ static void clientDone(client c) {
else atomicGet(config.requests_finished, requests_finished); else atomicGet(config.requests_finished, requests_finished);
if (requests_finished >= config.requests) { if (requests_finished >= config.requests) {
freeClient(c); freeClient(c);
if (!config.num_threads && config.el) aeStop(config.el);
return; return;
} }
if (config.keepalive) { if (config.keepalive) {
@ -306,6 +313,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int is_err = (r->type == REDIS_REPLY_ERROR); int is_err = (r->type == REDIS_REPLY_ERROR);
if (is_err && config.showerrors) { if (is_err && config.showerrors) {
/* TODO: static lasterr_time not thread-safe */
static time_t lasterr_time = 0; static time_t lasterr_time = 0;
time_t now = time(NULL); time_t now = time(NULL);
if (lasterr_time != now) { if (lasterr_time != now) {
@ -326,10 +334,13 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
clusterNode *node = c->cluster_node; clusterNode *node = c->cluster_node;
assert(node); assert(node);
if (++node->current_slot_index >= node->slots_count) { if (++node->current_slot_index >= node->slots_count) {
fprintf(stderr,"Cluster node %s:%d has no more " if (config.showerrors) {
"valid slots, aborting...\n", node->ip, fprintf(stderr, "WARN: No more available slots in "
node->port); "node %s:%d\n", node->ip, node->port);
exit(1); }
freeReplyObject(reply);
freeClient(c);
return;
} }
} }
@ -352,9 +363,8 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
int requests_finished = 0; int requests_finished = 0;
atomicGetIncr(config.requests_finished, requests_finished, 1); atomicGetIncr(config.requests_finished, requests_finished, 1);
requests_finished--;
if (requests_finished < config.requests) if (requests_finished < config.requests)
config.latency[++requests_finished] = c->latency; config.latency[requests_finished] = c->latency;
c->pending--; c->pending--;
if (c->pending == 0) { if (c->pending == 0) {
clientDone(c); clientDone(c);
@ -433,15 +443,20 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
int is_cluster_client = (config.cluster_mode && thread_id >= 0); int is_cluster_client = (config.cluster_mode && thread_id >= 0);
client c = zmalloc(sizeof(struct _client)); client c = zmalloc(sizeof(struct _client));
const char *ip; const char *ip = NULL;
int port; int port = 0;
c->cluster_node = NULL; c->cluster_node = NULL;
if (config.hostsocket == NULL || is_cluster_client) { if (config.hostsocket == NULL || is_cluster_client) {
if (!is_cluster_client) { if (!is_cluster_client) {
ip = config.hostip; ip = config.hostip;
port = config.hostport; port = config.hostport;
} else { } else {
clusterNode *node = config.cluster_nodes[thread_id]; int node_idx = 0;
if (config.num_threads < config.cluster_node_count)
node_idx = config.liveclients % config.cluster_node_count;
else
node_idx = thread_id % config.cluster_node_count;
clusterNode *node = config.cluster_nodes[node_idx];
if (node == NULL) exit(1); if (node == NULL) exit(1);
ip = (const char *) node->ip; ip = (const char *) node->ip;
port = node->port; port = node->port;
@ -556,7 +571,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
while ((p = strstr(p,"{tag}")) != NULL) { while ((p = strstr(p,"{tag}")) != NULL) {
if (c->stagfree == 0) { if (c->stagfree == 0) {
c->stagptr = zrealloc(c->stagptr, c->stagptr = zrealloc(c->stagptr,
sizeof(char*)*c->staglen*2); sizeof(char*) * c->staglen*2);
c->stagfree += c->staglen; c->stagfree += c->staglen;
} }
c->stagptr[c->staglen++] = p; c->stagptr[c->staglen++] = p;
@ -1112,6 +1127,10 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
fprintf(stderr,"All clients disconnected... aborting.\n"); fprintf(stderr,"All clients disconnected... aborting.\n");
exit(1); exit(1);
} }
if (config.num_threads && requests_finished >= config.requests) {
aeStop(eventLoop);
return AE_NOMORE;
}
if (config.csv) return 250; if (config.csv) return 250;
if (config.idlemode == 1) { if (config.idlemode == 1) {
printf("clients: %d\r", config.liveclients); printf("clients: %d\r", config.liveclients);
@ -1214,8 +1233,10 @@ int main(int argc, const char **argv) {
if (node->name) printf("%s ", node->name); if (node->name) printf("%s ", node->name);
printf("%s:%d\n", node->ip, node->port); printf("%s:%d\n", node->ip, node->port);
} }
/* TODO: allow for more thrads per cluster node. */ /* Automatically set thread number to node count if not specified
config.num_threads = config.cluster_node_count; * by the user. */
if (config.num_threads == 0)
config.num_threads = config.cluster_node_count;
} }
if (config.num_threads > 0) { if (config.num_threads > 0) {