diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 8db554fe..ab71839a 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -140,8 +140,9 @@ typedef struct clusterNode { sds name; int flags; sds replicate; /* Master ID if node is a slave */ - list *slots; - listIter slot_iter; + int *slots; + int current_slot_index; + int slots_count; int replicas_count; sds *migrating; /* An array of sds where even strings are slots and odd * strings are the destination node IDs. */ @@ -188,6 +189,7 @@ static void freeClient(client c) { listNode *ln; aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); aeDeleteFileEvent(el,c->context->fd,AE_READABLE); + aeStop(el); redisFree(c->context); sdsfree(c->obuf); zfree(c->randptr); @@ -240,13 +242,8 @@ static void setClusterKeyHashTag(client c) { assert(c->thread_id >= 0); clusterNode *node = c->cluster_node; assert(node); - listNode *ln = listNext(&node->slot_iter); - if (ln == NULL) { - listRewind(node->slots, &(node->slot_iter)); - ln = listNext(&(node->slot_iter)); - assert(ln != NULL); - } - int slot = (int) ln->value; + assert(node->current_slot_index < node->slots_count); + int slot = node->slots[node->current_slot_index]; const char *tag = crc16_slot_table[slot]; int taglen = strlen(tag); size_t i; @@ -262,16 +259,8 @@ static void clientDone(client c) { int requests_finished = 0; if (!config.num_threads) requests_finished = config.requests_finished; else atomicGet(config.requests_finished, requests_finished); - if (requests_finished == config.requests) { - aeStop(CLIENT_GET_EVENTLOOP(c)); + if (requests_finished >= config.requests) { freeClient(c); - if (config.num_threads) { - int i = 0; - for (;i < config.num_threads; i++) { - benchmarkThread *t = config.threads[i]; - if (t && t->el) aeStop(t->el); - } - } return; } if (config.keepalive) { @@ -313,12 +302,13 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { fprintf(stderr,"Unexpected error reply, exiting...\n"); exit(1); } + redisReply *r = reply; + int is_err = (r->type == REDIS_REPLY_ERROR); - if (config.showerrors) { + if (is_err && config.showerrors) { static time_t lasterr_time = 0; time_t now = time(NULL); - redisReply *r = reply; - if (r->type == REDIS_REPLY_ERROR && lasterr_time != now) { + if (lasterr_time != now) { lasterr_time = now; if (c->cluster_node) { printf("Error from server %s:%d: %s\n", @@ -329,6 +319,20 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } + if (config.cluster_mode && is_err && c->cluster_node && + (!strncmp(r->str,"MOVED",5) || + !strcmp(r->str,"ASK"))) + { + clusterNode *node = c->cluster_node; + assert(node); + if (++node->current_slot_index >= node->slots_count) { + fprintf(stderr,"Cluster node %s:%d has no more " + "valid slots, aborting...\n", node->ip, + node->port); + exit(1); + } + } + freeReplyObject(reply); /* This is an OK for prefix commands such as auth and select.*/ if (c->prefix_pending > 0) { @@ -346,14 +350,11 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } continue; } - if (config.num_threads) - pthread_mutex_lock(&(config.requests_finished_mutex)); - if (config.requests_finished < config.requests) { - config.requests_finished++; - config.latency[config.requests_finished] = c->latency; - } - if (config.num_threads) - pthread_mutex_unlock(&(config.requests_finished_mutex)); + int requests_finished = 0; + atomicGetIncr(config.requests_finished, requests_finished, 1); + requests_finished--; + if (requests_finished < config.requests) + config.latency[++requests_finished] = c->latency; c->pending--; if (c->pending == 0) { clientDone(c); @@ -389,7 +390,6 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { c->start = ustime(); c->latency = -1; } - if (sdslen(c->obuf) > c->written) { void *ptr = c->obuf+c->written; ssize_t nwritten = write(c->context->fd,ptr,sdslen(c->obuf)-c->written); @@ -733,7 +733,9 @@ static clusterNode *createClusterNode(char *ip, int port) { node->flags = 0; node->replicate = NULL; node->replicas_count = 0; - node->slots = listCreate(); + node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int)); + node->slots_count = 0; + node->current_slot_index = 0; node->migrating = NULL; node->importing = NULL; node->migrating_count = 0; @@ -757,7 +759,7 @@ static void freeClusterNode(clusterNode *node) { * config.hostip and config.hostport, then the node ip has been * allocated by fetchClusterConfiguration, so it must be freed. */ if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip); - listRelease(node->slots); + zfree(node->slots); zfree(node); } @@ -916,15 +918,15 @@ static int fetchClusterConfiguration() { stop = atoi(p + 1); while (start <= stop) { int slot = start++; - listAddNodeTail(node->slots, (void *)(uintptr_t)slot); + node->slots[node->slots_count++] = slot; } } else if (p > slotsdef) { int slot = atoi(slotsdef); - listAddNodeTail(node->slots, (void *)(uintptr_t)slot); + node->slots[node->slots_count++] = slot; } } } - if (listLength(node->slots) == 0) { + if (node->slots_count == 0) { printf("WARNING: master node %s:%d has no slots, skipping...\n", node->ip, node->port); continue; @@ -933,7 +935,6 @@ static int fetchClusterConfiguration() { success = 0; goto cleanup; } - listRewind(node->slots, &(node->slot_iter)); } cleanup: if (ctx) redisFree(ctx); @@ -1268,19 +1269,19 @@ int main(int argc, const char **argv) { } if (test_is_selected("set")) { - len = redisFormatCommand(&cmd,"SET key:__rand_int__{tag} %s",data); + len = redisFormatCommand(&cmd,"SET key:{tag}:__rand_int__ %s",data); benchmark("SET",cmd,len); free(cmd); } if (test_is_selected("get")) { - len = redisFormatCommand(&cmd,"GET key:__rand_int__{tag}"); + len = redisFormatCommand(&cmd,"GET key:{tag}:__rand_int__"); benchmark("GET",cmd,len); free(cmd); } if (test_is_selected("incr")) { - len = redisFormatCommand(&cmd,"INCR counter:__rand_int__{tag}"); + len = redisFormatCommand(&cmd,"INCR counter:{tag}:__rand_int__"); benchmark("INCR",cmd,len); free(cmd); }