Merge branch 'unstable' of github.com:/antirez/redis into unstable

This commit is contained in:
antirez 2019-02-21 17:03:19 +01:00
commit 624568ae3f

View File

@ -210,6 +210,8 @@ static struct config {
char *pattern; char *pattern;
char *rdb_filename; char *rdb_filename;
int bigkeys; int bigkeys;
int memkeys;
unsigned memkeys_samples;
int hotkeys; int hotkeys;
int stdinarg; /* get last arg from stdin. (-x option) */ int stdinarg; /* get last arg from stdin. (-x option) */
char *auth; char *auth;
@ -1336,6 +1338,12 @@ static int parseOptions(int argc, char **argv) {
config.pipe_timeout = atoi(argv[++i]); config.pipe_timeout = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--bigkeys")) { } else if (!strcmp(argv[i],"--bigkeys")) {
config.bigkeys = 1; config.bigkeys = 1;
} else if (!strcmp(argv[i],"--memkeys")) {
config.memkeys = 1;
config.memkeys_samples = 0; /* use redis default */
} else if (!strcmp(argv[i],"--memkeys-samples")) {
config.memkeys = 1;
config.memkeys_samples = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--hotkeys")) { } else if (!strcmp(argv[i],"--hotkeys")) {
config.hotkeys = 1; config.hotkeys = 1;
} else if (!strcmp(argv[i],"--eval") && !lastarg) { } else if (!strcmp(argv[i],"--eval") && !lastarg) {
@ -1534,7 +1542,10 @@ static void usage(void) {
" --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n" " --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n"
" no reply is received within <n> seconds.\n" " no reply is received within <n> seconds.\n"
" Default timeout: %d. Use 0 to wait forever.\n" " Default timeout: %d. Use 0 to wait forever.\n"
" --bigkeys Sample Redis keys looking for big keys.\n" " --bigkeys Sample Redis keys looking for keys with many elements (complexity).\n"
" --memkeys Sample Redis keys looking for keys consuming a lot of memory.\n"
" --memkeys-samples <n> Sample Redis keys looking for keys consuming a lot of memory.\n"
" And define number of key elements to sample\n"
" --hotkeys Sample Redis keys looking for hot keys.\n" " --hotkeys Sample Redis keys looking for hot keys.\n"
" only works when maxmemory-policy is *lfu.\n" " only works when maxmemory-policy is *lfu.\n"
" --scan List all keys using the SCAN command.\n" " --scan List all keys using the SCAN command.\n"
@ -6141,9 +6152,31 @@ static void latencyDistMode(void) {
* Slave mode * Slave mode
*--------------------------------------------------------------------------- */ *--------------------------------------------------------------------------- */
#define RDB_EOF_MARK_SIZE 40
void sendReplconf(const char* arg1, const char* arg2) {
printf("sending REPLCONF %s %s\n", arg1, arg2);
redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);
/* Handle any error conditions */
if(reply == NULL) {
fprintf(stderr, "\nI/O error\n");
exit(1);
} else if(reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
/* non fatal, old versions may not support it */
}
freeReplyObject(reply);
}
void sendCapa() {
sendReplconf("capa", "eof");
}
/* Sends SYNC and reads the number of bytes in the payload. Used both by /* Sends SYNC and reads the number of bytes in the payload. Used both by
* slaveMode() and getRDB(). */ * slaveMode() and getRDB().
unsigned long long sendSync(int fd) { * returns 0 in case an EOF marker is used. */
unsigned long long sendSync(int fd, char *out_eof) {
/* To start we need to send the SYNC command and return the payload. /* To start we need to send the SYNC command and return the payload.
* The hiredis client lib does not understand this part of the protocol * The hiredis client lib does not understand this part of the protocol
* and we don't want to mess with its buffers, so everything is performed * and we don't want to mess with its buffers, so everything is performed
@ -6173,17 +6206,33 @@ unsigned long long sendSync(int fd) {
printf("SYNC with master failed: %s\n", buf); printf("SYNC with master failed: %s\n", buf);
exit(1); exit(1);
} }
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
return 0;
}
return strtoull(buf+1,NULL,10); return strtoull(buf+1,NULL,10);
} }
static void slaveMode(void) { static void slaveMode(void) {
int fd = context->fd; int fd = context->fd;
unsigned long long payload = sendSync(fd); static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(fd, eofmark);
char buf[1024]; char buf[1024];
int original_output = config.output; int original_output = config.output;
fprintf(stderr,"SYNC with master, discarding %llu " if (payload == 0) {
"bytes of bulk transfer...\n", payload); payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC with master, discarding "
"bytes of bulk transfer until EOF marker...\n");
} else {
fprintf(stderr,"SYNC with master, discarding %llu "
"bytes of bulk transfer...\n", payload);
}
/* Discard the payload. */ /* Discard the payload. */
while(payload) { while(payload) {
@ -6195,8 +6244,29 @@ static void slaveMode(void) {
exit(1); exit(1);
} }
payload -= nread; payload -= nread;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= RDB_EOF_MARK_SIZE) {
memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
} else {
int rem = RDB_EOF_MARK_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
break;
}
} }
fprintf(stderr,"SYNC done. Logging commands from master.\n");
if (usemark) {
unsigned long long offset = ULLONG_MAX - payload;
fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
/* put the slave online */
sleep(1);
sendReplconf("ACK", "0");
} else
fprintf(stderr,"SYNC done. Logging commands from master.\n");
/* Now we can use hiredis to read the incoming protocol. */ /* Now we can use hiredis to read the incoming protocol. */
config.output = OUTPUT_CSV; config.output = OUTPUT_CSV;
@ -6213,11 +6283,22 @@ static void slaveMode(void) {
static void getRDB(void) { static void getRDB(void) {
int s = context->fd; int s = context->fd;
int fd; int fd;
unsigned long long payload = sendSync(s); static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(s, eofmark);
char buf[4096]; char buf[4096];
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n", if (payload == 0) {
payload, config.rdb_filename); payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer until EOF marker to '%s'\n",
config.rdb_filename);
} else {
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
payload, config.rdb_filename);
}
/* Write to file. */ /* Write to file. */
if (!strcmp(config.rdb_filename,"-")) { if (!strcmp(config.rdb_filename,"-")) {
@ -6246,11 +6327,31 @@ static void getRDB(void) {
exit(1); exit(1);
} }
payload -= nread; payload -= nread;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= RDB_EOF_MARK_SIZE) {
memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
} else {
int rem = RDB_EOF_MARK_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
break;
}
}
if (usemark) {
payload = ULLONG_MAX - payload - RDB_EOF_MARK_SIZE;
if (ftruncate(fd, payload) == -1)
fprintf(stderr,"ftruncate failed: %s.\n", strerror(errno));
fprintf(stderr,"Transfer finished with success after %llu bytes\n", payload);
} else {
fprintf(stderr,"Transfer finished with success.\n");
} }
close(s); /* Close the file descriptor ASAP as fsync() may take time. */ close(s); /* Close the file descriptor ASAP as fsync() may take time. */
fsync(fd); fsync(fd);
close(fd); close(fd);
fprintf(stderr,"Transfer finished with success.\n");
exit(0); exit(0);
} }
@ -6418,15 +6519,6 @@ static void pipeMode(void) {
* Find big keys * Find big keys
*--------------------------------------------------------------------------- */ *--------------------------------------------------------------------------- */
#define TYPE_STRING 0
#define TYPE_LIST 1
#define TYPE_SET 2
#define TYPE_HASH 3
#define TYPE_ZSET 4
#define TYPE_STREAM 5
#define TYPE_NONE 6
#define TYPE_COUNT 7
static redisReply *sendScan(unsigned long long *it) { static redisReply *sendScan(unsigned long long *it) {
redisReply *reply = redisCommand(context, "SCAN %llu", *it); redisReply *reply = redisCommand(context, "SCAN %llu", *it);
@ -6473,28 +6565,51 @@ static int getDbSize(void) {
return size; return size;
} }
static int toIntType(char *key, char *type) { typedef struct {
if(!strcmp(type, "string")) { char *name;
return TYPE_STRING; char *sizecmd;
} else if(!strcmp(type, "list")) { char *sizeunit;
return TYPE_LIST; unsigned long long biggest;
} else if(!strcmp(type, "set")) { unsigned long long count;
return TYPE_SET; unsigned long long totalsize;
} else if(!strcmp(type, "hash")) { sds biggest_key;
return TYPE_HASH; } typeinfo;
} else if(!strcmp(type, "zset")) {
return TYPE_ZSET; typeinfo type_string = { "string", "STRLEN", "bytes" };
} else if(!strcmp(type, "stream")) { typeinfo type_list = { "list", "LLEN", "items" };
return TYPE_STREAM; typeinfo type_set = { "set", "SCARD", "members" };
} else if(!strcmp(type, "none")) { typeinfo type_hash = { "hash", "HLEN", "fields" };
return TYPE_NONE; typeinfo type_zset = { "zset", "ZCARD", "members" };
} else { typeinfo type_stream = { "stream", "XLEN", "entries" };
fprintf(stderr, "Unknown type '%s' for key '%s'\n", type, key); typeinfo type_other = { "other", NULL, "?" };
exit(1);
} static typeinfo* typeinfo_add(dict *types, char* name, typeinfo* type_template) {
typeinfo *info = zmalloc(sizeof(typeinfo));
*info = *type_template;
info->name = sdsnew(name);
dictAdd(types, info->name, info);
return info;
} }
static void getKeyTypes(redisReply *keys, int *types) { void type_free(void* priv_data, void* val) {
typeinfo *info = val;
UNUSED(priv_data);
if (info->biggest_key)
sdsfree(info->biggest_key);
sdsfree(info->name);
zfree(info);
}
static dictType typeinfoDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor (owned by the value)*/
type_free /* val destructor */
};
static void getKeyTypes(dict *types_dict, redisReply *keys, typeinfo **types) {
redisReply *reply; redisReply *reply;
unsigned int i; unsigned int i;
@ -6520,32 +6635,47 @@ static void getKeyTypes(redisReply *keys, int *types) {
exit(1); exit(1);
} }
types[i] = toIntType(keys->element[i]->str, reply->str); sds typereply = sdsnew(reply->str);
dictEntry *de = dictFind(types_dict, typereply);
sdsfree(typereply);
typeinfo *type = NULL;
if (de)
type = dictGetVal(de);
else if (strcmp(reply->str, "none")) /* create new types for modules, (but not for deleted keys) */
type = typeinfo_add(types_dict, reply->str, &type_other);
types[i] = type;
freeReplyObject(reply); freeReplyObject(reply);
} }
} }
static void getKeySizes(redisReply *keys, int *types, static void getKeySizes(redisReply *keys, typeinfo **types,
unsigned long long *sizes) unsigned long long *sizes, int memkeys,
unsigned memkeys_samples)
{ {
redisReply *reply; redisReply *reply;
char *sizecmds[] = {"STRLEN","LLEN","SCARD","HLEN","ZCARD"};
unsigned int i; unsigned int i;
/* Pipeline size commands */ /* Pipeline size commands */
for(i=0;i<keys->elements;i++) { for(i=0;i<keys->elements;i++) {
/* Skip keys that were deleted */ /* Skip keys that disappeared between SCAN and TYPE (or unknown types when not in memkeys mode) */
if(types[i]==TYPE_NONE) if(!types[i] || (!types[i]->sizecmd && !memkeys))
continue; continue;
redisAppendCommand(context, "%s %s", sizecmds[types[i]], if (!memkeys)
keys->element[i]->str); redisAppendCommand(context, "%s %s",
types[i]->sizecmd, keys->element[i]->str);
else if (memkeys_samples==0)
redisAppendCommand(context, "%s %s %s",
"MEMORY", "USAGE", keys->element[i]->str);
else
redisAppendCommand(context, "%s %s %s SAMPLES %u",
"MEMORY", "USAGE", keys->element[i]->str, memkeys_samples);
} }
/* Retrieve sizes */ /* Retrieve sizes */
for(i=0;i<keys->elements;i++) { for(i=0;i<keys->elements;i++) {
/* Skip keys that disappeared between SCAN and TYPE */ /* Skip keys that disappeared between SCAN and TYPE (or unknown types when not in memkeys mode) */
if(types[i] == TYPE_NONE) { if(!types[i] || (!types[i]->sizecmd && !memkeys)) {
sizes[i] = 0; sizes[i] = 0;
continue; continue;
} }
@ -6560,7 +6690,8 @@ static void getKeySizes(redisReply *keys, int *types,
* added as a different type between TYPE and SIZE */ * added as a different type between TYPE and SIZE */
fprintf(stderr, fprintf(stderr,
"Warning: %s on '%s' failed (may have changed type)\n", "Warning: %s on '%s' failed (may have changed type)\n",
sizecmds[types[i]], keys->element[i]->str); !memkeys? types[i]->sizecmd: "MEMORY USAGE",
keys->element[i]->str);
sizes[i] = 0; sizes[i] = 0;
} else { } else {
sizes[i] = reply->integer; sizes[i] = reply->integer;
@ -6570,17 +6701,23 @@ static void getKeySizes(redisReply *keys, int *types,
} }
} }
static void findBigKeys(void) { static void findBigKeys(int memkeys, unsigned memkeys_samples) {
unsigned long long biggest[TYPE_COUNT] = {0}, counts[TYPE_COUNT] = {0}, totalsize[TYPE_COUNT] = {0};
unsigned long long sampled = 0, total_keys, totlen=0, *sizes=NULL, it=0; unsigned long long sampled = 0, total_keys, totlen=0, *sizes=NULL, it=0;
sds maxkeys[TYPE_COUNT] = {0};
char *typename[] = {"string","list","set","hash","zset","stream","none"};
char *typeunit[] = {"bytes","items","members","fields","members","entries",""};
redisReply *reply, *keys; redisReply *reply, *keys;
unsigned int arrsize=0, i; unsigned int arrsize=0, i;
int type, *types=NULL; dictIterator *di;
dictEntry *de;
typeinfo **types = NULL;
double pct; double pct;
dict *types_dict = dictCreate(&typeinfoDictType, NULL);
typeinfo_add(types_dict, "string", &type_string);
typeinfo_add(types_dict, "list", &type_list);
typeinfo_add(types_dict, "set", &type_set);
typeinfo_add(types_dict, "hash", &type_hash);
typeinfo_add(types_dict, "zset", &type_zset);
typeinfo_add(types_dict, "stream", &type_stream);
/* Total keys pre scanning */ /* Total keys pre scanning */
total_keys = getDbSize(); total_keys = getDbSize();
@ -6589,15 +6726,6 @@ static void findBigKeys(void) {
printf("# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec\n"); printf("# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec\n");
printf("# per 100 SCAN commands (not usually needed).\n\n"); printf("# per 100 SCAN commands (not usually needed).\n\n");
/* New up sds strings to keep track of overall biggest per type */
for(i=0;i<TYPE_NONE; i++) {
maxkeys[i] = sdsempty();
if(!maxkeys[i]) {
fprintf(stderr, "Failed to allocate memory for largest key names!\n");
exit(1);
}
}
/* SCAN loop */ /* SCAN loop */
do { do {
/* Calculate approximate percentage completion */ /* Calculate approximate percentage completion */
@ -6609,7 +6737,7 @@ static void findBigKeys(void) {
/* Reallocate our type and size array if we need to */ /* Reallocate our type and size array if we need to */
if(keys->elements > arrsize) { if(keys->elements > arrsize) {
types = zrealloc(types, sizeof(int)*keys->elements); types = zrealloc(types, sizeof(typeinfo*)*keys->elements);
sizes = zrealloc(sizes, sizeof(unsigned long long)*keys->elements); sizes = zrealloc(sizes, sizeof(unsigned long long)*keys->elements);
if(!types || !sizes) { if(!types || !sizes) {
@ -6621,34 +6749,38 @@ static void findBigKeys(void) {
} }
/* Retrieve types and then sizes */ /* Retrieve types and then sizes */
getKeyTypes(keys, types); getKeyTypes(types_dict, keys, types);
getKeySizes(keys, types, sizes); getKeySizes(keys, types, sizes, memkeys, memkeys_samples);
/* Now update our stats */ /* Now update our stats */
for(i=0;i<keys->elements;i++) { for(i=0;i<keys->elements;i++) {
if((type = types[i]) == TYPE_NONE) typeinfo *type = types[i];
/* Skip keys that disappeared between SCAN and TYPE */
if(!type)
continue; continue;
totalsize[type] += sizes[i]; type->totalsize += sizes[i];
counts[type]++; type->count++;
totlen += keys->element[i]->len; totlen += keys->element[i]->len;
sampled++; sampled++;
if(biggest[type]<sizes[i]) { if(type->biggest<sizes[i]) {
printf( printf(
"[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n", "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n",
pct, typename[type], keys->element[i]->str, sizes[i], pct, type->name, keys->element[i]->str, sizes[i],
typeunit[type]); !memkeys? type->sizeunit: "bytes");
/* Keep track of biggest key name for this type */ /* Keep track of biggest key name for this type */
maxkeys[type] = sdscpy(maxkeys[type], keys->element[i]->str); if (type->biggest_key)
if(!maxkeys[type]) { sdsfree(type->biggest_key);
type->biggest_key = sdsnew(keys->element[i]->str);
if(!type->biggest_key) {
fprintf(stderr, "Failed to allocate memory for key!\n"); fprintf(stderr, "Failed to allocate memory for key!\n");
exit(1); exit(1);
} }
/* Keep track of the biggest size for this type */ /* Keep track of the biggest size for this type */
biggest[type] = sizes[i]; type->biggest = sizes[i];
} }
/* Update overall progress */ /* Update overall progress */
@ -6676,26 +6808,29 @@ static void findBigKeys(void) {
totlen, totlen ? (double)totlen/sampled : 0); totlen, totlen ? (double)totlen/sampled : 0);
/* Output the biggest keys we found, for types we did find */ /* Output the biggest keys we found, for types we did find */
for(i=0;i<TYPE_NONE;i++) { di = dictGetIterator(types_dict);
if(sdslen(maxkeys[i])>0) { while ((de = dictNext(di))) {
printf("Biggest %6s found '%s' has %llu %s\n", typename[i], maxkeys[i], typeinfo *type = dictGetVal(de);
biggest[i], typeunit[i]); if(type->biggest_key) {
printf("Biggest %6s found '%s' has %llu %s\n", type->name, type->biggest_key,
type->biggest, !memkeys? type->sizeunit: "bytes");
} }
} }
dictReleaseIterator(di);
printf("\n"); printf("\n");
for(i=0;i<TYPE_NONE;i++) { di = dictGetIterator(types_dict);
while ((de = dictNext(di))) {
typeinfo *type = dictGetVal(de);
printf("%llu %ss with %llu %s (%05.2f%% of keys, avg size %.2f)\n", printf("%llu %ss with %llu %s (%05.2f%% of keys, avg size %.2f)\n",
counts[i], typename[i], totalsize[i], typeunit[i], type->count, type->name, type->totalsize, !memkeys? type->sizeunit: "bytes",
sampled ? 100 * (double)counts[i]/sampled : 0, sampled ? 100 * (double)type->count/sampled : 0,
counts[i] ? (double)totalsize[i]/counts[i] : 0); type->count ? (double)type->totalsize/type->count : 0);
} }
dictReleaseIterator(di);
/* Free sds strings containing max keys */ dictRelease(types_dict);
for(i=0;i<TYPE_NONE;i++) {
sdsfree(maxkeys[i]);
}
/* Success! */ /* Success! */
exit(0); exit(0);
@ -7269,12 +7404,14 @@ int main(int argc, char **argv) {
/* Slave mode */ /* Slave mode */
if (config.slave_mode) { if (config.slave_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1); if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
slaveMode(); slaveMode();
} }
/* Get RDB mode. */ /* Get RDB mode. */
if (config.getrdb_mode) { if (config.getrdb_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1); if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
getRDB(); getRDB();
} }
@ -7287,7 +7424,13 @@ int main(int argc, char **argv) {
/* Find big keys */ /* Find big keys */
if (config.bigkeys) { if (config.bigkeys) {
if (cliConnect(0) == REDIS_ERR) exit(1); if (cliConnect(0) == REDIS_ERR) exit(1);
findBigKeys(); findBigKeys(0, 0);
}
/* Find large keys */
if (config.memkeys) {
if (cliConnect(0) == REDIS_ERR) exit(1);
findBigKeys(1, config.memkeys_samples);
} }
/* Find hot keys */ /* Find hot keys */