Redis Benchmark: update slots configuration after MOVED/ASK reply

This commit is contained in:
artix 2019-02-11 17:57:20 +01:00
parent daaff484a6
commit 4e78d5cd40
2 changed files with 184 additions and 2 deletions

View File

@ -168,7 +168,7 @@ REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark
REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o zmalloc.o redis-benchmark.o
REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o siphash.o redis-benchmark.o
REDIS_CHECK_RDB_NAME=redis-check-rdb
REDIS_CHECK_AOF_NAME=redis-check-aof

View File

@ -46,6 +46,7 @@
#include "ae.h"
#include "hiredis.h"
#include "adlist.h"
#include "dict.h"
#include "zmalloc.h"
#include "atomicvar.h"
#include "crc16_slottable.h"
@ -100,10 +101,17 @@ static struct config {
int cluster_node_count;
struct clusterNode **cluster_nodes;
struct redisConfig *redis_config;
int is_fetching_slots;
int is_updating_slots;
int slots_last_update;
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
pthread_mutex_t liveclients_mutex;
pthread_mutex_t is_fetching_slots_mutex;
pthread_mutex_t is_updating_slots_mutex;
pthread_mutex_t updating_slots_mutex;
pthread_mutex_t slots_last_update_mutex;
} config;
typedef struct _client {
@ -125,6 +133,7 @@ typedef struct _client {
int prefixlen; /* Size in bytes of the pending prefix commands */
int thread_id;
struct clusterNode *cluster_node;
int slots_last_update;
} *client;
/* Threads. */
@ -143,8 +152,10 @@ typedef struct clusterNode {
int flags;
sds replicate; /* Master ID if node is a slave */
int *slots;
int current_slot_index;
int slots_count;
int current_slot_index;
int *updated_slots; /* Used by updateClusterSlotsConfiguration */
int updated_slots_count; /* Used by updateClusterSlotsConfiguration */
int replicas_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
@ -171,9 +182,16 @@ static clusterNode *createClusterNode(char *ip, int port);
static redisConfig *getRedisConfig(const char *ip, int port,
const char *hostsocket);
static void freeRedisConfig(redisConfig *cfg);
static int fetchClusterSlotsConfiguration(client c);
static void updateClusterSlotsConfiguration();
int showThroughput(struct aeEventLoop *eventLoop, long long id,
void *clientData);
/* Dict callbacks */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(void *privdata, const void *key1,
const void *key2);
/* Implementation */
static long long ustime(void) {
struct timeval tv;
@ -195,6 +213,29 @@ static long long mstime(void) {
return mst;
}
static uint64_t dictSdsHash(const void *key) {
return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
}
static int dictSdsKeyCompare(void *privdata, const void *key1,
const void *key2)
{
int l1,l2;
DICT_NOTUSED(privdata);
l1 = sdslen((sds)key1);
l2 = sdslen((sds)key2);
if (l1 != l2) return 0;
return memcmp(key1, key2, l1) == 0;
}
/* _serverAssert is needed by dict */
void _serverAssert(const char *estr, const char *file, int line) {
fprintf(stderr, "=== ASSERTION FAILED ===");
fprintf(stderr, "==> %s:%d '%s' is not true",file,line,estr);
*((char*)-1) = 'x';
}
static redisConfig *getRedisConfig(const char *ip, int port,
const char *hostsocket)
{
@ -320,6 +361,15 @@ static void setClusterKeyHashTag(client c) {
clusterNode *node = c->cluster_node;
assert(node);
assert(node->current_slot_index < node->slots_count);
int is_updating_slots = 0;
atomicGet(config.is_updating_slots, is_updating_slots);
/* If updateClusterSlotsConfiguration is updating the slots array,
* call updateClusterSlotsConfiguration is order to block the thread
* since the mutex is locked. When the slots will be updated by the
* thread that's actually performing the update, the execution of
* updateClusterSlotsConfiguration won't actually do anything, since
* the updated_slots_count array will be already NULL. */
if (is_updating_slots) updateClusterSlotsConfiguration();
int slot = node->slots[node->current_slot_index];
const char *tag = crc16_slot_table[slot];
int taglen = strlen(tag);
@ -401,6 +451,11 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
(!strncmp(r->str,"MOVED",5) ||
!strcmp(r->str,"ASK")))
{
/* Try to update slots configuration if the key of the
* command is using the slot hash tag. */
if (c->staglen && !fetchClusterSlotsConfiguration(c))
exit(1);
/*
clusterNode *node = c->cluster_node;
assert(node);
if (++node->current_slot_index >= node->slots_count) {
@ -412,6 +467,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
freeClient(c);
return;
}
*/
}
freeReplyObject(reply);
@ -466,6 +522,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Really initialize: randomize keys and set start time. */
if (config.randomkeys) randomizeClientKey(c);
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
atomicGet(config.slots_last_update, c->slots_last_update);
c->start = ustime();
c->latency = -1;
}
@ -659,6 +716,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
listAddNodeTail(config.clients,c);
atomicIncr(config.liveclients, 1);
atomicGet(config.slots_last_update, c->slots_last_update);
return c;
}
@ -846,6 +904,8 @@ static clusterNode *createClusterNode(char *ip, int port) {
node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int));
node->slots_count = 0;
node->current_slot_index = 0;
node->updated_slots = NULL;
node->updated_slots_count = 0;
node->migrating = NULL;
node->importing = NULL;
node->migrating_count = 0;
@ -1067,6 +1127,121 @@ cleanup:
return success;
}
/* Request the current cluster slots configuration by calling CLUSTER SLOTS
* and atomically update the slots after a successful reply. */
static int fetchClusterSlotsConfiguration(client c) {
UNUSED(c);
int success = 1, is_fetching_slots = 0, last_update = 0;
size_t i;
atomicGet(config.slots_last_update, last_update);
if (c->slots_last_update < last_update) {
c->slots_last_update = last_update;
return -1;
}
redisReply *reply = NULL;
atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
atomicSet(config.is_fetching_slots, 1);
if (config.showerrors)
printf("Cluster slots configuration changed, fetching new one...\n");
const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL /* val destructor */
};
/* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
dict *masters = dictCreate(&dtype, NULL);
redisContext *ctx = NULL;
for (i = 0; i < (size_t) config.cluster_node_count; i++) {
clusterNode *node = config.cluster_nodes[i];
assert(node->ip != NULL);
assert(node->name != NULL);
assert(node->port);
/* Use first node as entry point to connect to. */
if (ctx == NULL) {
ctx = redisConnect(node->ip, node->port);
if (!ctx || ctx->err) {
success = 0;
if (ctx && ctx->err)
fprintf(stderr, "REDIS CONNECTION ERROR: %s\n", ctx->errstr);
goto cleanup;
}
}
if (node->updated_slots != NULL)
zfree(node->updated_slots);
node->updated_slots = NULL;
node->updated_slots_count = 0;
dictReplace(masters, node->name, node) ;
}
reply = redisCommand(ctx, "CLUSTER SLOTS");
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
success = 0;
if (reply)
fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str);
goto cleanup;
}
assert(reply->type == REDIS_REPLY_ARRAY);
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
assert(r->type = REDIS_REPLY_ARRAY);
assert(r->elements >= 3);
int from, to, slot;
from = r->element[0]->integer;
to = r->element[1]->integer;
redisReply *nr = r->element[2];
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
assert(nr->element[2]->str != NULL);
sds name = sdsnew(nr->element[2]->str);
dictEntry *entry = dictFind(masters, name);
if (entry == NULL) {
success = 0;
fprintf(stderr, "%s: could not find node with ID %s in current "
"configuration.\n", errmsg, name);
if (name) sdsfree(name);
goto cleanup;
}
sdsfree(name);
clusterNode *node = dictGetVal(entry);
if (node->updated_slots == NULL)
node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int));
for (slot = from; slot <= to; slot++)
node->updated_slots[node->updated_slots_count++] = slot;
}
updateClusterSlotsConfiguration();
cleanup:
freeReplyObject(reply);
redisFree(ctx);
dictRelease(masters);
atomicSet(config.is_fetching_slots, 0);
return success;
}
/* Atomically update the new slots configuration. */
static void updateClusterSlotsConfiguration() {
pthread_mutex_lock(&config.is_updating_slots_mutex);
atomicSet(config.is_updating_slots, 1);
int i;
for (i = 0; i < config.cluster_node_count; i++) {
clusterNode *node = config.cluster_nodes[i];
if (node->updated_slots != NULL) {
int *oldslots = node->slots;
node->slots = node->updated_slots;
node->slots_count = node->updated_slots_count;
node->current_slot_index = 0;
node->updated_slots = NULL;
node->updated_slots_count = 0;
zfree(oldslots);
}
}
atomicSet(config.is_updating_slots, 0);
atomicIncr(config.slots_last_update, 1);
pthread_mutex_unlock(&config.is_updating_slots_mutex);
}
/* Returns number of consumed options. */
int parseOptions(int argc, const char **argv) {
int i;
@ -1301,6 +1476,9 @@ int main(int argc, const char **argv) {
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
config.redis_config = NULL;
config.is_fetching_slots = 0;
config.is_updating_slots = 0;
config.slots_last_update = 0;
i = parseOptions(argc,argv);
argc -= i;
@ -1354,6 +1532,10 @@ int main(int argc, const char **argv) {
pthread_mutex_init(&(config.requests_issued_mutex), NULL);
pthread_mutex_init(&(config.requests_finished_mutex), NULL);
pthread_mutex_init(&(config.liveclients_mutex), NULL);
pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL);
pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
pthread_mutex_init(&(config.updating_slots_mutex), NULL);
pthread_mutex_init(&(config.slots_last_update_mutex), NULL);
}
if (config.keepalive == 0) {