mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 00:50:50 +00:00
Network bandwidth tracking + refactoring.
Track bandwidth used by clients and replication (but diskless replication is not tracked since the actual transfer happens in the child process). This includes a refactoring that makes tracking new instantaneous metrics simpler.
This commit is contained in:
parent
eca9fbdb50
commit
1b732c09d0
@ -839,6 +839,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
*
|
*
|
||||||
* However if we are over the maxmemory limit we ignore that and
|
* However if we are over the maxmemory limit we ignore that and
|
||||||
* just deliver as much data as it is possible to deliver. */
|
* just deliver as much data as it is possible to deliver. */
|
||||||
|
server.stat_net_output_bytes += totwritten;
|
||||||
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
|
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
|
||||||
(server.maxmemory == 0 ||
|
(server.maxmemory == 0 ||
|
||||||
zmalloc_used_memory() < server.maxmemory)) break;
|
zmalloc_used_memory() < server.maxmemory)) break;
|
||||||
@ -1181,6 +1182,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
sdsIncrLen(c->querybuf,nread);
|
sdsIncrLen(c->querybuf,nread);
|
||||||
c->lastinteraction = server.unixtime;
|
c->lastinteraction = server.unixtime;
|
||||||
if (c->flags & REDIS_MASTER) c->reploff += nread;
|
if (c->flags & REDIS_MASTER) c->reploff += nread;
|
||||||
|
server.stat_net_input_bytes += nread;
|
||||||
} else {
|
} else {
|
||||||
server.current_client = NULL;
|
server.current_client = NULL;
|
||||||
return;
|
return;
|
||||||
|
58
src/redis.c
58
src/redis.c
@ -876,27 +876,30 @@ unsigned int getLRUClock(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Add a sample to the operations per second array of samples. */
|
/* Add a sample to the operations per second array of samples. */
|
||||||
void trackOperationsPerSecond(void) {
|
void trackInstantaneousMetric(int metric, long long current_reading) {
|
||||||
long long t = mstime() - server.ops_sec_last_sample_time;
|
long long t = mstime() - server.inst_metric[metric].last_sample_time;
|
||||||
long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
|
long long ops = current_reading -
|
||||||
|
server.inst_metric[metric].last_sample_count;
|
||||||
long long ops_sec;
|
long long ops_sec;
|
||||||
|
|
||||||
ops_sec = t > 0 ? (ops*1000/t) : 0;
|
ops_sec = t > 0 ? (ops*1000/t) : 0;
|
||||||
|
|
||||||
server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
|
server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
|
||||||
server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
|
ops_sec;
|
||||||
server.ops_sec_last_sample_time = mstime();
|
server.inst_metric[metric].idx++;
|
||||||
server.ops_sec_last_sample_ops = server.stat_numcommands;
|
server.inst_metric[metric].idx %= REDIS_METRIC_SAMPLES;
|
||||||
|
server.inst_metric[metric].last_sample_time = mstime();
|
||||||
|
server.inst_metric[metric].last_sample_count = current_reading;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return the mean of all the samples. */
|
/* Return the mean of all the samples. */
|
||||||
long long getOperationsPerSecond(void) {
|
long long getInstantaneousMetric(int metric) {
|
||||||
int j;
|
int j;
|
||||||
long long sum = 0;
|
long long sum = 0;
|
||||||
|
|
||||||
for (j = 0; j < REDIS_OPS_SEC_SAMPLES; j++)
|
for (j = 0; j < REDIS_METRIC_SAMPLES; j++)
|
||||||
sum += server.ops_sec_samples[j];
|
sum += server.inst_metric[metric].samples[j];
|
||||||
return sum / REDIS_OPS_SEC_SAMPLES;
|
return sum / REDIS_METRIC_SAMPLES;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check for timeouts. Returns non-zero if the client was terminated */
|
/* Check for timeouts. Returns non-zero if the client was terminated */
|
||||||
@ -1068,7 +1071,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
/* Update the time cache. */
|
/* Update the time cache. */
|
||||||
updateCachedTime();
|
updateCachedTime();
|
||||||
|
|
||||||
run_with_period(100) trackOperationsPerSecond();
|
run_with_period(100) {
|
||||||
|
trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands);
|
||||||
|
trackInstantaneousMetric(REDIS_METRIC_NET_INPUT,
|
||||||
|
server.stat_net_input_bytes);
|
||||||
|
trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT,
|
||||||
|
server.stat_net_output_bytes);
|
||||||
|
}
|
||||||
|
|
||||||
/* We have just REDIS_LRU_BITS bits per object for LRU information.
|
/* We have just REDIS_LRU_BITS bits per object for LRU information.
|
||||||
* So we use an (eventually wrapping) LRU clock.
|
* So we use an (eventually wrapping) LRU clock.
|
||||||
@ -1682,6 +1691,8 @@ int listenToPort(int port, int *fds, int *count) {
|
|||||||
* to reset via CONFIG RESETSTAT. The function is also used in order to
|
* to reset via CONFIG RESETSTAT. The function is also used in order to
|
||||||
* initialize these fields in initServer() at server startup. */
|
* initialize these fields in initServer() at server startup. */
|
||||||
void resetServerStats(void) {
|
void resetServerStats(void) {
|
||||||
|
int j;
|
||||||
|
|
||||||
server.stat_numcommands = 0;
|
server.stat_numcommands = 0;
|
||||||
server.stat_numconnections = 0;
|
server.stat_numconnections = 0;
|
||||||
server.stat_expiredkeys = 0;
|
server.stat_expiredkeys = 0;
|
||||||
@ -1694,10 +1705,15 @@ void resetServerStats(void) {
|
|||||||
server.stat_sync_full = 0;
|
server.stat_sync_full = 0;
|
||||||
server.stat_sync_partial_ok = 0;
|
server.stat_sync_partial_ok = 0;
|
||||||
server.stat_sync_partial_err = 0;
|
server.stat_sync_partial_err = 0;
|
||||||
memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
|
for (j = 0; j < REDIS_METRIC_COUNT; j++) {
|
||||||
server.ops_sec_idx = 0;
|
server.inst_metric[j].idx = 0;
|
||||||
server.ops_sec_last_sample_time = mstime();
|
server.inst_metric[j].last_sample_time = mstime();
|
||||||
server.ops_sec_last_sample_ops = 0;
|
server.inst_metric[j].last_sample_count = 0;
|
||||||
|
memset(server.inst_metric[j].samples,0,
|
||||||
|
sizeof(server.inst_metric[j].samples));
|
||||||
|
}
|
||||||
|
server.stat_net_input_bytes = 0;
|
||||||
|
server.stat_net_output_bytes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initServer(void) {
|
void initServer(void) {
|
||||||
@ -2790,6 +2806,10 @@ sds genRedisInfoString(char *section) {
|
|||||||
"total_connections_received:%lld\r\n"
|
"total_connections_received:%lld\r\n"
|
||||||
"total_commands_processed:%lld\r\n"
|
"total_commands_processed:%lld\r\n"
|
||||||
"instantaneous_ops_per_sec:%lld\r\n"
|
"instantaneous_ops_per_sec:%lld\r\n"
|
||||||
|
"total_net_input_bytes:%lld\r\n"
|
||||||
|
"total_net_output_bytes:%lld\r\n"
|
||||||
|
"instantaneous_input_kbps:%.2f\r\n"
|
||||||
|
"instantaneous_output_kbps:%.2f\r\n"
|
||||||
"rejected_connections:%lld\r\n"
|
"rejected_connections:%lld\r\n"
|
||||||
"sync_full:%lld\r\n"
|
"sync_full:%lld\r\n"
|
||||||
"sync_partial_ok:%lld\r\n"
|
"sync_partial_ok:%lld\r\n"
|
||||||
@ -2804,7 +2824,11 @@ sds genRedisInfoString(char *section) {
|
|||||||
"migrate_cached_sockets:%ld\r\n",
|
"migrate_cached_sockets:%ld\r\n",
|
||||||
server.stat_numconnections,
|
server.stat_numconnections,
|
||||||
server.stat_numcommands,
|
server.stat_numcommands,
|
||||||
getOperationsPerSecond(),
|
getInstantaneousMetric(REDIS_METRIC_COMMAND),
|
||||||
|
server.stat_net_input_bytes,
|
||||||
|
server.stat_net_output_bytes,
|
||||||
|
(float)getInstantaneousMetric(REDIS_METRIC_NET_INPUT)/1024,
|
||||||
|
(float)getInstantaneousMetric(REDIS_METRIC_NET_OUTPUT)/1024,
|
||||||
server.stat_rejected_conn,
|
server.stat_rejected_conn,
|
||||||
server.stat_sync_full,
|
server.stat_sync_full,
|
||||||
server.stat_sync_partial_ok,
|
server.stat_sync_partial_ok,
|
||||||
|
24
src/redis.h
24
src/redis.h
@ -97,7 +97,6 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define REDIS_REPL_PING_SLAVE_PERIOD 10
|
#define REDIS_REPL_PING_SLAVE_PERIOD 10
|
||||||
#define REDIS_RUN_ID_SIZE 40
|
#define REDIS_RUN_ID_SIZE 40
|
||||||
#define REDIS_EOF_MARK_SIZE 40
|
#define REDIS_EOF_MARK_SIZE 40
|
||||||
#define REDIS_OPS_SEC_SAMPLES 16
|
|
||||||
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
|
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
|
||||||
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
|
#define REDIS_DEFAULT_REPL_BACKLOG_TIME_LIMIT (60*60) /* 1 hour */
|
||||||
#define REDIS_REPL_BACKLOG_MIN_SIZE (1024*16) /* 16k */
|
#define REDIS_REPL_BACKLOG_MIN_SIZE (1024*16) /* 16k */
|
||||||
@ -140,6 +139,13 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
|
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
|
||||||
#define ACTIVE_EXPIRE_CYCLE_FAST 1
|
#define ACTIVE_EXPIRE_CYCLE_FAST 1
|
||||||
|
|
||||||
|
/* Instantaneous metrics tracking. */
|
||||||
|
#define REDIS_METRIC_SAMPLES 16 /* Number of samples per metric. */
|
||||||
|
#define REDIS_METRIC_COMMAND 0 /* Number of commands executed. */
|
||||||
|
#define REDIS_METRIC_NET_INPUT 1 /* Bytes read to network .*/
|
||||||
|
#define REDIS_METRIC_NET_OUTPUT 2 /* Bytes written to network. */
|
||||||
|
#define REDIS_METRIC_COUNT 3
|
||||||
|
|
||||||
/* Protocol and I/O related defines */
|
/* Protocol and I/O related defines */
|
||||||
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
|
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
|
||||||
#define REDIS_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
|
#define REDIS_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
|
||||||
@ -710,12 +716,16 @@ struct redisServer {
|
|||||||
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
|
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
|
||||||
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
|
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
|
||||||
size_t resident_set_size; /* RSS sampled in serverCron(). */
|
size_t resident_set_size; /* RSS sampled in serverCron(). */
|
||||||
/* The following two are used to track instantaneous "load" in terms
|
long long stat_net_input_bytes; /* Bytes read from network. */
|
||||||
* of operations per second. */
|
long long stat_net_output_bytes; /* Bytes written to network. */
|
||||||
long long ops_sec_last_sample_time; /* Timestamp of last sample (in ms) */
|
/* The following two are used to track instantaneous metrics, like
|
||||||
long long ops_sec_last_sample_ops; /* numcommands in last sample */
|
* number of operations per second, network traffic. */
|
||||||
long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];
|
struct {
|
||||||
int ops_sec_idx;
|
long long last_sample_time; /* Timestamp of last sample in ms */
|
||||||
|
long long last_sample_count;/* Count in last sample */
|
||||||
|
long long samples[REDIS_METRIC_SAMPLES];
|
||||||
|
int idx;
|
||||||
|
} inst_metric[REDIS_METRIC_COUNT];
|
||||||
/* Configuration */
|
/* Configuration */
|
||||||
int verbosity; /* Loglevel in redis.conf */
|
int verbosity; /* Loglevel in redis.conf */
|
||||||
int maxidletime; /* Client timeout in seconds */
|
int maxidletime; /* Client timeout in seconds */
|
||||||
|
@ -690,6 +690,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
freeClient(slave);
|
freeClient(slave);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
server.stat_net_output_bytes += nwritten;
|
||||||
sdsrange(slave->replpreamble,nwritten,-1);
|
sdsrange(slave->replpreamble,nwritten,-1);
|
||||||
if (sdslen(slave->replpreamble) == 0) {
|
if (sdslen(slave->replpreamble) == 0) {
|
||||||
sdsfree(slave->replpreamble);
|
sdsfree(slave->replpreamble);
|
||||||
@ -718,6 +719,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
slave->repldboff += nwritten;
|
slave->repldboff += nwritten;
|
||||||
|
server.stat_net_output_bytes += nwritten;
|
||||||
if (slave->repldboff == slave->repldbsize) {
|
if (slave->repldboff == slave->repldbsize) {
|
||||||
close(slave->repldbfd);
|
close(slave->repldbfd);
|
||||||
slave->repldbfd = -1;
|
slave->repldbfd = -1;
|
||||||
@ -938,6 +940,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
replicationAbortSyncTransfer();
|
replicationAbortSyncTransfer();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
server.stat_net_input_bytes += nread;
|
||||||
|
|
||||||
/* When a mark is used, we want to detect EOF asap in order to avoid
|
/* When a mark is used, we want to detect EOF asap in order to avoid
|
||||||
* writing the EOF mark into the file... */
|
* writing the EOF mark into the file... */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user