From 434f7613041045d6794bde8b93aa1e733bbab6ca Mon Sep 17 00:00:00 2001 From: artix Date: Sat, 29 Sep 2018 12:59:03 +0200 Subject: [PATCH] Redis benchmark: table-based slot hashtag placeholder replacement in cluster mode. --- src/redis-benchmark.c | 122 +++++++++++++++++++++++++++++++++--------- 1 file changed, 97 insertions(+), 25 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 636114a2..8db554fe 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -98,7 +98,6 @@ static struct config { int cluster_mode; int cluster_node_count; struct clusterNode **cluster_nodes; - struct clusterNode *cluster_slots[CLUSTER_SLOTS]; /* Thread mutexes to be used as fallbacks by atomicvar.h */ pthread_mutex_t requests_issued_mutex; pthread_mutex_t requests_finished_mutex; @@ -111,6 +110,9 @@ typedef struct _client { char **randptr; /* Pointers to :rand: strings inside the command buf */ size_t randlen; /* Number of pointers in client->randptr */ size_t randfree; /* Number of unused pointers in client->randptr */ + char **stagptr; /* Pointers to slot hashtags (cluster mode only) */ + size_t staglen; /* Number of pointers in client->stagptr */ + size_t stagfree; /* Number of unused pointers in client->stagptr */ size_t written; /* Bytes of 'obuf' already written */ long long start; /* Start time of a request */ long long latency; /* Request latency */ @@ -120,6 +122,7 @@ typedef struct _client { benchmark commands and discarded after the first send. */ int prefixlen; /* Size in bytes of the pending prefix commands */ int thread_id; + struct clusterNode *cluster_node; } *client; /* Threads. */ @@ -137,8 +140,8 @@ typedef struct clusterNode { sds name; int flags; sds replicate; /* Master ID if node is a slave */ - uint8_t slots[CLUSTER_SLOTS]; - int slots_count; + list *slots; + listIter slot_iter; int replicas_count; sds *migrating; /* An array of sds where even strings are slots and odd * strings are the destination node IDs. */ @@ -188,6 +191,7 @@ static void freeClient(client c) { redisFree(c->context); sdsfree(c->obuf); zfree(c->randptr); + zfree(c->stagptr); zfree(c); if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); config.liveclients--; @@ -232,6 +236,28 @@ static void randomizeClientKey(client c) { } } +static void setClusterKeyHashTag(client c) { + assert(c->thread_id >= 0); + clusterNode *node = c->cluster_node; + assert(node); + listNode *ln = listNext(&node->slot_iter); + if (ln == NULL) { + listRewind(node->slots, &(node->slot_iter)); + ln = listNext(&(node->slot_iter)); + assert(ln != NULL); + } + int slot = (int) ln->value; + const char *tag = crc16_slot_table[slot]; + int taglen = strlen(tag); + size_t i; + for (i = 0; i < c->staglen; i++) { + char *p = c->stagptr[i] + 1; + p[0] = tag[0]; + p[1] = (taglen >= 2 ? tag[1] : '}'); + p[2] = (taglen == 3 ? tag[2] : '}'); + } +} + static void clientDone(client c) { int requests_finished = 0; if (!config.num_threads) requests_finished = config.requests_finished; @@ -294,7 +320,12 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { redisReply *r = reply; if (r->type == REDIS_REPLY_ERROR && lasterr_time != now) { lasterr_time = now; - printf("Error from server: %s\n", r->str); + if (c->cluster_node) { + printf("Error from server %s:%d: %s\n", + c->cluster_node->ip, + c->cluster_node->port, + r->str); + } else printf("Error from server: %s\n", r->str); } } @@ -354,6 +385,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Really initialize: randomize keys and set start time. */ if (config.randomkeys) randomizeClientKey(c); + if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); c->start = ustime(); c->latency = -1; } @@ -401,9 +433,10 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { int is_cluster_client = (config.cluster_mode && thread_id >= 0); client c = zmalloc(sizeof(struct _client)); + const char *ip; + int port; + c->cluster_node = NULL; if (config.hostsocket == NULL || is_cluster_client) { - const char *ip; - int port; if (!is_cluster_client) { ip = config.hostip; port = config.hostport; @@ -412,6 +445,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { if (node == NULL) exit(1); ip = (const char *) node->ip; port = node->port; + c->cluster_node = node; } c->context = redisConnectNonBlock(ip,port); } else { @@ -419,8 +453,8 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { } if (c->context->err) { fprintf(stderr,"Could not connect to Redis at "); - if (config.hostsocket == NULL) - fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,c->context->errstr); + if (config.hostsocket == NULL || is_cluster_client) + fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr); else fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr); exit(1); @@ -469,6 +503,8 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->pending = config.pipeline+c->prefix_pending; c->randptr = NULL; c->randlen = 0; + c->stagptr = NULL; + c->staglen = 0; /* Find substrings in the output buffer that need to be randomized. */ if (config.randomkeys) { @@ -499,6 +535,36 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { } } } + /* If cluster mode is enabled, set slot hashtags pointers. */ + if (config.cluster_mode) { + if (from) { + c->staglen = from->staglen; + c->stagfree = 0; + c->stagptr = zmalloc(sizeof(char*)*c->staglen); + /* copy the offsets. */ + for (j = 0; j < (int)c->staglen; j++) { + c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf); + /* Adjust for the different select prefix length. */ + c->stagptr[j] += c->prefixlen - from->prefixlen; + } + } else { + char *p = c->obuf; + + c->staglen = 0; + c->stagfree = RANDPTR_INITIAL_SIZE; + c->stagptr = zmalloc(sizeof(char*)*c->stagfree); + while ((p = strstr(p,"{tag}")) != NULL) { + if (c->stagfree == 0) { + c->stagptr = zrealloc(c->stagptr, + sizeof(char*)*c->staglen*2); + c->stagfree += c->staglen; + } + c->stagptr[c->staglen++] = p; + c->stagfree--; + p += 5; /* 12 is strlen("{tag}"). */ + } + } + } aeEventLoop *el = NULL; if (thread_id < 0) el = config.el; else { @@ -667,8 +733,7 @@ static clusterNode *createClusterNode(char *ip, int port) { node->flags = 0; node->replicate = NULL; node->replicas_count = 0; - memset(node->slots, 0, sizeof(node->slots)); - node->slots_count = 0; + node->slots = listCreate(); node->migrating = NULL; node->importing = NULL; node->migrating_count = 0; @@ -688,6 +753,11 @@ static void freeClusterNode(clusterNode *node) { for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]); zfree(node->importing); } + /* If the node is not the reference node, that uses the address from + * config.hostip and config.hostport, then the node ip has been + * allocated by fetchClusterConfiguration, so it must be freed. */ + if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip); + listRelease(node->slots); zfree(node); } @@ -785,17 +855,13 @@ static int fetchClusterConfiguration() { /* If internal bus is specified, then just drop it. */ if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0'; int port = atoi(addr); - node = createClusterNode(ip, port); + node = createClusterNode(sdsnew(ip), port); } if (node == NULL) { success = 0; goto cleanup; } node->name = sdsnew(name); - if (!addClusterNode(node)) { - success = 0; - goto cleanup; - } if (i == 8) { int remaining = strlen(line); while (remaining > 0) { @@ -848,20 +914,26 @@ static int fetchClusterConfiguration() { *p = '\0'; start = atoi(slotsdef); stop = atoi(p + 1); - node->slots_count += (stop - (start - 1)); while (start <= stop) { int slot = start++; - node->slots[slot] = 1; - config.cluster_slots[slot] = node; + listAddNodeTail(node->slots, (void *)(uintptr_t)slot); } } else if (p > slotsdef) { int slot = atoi(slotsdef); - node->slots[slot] = 1; - node->slots_count++; - config.cluster_slots[slot] = node; + listAddNodeTail(node->slots, (void *)(uintptr_t)slot); } } } + if (listLength(node->slots) == 0) { + printf("WARNING: master node %s:%d has no slots, skipping...\n", + node->ip, node->port); + continue; + } + if (!addClusterNode(node)) { + success = 0; + goto cleanup; + } + listRewind(node->slots, &(node->slot_iter)); } cleanup: if (ctx) redisFree(ctx); @@ -1106,7 +1178,6 @@ int main(int argc, const char **argv) { config.cluster_mode = 0; config.cluster_node_count = 0; config.cluster_nodes = NULL; - memset(config.cluster_slots, 0, sizeof(config.cluster_slots)); i = parseOptions(argc,argv); argc -= i; @@ -1142,6 +1213,7 @@ int main(int argc, const char **argv) { if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); } + /* TODO: allow for more thrads per cluster node. */ config.num_threads = config.cluster_node_count; } @@ -1196,19 +1268,19 @@ int main(int argc, const char **argv) { } if (test_is_selected("set")) { - len = redisFormatCommand(&cmd,"SET key:__rand_int__ %s",data); + len = redisFormatCommand(&cmd,"SET key:__rand_int__{tag} %s",data); benchmark("SET",cmd,len); free(cmd); } if (test_is_selected("get")) { - len = redisFormatCommand(&cmd,"GET key:__rand_int__"); + len = redisFormatCommand(&cmd,"GET key:__rand_int__{tag}"); benchmark("GET",cmd,len); free(cmd); } if (test_is_selected("incr")) { - len = redisFormatCommand(&cmd,"INCR counter:__rand_int__"); + len = redisFormatCommand(&cmd,"INCR counter:__rand_int__{tag}"); benchmark("INCR",cmd,len); free(cmd); }