Various changes to redis-benchmark thread and cluster support

- MOVED or ASK replies are now handled in cluster mode.
- Only the first slot per node is used in cluster mode.
- Mutlithreading: reduced usage of mutexes in favor of atomic vars.
This commit is contained in:
artix 2018-10-25 17:38:17 +02:00
parent 434f761304
commit 5fd5799cf9

View File

@ -140,8 +140,9 @@ typedef struct clusterNode {
sds name;
int flags;
sds replicate; /* Master ID if node is a slave */
list *slots;
listIter slot_iter;
int *slots;
int current_slot_index;
int slots_count;
int replicas_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
@ -188,6 +189,7 @@ static void freeClient(client c) {
listNode *ln;
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
aeStop(el);
redisFree(c->context);
sdsfree(c->obuf);
zfree(c->randptr);
@ -240,13 +242,8 @@ static void setClusterKeyHashTag(client c) {
assert(c->thread_id >= 0);
clusterNode *node = c->cluster_node;
assert(node);
listNode *ln = listNext(&node->slot_iter);
if (ln == NULL) {
listRewind(node->slots, &(node->slot_iter));
ln = listNext(&(node->slot_iter));
assert(ln != NULL);
}
int slot = (int) ln->value;
assert(node->current_slot_index < node->slots_count);
int slot = node->slots[node->current_slot_index];
const char *tag = crc16_slot_table[slot];
int taglen = strlen(tag);
size_t i;
@ -262,16 +259,8 @@ static void clientDone(client c) {
int requests_finished = 0;
if (!config.num_threads) requests_finished = config.requests_finished;
else atomicGet(config.requests_finished, requests_finished);
if (requests_finished == config.requests) {
aeStop(CLIENT_GET_EVENTLOOP(c));
if (requests_finished >= config.requests) {
freeClient(c);
if (config.num_threads) {
int i = 0;
for (;i < config.num_threads; i++) {
benchmarkThread *t = config.threads[i];
if (t && t->el) aeStop(t->el);
}
}
return;
}
if (config.keepalive) {
@ -313,12 +302,13 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
fprintf(stderr,"Unexpected error reply, exiting...\n");
exit(1);
}
redisReply *r = reply;
int is_err = (r->type == REDIS_REPLY_ERROR);
if (config.showerrors) {
if (is_err && config.showerrors) {
static time_t lasterr_time = 0;
time_t now = time(NULL);
redisReply *r = reply;
if (r->type == REDIS_REPLY_ERROR && lasterr_time != now) {
if (lasterr_time != now) {
lasterr_time = now;
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
@ -329,6 +319,20 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}
if (config.cluster_mode && is_err && c->cluster_node &&
(!strncmp(r->str,"MOVED",5) ||
!strcmp(r->str,"ASK")))
{
clusterNode *node = c->cluster_node;
assert(node);
if (++node->current_slot_index >= node->slots_count) {
fprintf(stderr,"Cluster node %s:%d has no more "
"valid slots, aborting...\n", node->ip,
node->port);
exit(1);
}
}
freeReplyObject(reply);
/* This is an OK for prefix commands such as auth and select.*/
if (c->prefix_pending > 0) {
@ -346,14 +350,11 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
continue;
}
if (config.num_threads)
pthread_mutex_lock(&(config.requests_finished_mutex));
if (config.requests_finished < config.requests) {
config.requests_finished++;
config.latency[config.requests_finished] = c->latency;
}
if (config.num_threads)
pthread_mutex_unlock(&(config.requests_finished_mutex));
int requests_finished = 0;
atomicGetIncr(config.requests_finished, requests_finished, 1);
requests_finished--;
if (requests_finished < config.requests)
config.latency[++requests_finished] = c->latency;
c->pending--;
if (c->pending == 0) {
clientDone(c);
@ -389,7 +390,6 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
c->start = ustime();
c->latency = -1;
}
if (sdslen(c->obuf) > c->written) {
void *ptr = c->obuf+c->written;
ssize_t nwritten = write(c->context->fd,ptr,sdslen(c->obuf)-c->written);
@ -733,7 +733,9 @@ static clusterNode *createClusterNode(char *ip, int port) {
node->flags = 0;
node->replicate = NULL;
node->replicas_count = 0;
node->slots = listCreate();
node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int));
node->slots_count = 0;
node->current_slot_index = 0;
node->migrating = NULL;
node->importing = NULL;
node->migrating_count = 0;
@ -757,7 +759,7 @@ static void freeClusterNode(clusterNode *node) {
* config.hostip and config.hostport, then the node ip has been
* allocated by fetchClusterConfiguration, so it must be freed. */
if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip);
listRelease(node->slots);
zfree(node->slots);
zfree(node);
}
@ -916,15 +918,15 @@ static int fetchClusterConfiguration() {
stop = atoi(p + 1);
while (start <= stop) {
int slot = start++;
listAddNodeTail(node->slots, (void *)(uintptr_t)slot);
node->slots[node->slots_count++] = slot;
}
} else if (p > slotsdef) {
int slot = atoi(slotsdef);
listAddNodeTail(node->slots, (void *)(uintptr_t)slot);
node->slots[node->slots_count++] = slot;
}
}
}
if (listLength(node->slots) == 0) {
if (node->slots_count == 0) {
printf("WARNING: master node %s:%d has no slots, skipping...\n",
node->ip, node->port);
continue;
@ -933,7 +935,6 @@ static int fetchClusterConfiguration() {
success = 0;
goto cleanup;
}
listRewind(node->slots, &(node->slot_iter));
}
cleanup:
if (ctx) redisFree(ctx);
@ -1268,19 +1269,19 @@ int main(int argc, const char **argv) {
}
if (test_is_selected("set")) {
len = redisFormatCommand(&cmd,"SET key:__rand_int__{tag} %s",data);
len = redisFormatCommand(&cmd,"SET key:{tag}:__rand_int__ %s",data);
benchmark("SET",cmd,len);
free(cmd);
}
if (test_is_selected("get")) {
len = redisFormatCommand(&cmd,"GET key:__rand_int__{tag}");
len = redisFormatCommand(&cmd,"GET key:{tag}:__rand_int__");
benchmark("GET",cmd,len);
free(cmd);
}
if (test_is_selected("incr")) {
len = redisFormatCommand(&cmd,"INCR counter:__rand_int__{tag}");
len = redisFormatCommand(&cmd,"INCR counter:{tag}:__rand_int__");
benchmark("INCR",cmd,len);
free(cmd);
}