From 4e78d5cd40f388819a0d304c915c589788ac6137 Mon Sep 17 00:00:00 2001 From: artix Date: Mon, 11 Feb 2019 17:57:20 +0100 Subject: [PATCH] Redis Benchmark: update slots configuration after MOVED/ASK reply --- src/Makefile | 2 +- src/redis-benchmark.c | 184 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 184 insertions(+), 2 deletions(-) diff --git a/src/Makefile b/src/Makefile index d4874f7c..9da1da8d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -168,7 +168,7 @@ REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc. REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark -REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o zmalloc.o redis-benchmark.o +REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o siphash.o redis-benchmark.o REDIS_CHECK_RDB_NAME=redis-check-rdb REDIS_CHECK_AOF_NAME=redis-check-aof diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 4722b84a..8f239ac1 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -46,6 +46,7 @@ #include "ae.h" #include "hiredis.h" #include "adlist.h" +#include "dict.h" #include "zmalloc.h" #include "atomicvar.h" #include "crc16_slottable.h" @@ -100,10 +101,17 @@ static struct config { int cluster_node_count; struct clusterNode **cluster_nodes; struct redisConfig *redis_config; + int is_fetching_slots; + int is_updating_slots; + int slots_last_update; /* Thread mutexes to be used as fallbacks by atomicvar.h */ pthread_mutex_t requests_issued_mutex; pthread_mutex_t requests_finished_mutex; pthread_mutex_t liveclients_mutex; + pthread_mutex_t is_fetching_slots_mutex; + pthread_mutex_t is_updating_slots_mutex; + pthread_mutex_t updating_slots_mutex; + pthread_mutex_t slots_last_update_mutex; } config; typedef struct _client { @@ -125,6 +133,7 @@ typedef struct _client { int prefixlen; /* Size in bytes of the pending prefix commands */ int thread_id; struct clusterNode *cluster_node; + int slots_last_update; } *client; /* Threads. */ @@ -143,8 +152,10 @@ typedef struct clusterNode { int flags; sds replicate; /* Master ID if node is a slave */ int *slots; - int current_slot_index; int slots_count; + int current_slot_index; + int *updated_slots; /* Used by updateClusterSlotsConfiguration */ + int updated_slots_count; /* Used by updateClusterSlotsConfiguration */ int replicas_count; sds *migrating; /* An array of sds where even strings are slots and odd * strings are the destination node IDs. */ @@ -171,9 +182,16 @@ static clusterNode *createClusterNode(char *ip, int port); static redisConfig *getRedisConfig(const char *ip, int port, const char *hostsocket); static void freeRedisConfig(redisConfig *cfg); +static int fetchClusterSlotsConfiguration(client c); +static void updateClusterSlotsConfiguration(); int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData); +/* Dict callbacks */ +static uint64_t dictSdsHash(const void *key); +static int dictSdsKeyCompare(void *privdata, const void *key1, + const void *key2); + /* Implementation */ static long long ustime(void) { struct timeval tv; @@ -195,6 +213,29 @@ static long long mstime(void) { return mst; } +static uint64_t dictSdsHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); +} + +static int dictSdsKeyCompare(void *privdata, const void *key1, + const void *key2) +{ + int l1,l2; + DICT_NOTUSED(privdata); + + l1 = sdslen((sds)key1); + l2 = sdslen((sds)key2); + if (l1 != l2) return 0; + return memcmp(key1, key2, l1) == 0; +} + +/* _serverAssert is needed by dict */ +void _serverAssert(const char *estr, const char *file, int line) { + fprintf(stderr, "=== ASSERTION FAILED ==="); + fprintf(stderr, "==> %s:%d '%s' is not true",file,line,estr); + *((char*)-1) = 'x'; +} + static redisConfig *getRedisConfig(const char *ip, int port, const char *hostsocket) { @@ -320,6 +361,15 @@ static void setClusterKeyHashTag(client c) { clusterNode *node = c->cluster_node; assert(node); assert(node->current_slot_index < node->slots_count); + int is_updating_slots = 0; + atomicGet(config.is_updating_slots, is_updating_slots); + /* If updateClusterSlotsConfiguration is updating the slots array, + * call updateClusterSlotsConfiguration is order to block the thread + * since the mutex is locked. When the slots will be updated by the + * thread that's actually performing the update, the execution of + * updateClusterSlotsConfiguration won't actually do anything, since + * the updated_slots_count array will be already NULL. */ + if (is_updating_slots) updateClusterSlotsConfiguration(); int slot = node->slots[node->current_slot_index]; const char *tag = crc16_slot_table[slot]; int taglen = strlen(tag); @@ -401,6 +451,11 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { (!strncmp(r->str,"MOVED",5) || !strcmp(r->str,"ASK"))) { + /* Try to update slots configuration if the key of the + * command is using the slot hash tag. */ + if (c->staglen && !fetchClusterSlotsConfiguration(c)) + exit(1); + /* clusterNode *node = c->cluster_node; assert(node); if (++node->current_slot_index >= node->slots_count) { @@ -412,6 +467,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { freeClient(c); return; } + */ } freeReplyObject(reply); @@ -466,6 +522,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Really initialize: randomize keys and set start time. */ if (config.randomkeys) randomizeClientKey(c); if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); + atomicGet(config.slots_last_update, c->slots_last_update); c->start = ustime(); c->latency = -1; } @@ -659,6 +716,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); listAddNodeTail(config.clients,c); atomicIncr(config.liveclients, 1); + atomicGet(config.slots_last_update, c->slots_last_update); return c; } @@ -846,6 +904,8 @@ static clusterNode *createClusterNode(char *ip, int port) { node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int)); node->slots_count = 0; node->current_slot_index = 0; + node->updated_slots = NULL; + node->updated_slots_count = 0; node->migrating = NULL; node->importing = NULL; node->migrating_count = 0; @@ -1067,6 +1127,121 @@ cleanup: return success; } +/* Request the current cluster slots configuration by calling CLUSTER SLOTS + * and atomically update the slots after a successful reply. */ +static int fetchClusterSlotsConfiguration(client c) { + UNUSED(c); + int success = 1, is_fetching_slots = 0, last_update = 0; + size_t i; + atomicGet(config.slots_last_update, last_update); + if (c->slots_last_update < last_update) { + c->slots_last_update = last_update; + return -1; + } + redisReply *reply = NULL; + atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); + if (is_fetching_slots) return -1; //TODO: use other codes || errno ? + atomicSet(config.is_fetching_slots, 1); + if (config.showerrors) + printf("Cluster slots configuration changed, fetching new one...\n"); + const char *errmsg = "Failed to update cluster slots configuration"; + static dictType dtype = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL /* val destructor */ + }; + /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ + dict *masters = dictCreate(&dtype, NULL); + redisContext *ctx = NULL; + for (i = 0; i < (size_t) config.cluster_node_count; i++) { + clusterNode *node = config.cluster_nodes[i]; + assert(node->ip != NULL); + assert(node->name != NULL); + assert(node->port); + /* Use first node as entry point to connect to. */ + if (ctx == NULL) { + ctx = redisConnect(node->ip, node->port); + if (!ctx || ctx->err) { + success = 0; + if (ctx && ctx->err) + fprintf(stderr, "REDIS CONNECTION ERROR: %s\n", ctx->errstr); + goto cleanup; + } + } + if (node->updated_slots != NULL) + zfree(node->updated_slots); + node->updated_slots = NULL; + node->updated_slots_count = 0; + dictReplace(masters, node->name, node) ; + } + reply = redisCommand(ctx, "CLUSTER SLOTS"); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { + success = 0; + if (reply) + fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str); + goto cleanup; + } + assert(reply->type == REDIS_REPLY_ARRAY); + for (i = 0; i < reply->elements; i++) { + redisReply *r = reply->element[i]; + assert(r->type = REDIS_REPLY_ARRAY); + assert(r->elements >= 3); + int from, to, slot; + from = r->element[0]->integer; + to = r->element[1]->integer; + redisReply *nr = r->element[2]; + assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); + assert(nr->element[2]->str != NULL); + sds name = sdsnew(nr->element[2]->str); + dictEntry *entry = dictFind(masters, name); + if (entry == NULL) { + success = 0; + fprintf(stderr, "%s: could not find node with ID %s in current " + "configuration.\n", errmsg, name); + if (name) sdsfree(name); + goto cleanup; + } + sdsfree(name); + clusterNode *node = dictGetVal(entry); + if (node->updated_slots == NULL) + node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int)); + for (slot = from; slot <= to; slot++) + node->updated_slots[node->updated_slots_count++] = slot; + } + updateClusterSlotsConfiguration(); +cleanup: + freeReplyObject(reply); + redisFree(ctx); + dictRelease(masters); + atomicSet(config.is_fetching_slots, 0); + return success; +} + +/* Atomically update the new slots configuration. */ +static void updateClusterSlotsConfiguration() { + pthread_mutex_lock(&config.is_updating_slots_mutex); + atomicSet(config.is_updating_slots, 1); + int i; + for (i = 0; i < config.cluster_node_count; i++) { + clusterNode *node = config.cluster_nodes[i]; + if (node->updated_slots != NULL) { + int *oldslots = node->slots; + node->slots = node->updated_slots; + node->slots_count = node->updated_slots_count; + node->current_slot_index = 0; + node->updated_slots = NULL; + node->updated_slots_count = 0; + zfree(oldslots); + } + } + atomicSet(config.is_updating_slots, 0); + atomicIncr(config.slots_last_update, 1); + pthread_mutex_unlock(&config.is_updating_slots_mutex); +} + /* Returns number of consumed options. */ int parseOptions(int argc, const char **argv) { int i; @@ -1301,6 +1476,9 @@ int main(int argc, const char **argv) { config.cluster_node_count = 0; config.cluster_nodes = NULL; config.redis_config = NULL; + config.is_fetching_slots = 0; + config.is_updating_slots = 0; + config.slots_last_update = 0; i = parseOptions(argc,argv); argc -= i; @@ -1354,6 +1532,10 @@ int main(int argc, const char **argv) { pthread_mutex_init(&(config.requests_issued_mutex), NULL); pthread_mutex_init(&(config.requests_finished_mutex), NULL); pthread_mutex_init(&(config.liveclients_mutex), NULL); + pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL); + pthread_mutex_init(&(config.is_updating_slots_mutex), NULL); + pthread_mutex_init(&(config.updating_slots_mutex), NULL); + pthread_mutex_init(&(config.slots_last_update_mutex), NULL); } if (config.keepalive == 0) {