diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index aed4c70d..8e38265f 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -61,6 +61,7 @@ struct benchmarkThread; struct clusterNode; +struct redisConfig; static struct config { aeEventLoop *el; @@ -98,6 +99,7 @@ static struct config { int cluster_mode; int cluster_node_count; struct clusterNode **cluster_nodes; + struct redisConfig *redis_config; /* Thread mutexes to be used as fallbacks by atomicvar.h */ pthread_mutex_t requests_issued_mutex; pthread_mutex_t requests_finished_mutex; @@ -150,8 +152,14 @@ typedef struct clusterNode { * strings are the source node IDs. */ int migrating_count; /* Length of the migrating array (migrating slots*2) */ int importing_count; /* Length of the importing array (importing slots*2) */ + struct redisConfig *redis_config; } clusterNode; +typedef struct redisConfig { + sds save; + sds appendonly; +} redisConfig; + /* Prototypes */ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); static void createMissingClients(client c); @@ -160,6 +168,9 @@ static void freeBenchmarkThread(benchmarkThread *thread); static void freeBenchmarkThreads(); static void *execBenchmarkThread(void *ptr); static clusterNode *createClusterNode(char *ip, int port); +static redisConfig *getRedisConfig(const char *ip, int port, + const char *hostsocket); +static void freeRedisConfig(redisConfig *cfg); int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData); @@ -184,6 +195,66 @@ static long long mstime(void) { return mst; } +static redisConfig *getRedisConfig(const char *ip, int port, + const char *hostsocket) +{ + redisConfig *cfg = zcalloc(sizeof(*cfg)); + if (!cfg) return NULL; + redisContext *c = NULL; + redisReply *reply = NULL, *sub_reply = NULL; + if (hostsocket == NULL) + c = redisConnect(ip, port); + else + c = redisConnectUnix(hostsocket); + if (c->err) { + fprintf(stderr,"Could not connect to Redis at "); + if (hostsocket == NULL) + fprintf(stderr,"%s:%d: %s\n",ip,port,c->errstr); + else fprintf(stderr,"%s: %s\n",hostsocket,c->errstr); + goto fail; + } + redisAppendCommand(c, "CONFIG GET %s", "save"); + redisAppendCommand(c, "CONFIG GET %s", "appendonly"); + int i = 0; + void *r = NULL; + for (; i < 2; i++) { + int res = redisGetReply(c, &r); + if (reply) freeReplyObject(reply); + reply = ((redisReply *) r); + if (res != REDIS_OK || !r) goto fail; + if (reply->type == REDIS_REPLY_ERROR) { + fprintf(stderr, "ERROR: %s\n", reply->str); + goto fail; + } + if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail; + sub_reply = reply->element[1]; + char *value = sub_reply->str; + if (!value) value = ""; + switch (i) { + case 0: cfg->save = sdsnew(value); break; + case 1: cfg->appendonly = sdsnew(value); break; + } + } + if (reply) freeReplyObject(reply); + if (c) redisFree(c); + return cfg; +fail: + if (reply) freeReplyObject(reply); + if (c) redisFree(c); + zfree(cfg); + fprintf(stderr, "ERROR: failed to fetch CONFIG from "); + if (c->connection_type == REDIS_CONN_TCP) + fprintf(stderr, "%s:%d\n", c->tcp.host, c->tcp.port); + else if (c->connection_type == REDIS_CONN_UNIX) + fprintf(stderr, "%s\n", c->unix_sock.path); + return NULL; +} +static void freeRedisConfig(redisConfig *cfg) { + if (cfg->save) sdsfree(cfg->save); + if (cfg->appendonly) sdsfree(cfg->appendonly); + zfree(cfg); +} + static void freeClient(client c) { aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); listNode *ln; @@ -455,7 +526,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { else node_idx = thread_id % config.cluster_node_count; clusterNode *node = config.cluster_nodes[node_idx]; - if (node == NULL) exit(1); + assert(node != NULL); ip = (const char *) node->ip; port = node->port; c->cluster_node = node; @@ -634,9 +705,31 @@ static void showLatencyReport(void) { printf(" %d parallel clients\n", config.numclients); printf(" %d bytes payload\n", config.datasize); printf(" keep alive: %d\n", config.keepalive); + if (config.cluster_mode) { + printf(" cluster mode: yes (%d masters)\n", + config.cluster_node_count); + int m ; + for (m = 0; m < config.cluster_node_count; m++) { + clusterNode *node = config.cluster_nodes[m]; + redisConfig *cfg = node->redis_config; + if (cfg == NULL) continue; + printf(" node [%d] configuration:\n",m ); + printf(" save: %s\n", + sdslen(cfg->save) ? cfg->save : "NONE"); + printf(" appendonly: %s\n", cfg->appendonly); + } + } else { + if (config.redis_config) { + printf(" host configuration \"save\": %s\n", + config.redis_config->save); + printf(" host configuration \"appendonly\": %s\n", + config.redis_config->appendonly); + } + } printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no")); if (config.num_threads) printf(" threads: %d\n", config.num_threads); + printf("\n"); qsort(config.latency,config.requests,sizeof(long long),compareLatency); @@ -707,6 +800,8 @@ static void benchmark(char *title, char *cmd, int len) { if (config.threads) freeBenchmarkThreads(); } +/* Thread functions. */ + static benchmarkThread *createBenchmarkThread(int index) { benchmarkThread *thread = zmalloc(sizeof(*thread)); if (thread == NULL) return NULL; @@ -737,6 +832,8 @@ static void *execBenchmarkThread(void *ptr) { return NULL; } +/* Cluster helper functions. */ + static clusterNode *createClusterNode(char *ip, int port) { clusterNode *node = zmalloc(sizeof(*node)); if (!node) return NULL; @@ -753,6 +850,7 @@ static clusterNode *createClusterNode(char *ip, int port) { node->importing = NULL; node->migrating_count = 0; node->importing_count = 0; + node->redis_config = NULL; return node; } @@ -772,6 +870,7 @@ static void freeClusterNode(clusterNode *node) { * config.hostip and config.hostport, then the node ip has been * allocated by fetchClusterConfiguration, so it must be freed. */ if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip); + if (node->redis_config != NULL) freeRedisConfig(node->redis_config); zfree(node->slots); zfree(node); } @@ -783,6 +882,7 @@ static void freeClusterNodes() { if (n) freeClusterNode(n); } zfree(config.cluster_nodes); + config.cluster_nodes = NULL; } static clusterNode **addClusterNode(clusterNode *node) { @@ -1200,6 +1300,7 @@ int main(int argc, const char **argv) { config.cluster_mode = 0; config.cluster_node_count = 0; config.cluster_nodes = NULL; + config.redis_config = NULL; i = parseOptions(argc,argv); argc -= i; @@ -1232,13 +1333,21 @@ int main(int argc, const char **argv) { fprintf(stderr, "Invalid cluster node #%d\n", i); exit(1); } + printf("Master %d: ", i); if (node->name) printf("%s ", node->name); printf("%s:%d\n", node->ip, node->port); + node->redis_config = getRedisConfig(node->ip, node->port, NULL); + if (node->redis_config == NULL) exit(1); } + printf("\n"); /* Automatically set thread number to node count if not specified * by the user. */ if (config.num_threads == 0) config.num_threads = config.cluster_node_count; + } else { + config.redis_config = + getRedisConfig(config.hostip, config.hostport, config.hostsocket); + if (config.redis_config == NULL) exit(1); } if (config.num_threads > 0) { @@ -1273,6 +1382,7 @@ int main(int argc, const char **argv) { free(cmd); } while(config.loop); + if (config.redis_config != NULL) freeRedisConfig(config.redis_config); return 0; } @@ -1403,5 +1513,7 @@ int main(int argc, const char **argv) { if (!config.csv) printf("\n"); } while(config.loop); + if (config.redis_config != NULL) freeRedisConfig(config.redis_config); + return 0; }