Redis benchmark: table-based slot hashtag placeholder replacement in cluster mode.

This commit is contained in:
artix 2018-09-29 12:59:03 +02:00
parent dfd3cc5f78
commit 434f761304

View File

@ -98,7 +98,6 @@ static struct config {
int cluster_mode;
int cluster_node_count;
struct clusterNode **cluster_nodes;
struct clusterNode *cluster_slots[CLUSTER_SLOTS];
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
@ -111,6 +110,9 @@ typedef struct _client {
char **randptr; /* Pointers to :rand: strings inside the command buf */
size_t randlen; /* Number of pointers in client->randptr */
size_t randfree; /* Number of unused pointers in client->randptr */
char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
size_t staglen; /* Number of pointers in client->stagptr */
size_t stagfree; /* Number of unused pointers in client->stagptr */
size_t written; /* Bytes of 'obuf' already written */
long long start; /* Start time of a request */
long long latency; /* Request latency */
@ -120,6 +122,7 @@ typedef struct _client {
benchmark commands and discarded after the first send. */
int prefixlen; /* Size in bytes of the pending prefix commands */
int thread_id;
struct clusterNode *cluster_node;
} *client;
/* Threads. */
@ -137,8 +140,8 @@ typedef struct clusterNode {
sds name;
int flags;
sds replicate; /* Master ID if node is a slave */
uint8_t slots[CLUSTER_SLOTS];
int slots_count;
list *slots;
listIter slot_iter;
int replicas_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
@ -188,6 +191,7 @@ static void freeClient(client c) {
redisFree(c->context);
sdsfree(c->obuf);
zfree(c->randptr);
zfree(c->stagptr);
zfree(c);
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
config.liveclients--;
@ -232,6 +236,28 @@ static void randomizeClientKey(client c) {
}
}
static void setClusterKeyHashTag(client c) {
assert(c->thread_id >= 0);
clusterNode *node = c->cluster_node;
assert(node);
listNode *ln = listNext(&node->slot_iter);
if (ln == NULL) {
listRewind(node->slots, &(node->slot_iter));
ln = listNext(&(node->slot_iter));
assert(ln != NULL);
}
int slot = (int) ln->value;
const char *tag = crc16_slot_table[slot];
int taglen = strlen(tag);
size_t i;
for (i = 0; i < c->staglen; i++) {
char *p = c->stagptr[i] + 1;
p[0] = tag[0];
p[1] = (taglen >= 2 ? tag[1] : '}');
p[2] = (taglen == 3 ? tag[2] : '}');
}
}
static void clientDone(client c) {
int requests_finished = 0;
if (!config.num_threads) requests_finished = config.requests_finished;
@ -294,7 +320,12 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
redisReply *r = reply;
if (r->type == REDIS_REPLY_ERROR && lasterr_time != now) {
lasterr_time = now;
printf("Error from server: %s\n", r->str);
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
} else printf("Error from server: %s\n", r->str);
}
}
@ -354,6 +385,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Really initialize: randomize keys and set start time. */
if (config.randomkeys) randomizeClientKey(c);
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
c->start = ustime();
c->latency = -1;
}
@ -401,9 +433,10 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
client c = zmalloc(sizeof(struct _client));
const char *ip;
int port;
c->cluster_node = NULL;
if (config.hostsocket == NULL || is_cluster_client) {
const char *ip;
int port;
if (!is_cluster_client) {
ip = config.hostip;
port = config.hostport;
@ -412,6 +445,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
if (node == NULL) exit(1);
ip = (const char *) node->ip;
port = node->port;
c->cluster_node = node;
}
c->context = redisConnectNonBlock(ip,port);
} else {
@ -419,8 +453,8 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
}
if (c->context->err) {
fprintf(stderr,"Could not connect to Redis at ");
if (config.hostsocket == NULL)
fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,c->context->errstr);
if (config.hostsocket == NULL || is_cluster_client)
fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr);
else
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
exit(1);
@ -469,6 +503,8 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->pending = config.pipeline+c->prefix_pending;
c->randptr = NULL;
c->randlen = 0;
c->stagptr = NULL;
c->staglen = 0;
/* Find substrings in the output buffer that need to be randomized. */
if (config.randomkeys) {
@ -499,6 +535,36 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
}
}
}
/* If cluster mode is enabled, set slot hashtags pointers. */
if (config.cluster_mode) {
if (from) {
c->staglen = from->staglen;
c->stagfree = 0;
c->stagptr = zmalloc(sizeof(char*)*c->staglen);
/* copy the offsets. */
for (j = 0; j < (int)c->staglen; j++) {
c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf);
/* Adjust for the different select prefix length. */
c->stagptr[j] += c->prefixlen - from->prefixlen;
}
} else {
char *p = c->obuf;
c->staglen = 0;
c->stagfree = RANDPTR_INITIAL_SIZE;
c->stagptr = zmalloc(sizeof(char*)*c->stagfree);
while ((p = strstr(p,"{tag}")) != NULL) {
if (c->stagfree == 0) {
c->stagptr = zrealloc(c->stagptr,
sizeof(char*)*c->staglen*2);
c->stagfree += c->staglen;
}
c->stagptr[c->staglen++] = p;
c->stagfree--;
p += 5; /* 12 is strlen("{tag}"). */
}
}
}
aeEventLoop *el = NULL;
if (thread_id < 0) el = config.el;
else {
@ -667,8 +733,7 @@ static clusterNode *createClusterNode(char *ip, int port) {
node->flags = 0;
node->replicate = NULL;
node->replicas_count = 0;
memset(node->slots, 0, sizeof(node->slots));
node->slots_count = 0;
node->slots = listCreate();
node->migrating = NULL;
node->importing = NULL;
node->migrating_count = 0;
@ -688,6 +753,11 @@ static void freeClusterNode(clusterNode *node) {
for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
zfree(node->importing);
}
/* If the node is not the reference node, that uses the address from
* config.hostip and config.hostport, then the node ip has been
* allocated by fetchClusterConfiguration, so it must be freed. */
if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip);
listRelease(node->slots);
zfree(node);
}
@ -785,17 +855,13 @@ static int fetchClusterConfiguration() {
/* If internal bus is specified, then just drop it. */
if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
int port = atoi(addr);
node = createClusterNode(ip, port);
node = createClusterNode(sdsnew(ip), port);
}
if (node == NULL) {
success = 0;
goto cleanup;
}
node->name = sdsnew(name);
if (!addClusterNode(node)) {
success = 0;
goto cleanup;
}
if (i == 8) {
int remaining = strlen(line);
while (remaining > 0) {
@ -848,20 +914,26 @@ static int fetchClusterConfiguration() {
*p = '\0';
start = atoi(slotsdef);
stop = atoi(p + 1);
node->slots_count += (stop - (start - 1));
while (start <= stop) {
int slot = start++;
node->slots[slot] = 1;
config.cluster_slots[slot] = node;
listAddNodeTail(node->slots, (void *)(uintptr_t)slot);
}
} else if (p > slotsdef) {
int slot = atoi(slotsdef);
node->slots[slot] = 1;
node->slots_count++;
config.cluster_slots[slot] = node;
listAddNodeTail(node->slots, (void *)(uintptr_t)slot);
}
}
}
if (listLength(node->slots) == 0) {
printf("WARNING: master node %s:%d has no slots, skipping...\n",
node->ip, node->port);
continue;
}
if (!addClusterNode(node)) {
success = 0;
goto cleanup;
}
listRewind(node->slots, &(node->slot_iter));
}
cleanup:
if (ctx) redisFree(ctx);
@ -1106,7 +1178,6 @@ int main(int argc, const char **argv) {
config.cluster_mode = 0;
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
memset(config.cluster_slots, 0, sizeof(config.cluster_slots));
i = parseOptions(argc,argv);
argc -= i;
@ -1142,6 +1213,7 @@ int main(int argc, const char **argv) {
if (node->name) printf("%s ", node->name);
printf("%s:%d\n", node->ip, node->port);
}
/* TODO: allow for more thrads per cluster node. */
config.num_threads = config.cluster_node_count;
}
@ -1196,19 +1268,19 @@ int main(int argc, const char **argv) {
}
if (test_is_selected("set")) {
len = redisFormatCommand(&cmd,"SET key:__rand_int__ %s",data);
len = redisFormatCommand(&cmd,"SET key:__rand_int__{tag} %s",data);
benchmark("SET",cmd,len);
free(cmd);
}
if (test_is_selected("get")) {
len = redisFormatCommand(&cmd,"GET key:__rand_int__");
len = redisFormatCommand(&cmd,"GET key:__rand_int__{tag}");
benchmark("GET",cmd,len);
free(cmd);
}
if (test_is_selected("incr")) {
len = redisFormatCommand(&cmd,"INCR counter:__rand_int__");
len = redisFormatCommand(&cmd,"INCR counter:__rand_int__{tag}");
benchmark("INCR",cmd,len);
free(cmd);
}