Redis benchmark: fixed issued with config.hostip and code cleanup

This commit is contained in:
artix 2019-01-18 17:31:32 +01:00
parent f95e01266e
commit eb8b4feef7

View File

@ -263,8 +263,7 @@ static void setClusterKeyHashTag(client c) {
static void clientDone(client c) { static void clientDone(client c) {
int requests_finished = 0; int requests_finished = 0;
if (!config.num_threads) requests_finished = config.requests_finished; atomicGet(config.requests_finished, requests_finished);
else atomicGet(config.requests_finished, requests_finished);
if (requests_finished >= config.requests) { if (requests_finished >= config.requests) {
freeClient(c); freeClient(c);
if (!config.num_threads && config.el) aeStop(config.el); if (!config.num_threads && config.el) aeStop(config.el);
@ -387,8 +386,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (c->written == 0) { if (c->written == 0) {
/* Enforce upper bound to number of requests. */ /* Enforce upper bound to number of requests. */
int requests_issued = 0; int requests_issued = 0;
if (!config.num_threads) requests_issued = config.requests_issued++; atomicGetIncr(config.requests_issued, requests_issued, 1);
else atomicGetIncr(config.requests_issued, requests_issued, 1);
if (requests_issued >= config.requests) { if (requests_issued >= config.requests) {
freeClient(c); freeClient(c);
return; return;
@ -498,7 +496,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
* buffer with the SELECT command, that will be discarded the first * buffer with the SELECT command, that will be discarded the first
* time the replies are received, so if the client is reused the * time the replies are received, so if the client is reused the
* SELECT command will not be used again. */ * SELECT command will not be used again. */
if (config.dbnum != 0) { if (config.dbnum != 0 && !is_cluster_client) {
c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
(int)sdslen(config.dbnumstr),config.dbnumstr); (int)sdslen(config.dbnumstr),config.dbnumstr);
c->prefix_pending++; c->prefix_pending++;
@ -820,8 +818,13 @@ static int fetchClusterConfiguration() {
if (!success) goto cleanup; if (!success) goto cleanup;
success = (reply->type != REDIS_REPLY_ERROR); success = (reply->type != REDIS_REPLY_ERROR);
if (!success) { if (!success) {
fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n", if (config.hostsocket == NULL) {
config.hostip, config.hostport, reply->str); fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n",
config.hostip, config.hostport, reply->str);
} else {
fprintf(stderr, "Cluster node %s replied with error:\n%s\n",
config.hostsocket, reply->str);
}
goto cleanup; goto cleanup;
} }
char *lines = reply->str, *p, *line; char *lines = reply->str, *p, *line;
@ -852,33 +855,37 @@ static int fetchClusterConfiguration() {
int is_replica = (strstr(flags, "slave") != NULL || int is_replica = (strstr(flags, "slave") != NULL ||
(master_id != NULL && master_id[0] != '-')); (master_id != NULL && master_id[0] != '-'));
if (is_replica) continue; if (is_replica) continue;
if (addr == NULL) {
fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
success = 0;
goto cleanup;
}
clusterNode *node = NULL; clusterNode *node = NULL;
if (myself) { char *ip = NULL;
node = firstNode; int port = 0;
} else { char *paddr = strchr(addr, ':');
if (addr == NULL) { if (paddr != NULL) {
fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
success = 0;
goto cleanup;
}
char *paddr = strchr(addr, ':');
if (paddr == NULL) {
success = 0;
goto cleanup;
}
*paddr = '\0'; *paddr = '\0';
char *ip = addr; ip = addr;
addr = paddr + 1; addr = paddr + 1;
/* If internal bus is specified, then just drop it. */ /* If internal bus is specified, then just drop it. */
if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0'; if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
int port = atoi(addr); port = atoi(addr);
}
if (myself) {
node = firstNode;
if (node->ip == NULL && ip != NULL) {
node->ip = ip;
node->port = port;
}
} else {
node = createClusterNode(sdsnew(ip), port); node = createClusterNode(sdsnew(ip), port);
} }
if (node == NULL) { if (node == NULL) {
success = 0; success = 0;
goto cleanup; goto cleanup;
} }
node->name = sdsnew(name); if (name != NULL) node->name = sdsnew(name);
if (i == 8) { if (i == 8) {
int remaining = strlen(line); int remaining = strlen(line);
while (remaining > 0) { while (remaining > 0) {
@ -1115,13 +1122,8 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
UNUSED(clientData); UNUSED(clientData);
int liveclients = 0; int liveclients = 0;
int requests_finished = 0; int requests_finished = 0;
if (!config.num_threads) { atomicGet(config.liveclients, liveclients);
liveclients = config.liveclients; atomicGet(config.requests_finished, requests_finished);
requests_finished = config.requests_finished;
} else {
atomicGet(config.liveclients, liveclients);
atomicGet(config.requests_finished, requests_finished);
}
if (liveclients == 0 && requests_finished != config.requests) { if (liveclients == 0 && requests_finished != config.requests) {
fprintf(stderr,"All clients disconnected... aborting.\n"); fprintf(stderr,"All clients disconnected... aborting.\n");