mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Cluster: collect more specific bus messages stats.
First step toward changing Cluster to use fewer messages. Related to issue #3929.
This commit is contained in:
parent
104584b95e
commit
c5d6f577f0
@ -423,8 +423,10 @@ void clusterInit(void) {
|
|||||||
server.cluster->failover_auth_epoch = 0;
|
server.cluster->failover_auth_epoch = 0;
|
||||||
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
|
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
|
||||||
server.cluster->lastVoteEpoch = 0;
|
server.cluster->lastVoteEpoch = 0;
|
||||||
server.cluster->stats_bus_messages_sent = 0;
|
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
|
||||||
server.cluster->stats_bus_messages_received = 0;
|
server.cluster->stats_bus_messages_sent[i] = 0;
|
||||||
|
server.cluster->stats_bus_messages_received[i] = 0;
|
||||||
|
}
|
||||||
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
|
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
|
||||||
clusterCloseAllSlots();
|
clusterCloseAllSlots();
|
||||||
|
|
||||||
@ -1583,7 +1585,8 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
uint32_t totlen = ntohl(hdr->totlen);
|
uint32_t totlen = ntohl(hdr->totlen);
|
||||||
uint16_t type = ntohs(hdr->type);
|
uint16_t type = ntohs(hdr->type);
|
||||||
|
|
||||||
server.cluster->stats_bus_messages_received++;
|
if (type < CLUSTERMSG_TYPE_COUNT)
|
||||||
|
server.cluster->stats_bus_messages_received[type]++;
|
||||||
serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
|
serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
|
||||||
type, (unsigned long) totlen);
|
type, (unsigned long) totlen);
|
||||||
|
|
||||||
@ -2130,7 +2133,12 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
|||||||
clusterWriteHandler,link);
|
clusterWriteHandler,link);
|
||||||
|
|
||||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||||
server.cluster->stats_bus_messages_sent++;
|
|
||||||
|
/* Populate sent messages stats. */
|
||||||
|
clusterMsg *hdr = (clusterMsg*) msg;
|
||||||
|
uint16_t type = ntohs(hdr->type);
|
||||||
|
if (type < CLUSTERMSG_TYPE_COUNT)
|
||||||
|
server.cluster->stats_bus_messages_sent[type]++;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a message to all the nodes that are part of the cluster having
|
/* Send a message to all the nodes that are part of the cluster having
|
||||||
@ -3877,6 +3885,21 @@ sds clusterGenNodesDescription(int filter) {
|
|||||||
* CLUSTER command
|
* CLUSTER command
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* Map a cluster bus message type (one of the CLUSTERMSG_TYPE_* constants)
 * to a short, lowercase, human readable name.
 *
 * clusterCommand() interpolates the returned name into the per-type
 * "cluster_stats_messages_<name>_sent" and
 * "cluster_stats_messages_<name>_received" fields it emits.
 *
 * The return value is always a string literal, so the caller must neither
 * modify nor free it. Any 'type' that does not match one of the cases below
 * yields "unknown". */
const char *clusterGetMessageTypeString(int type) {
|
||||||
|
switch(type) {
|
||||||
|
case CLUSTERMSG_TYPE_PING: return "ping";
|
||||||
|
case CLUSTERMSG_TYPE_PONG: return "pong";
|
||||||
|
case CLUSTERMSG_TYPE_MEET: return "meet";
|
||||||
|
case CLUSTERMSG_TYPE_FAIL: return "fail";
|
||||||
|
case CLUSTERMSG_TYPE_PUBLISH: return "publish";
|
||||||
|
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
|
||||||
|
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
|
||||||
|
case CLUSTERMSG_TYPE_UPDATE: return "update";
|
||||||
|
case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
|
||||||
|
}
|
||||||
|
/* No case matched: fall through the switch and report a generic name. */
return "unknown";
|
||||||
|
}
|
||||||
|
|
||||||
int getSlotOrReply(client *c, robj *o) {
|
int getSlotOrReply(client *c, robj *o) {
|
||||||
long long slot;
|
long long slot;
|
||||||
|
|
||||||
@ -4208,8 +4231,6 @@ void clusterCommand(client *c) {
|
|||||||
"cluster_size:%d\r\n"
|
"cluster_size:%d\r\n"
|
||||||
"cluster_current_epoch:%llu\r\n"
|
"cluster_current_epoch:%llu\r\n"
|
||||||
"cluster_my_epoch:%llu\r\n"
|
"cluster_my_epoch:%llu\r\n"
|
||||||
"cluster_stats_messages_sent:%lld\r\n"
|
|
||||||
"cluster_stats_messages_received:%lld\r\n"
|
|
||||||
, statestr[server.cluster->state],
|
, statestr[server.cluster->state],
|
||||||
slots_assigned,
|
slots_assigned,
|
||||||
slots_ok,
|
slots_ok,
|
||||||
@ -4218,10 +4239,36 @@ void clusterCommand(client *c) {
|
|||||||
dictSize(server.cluster->nodes),
|
dictSize(server.cluster->nodes),
|
||||||
server.cluster->size,
|
server.cluster->size,
|
||||||
(unsigned long long) server.cluster->currentEpoch,
|
(unsigned long long) server.cluster->currentEpoch,
|
||||||
(unsigned long long) myepoch,
|
(unsigned long long) myepoch
|
||||||
server.cluster->stats_bus_messages_sent,
|
|
||||||
server.cluster->stats_bus_messages_received
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/* Show stats about messages sent and received. */
|
||||||
|
long long tot_msg_sent = 0;
|
||||||
|
long long tot_msg_received = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
|
||||||
|
if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
|
||||||
|
tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
|
||||||
|
info = sdscatprintf(info,
|
||||||
|
"cluster_stats_messages_%s_sent:%lld\r\n",
|
||||||
|
clusterGetMessageTypeString(i),
|
||||||
|
server.cluster->stats_bus_messages_sent[i]);
|
||||||
|
}
|
||||||
|
info = sdscatprintf(info,
|
||||||
|
"cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
|
||||||
|
|
||||||
|
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
|
||||||
|
if (server.cluster->stats_bus_messages_received[i] == 0) continue;
|
||||||
|
tot_msg_received += server.cluster->stats_bus_messages_received[i];
|
||||||
|
info = sdscatprintf(info,
|
||||||
|
"cluster_stats_messages_%s_received:%lld\r\n",
|
||||||
|
clusterGetMessageTypeString(i),
|
||||||
|
server.cluster->stats_bus_messages_received[i]);
|
||||||
|
}
|
||||||
|
info = sdscatprintf(info,
|
||||||
|
"cluster_stats_messages_received:%lld\r\n", tot_msg_received);
|
||||||
|
|
||||||
|
/* Produce the reply protocol. */
|
||||||
addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
|
addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
|
||||||
(unsigned long)sdslen(info)));
|
(unsigned long)sdslen(info)));
|
||||||
addReplySds(c,info);
|
addReplySds(c,info);
|
||||||
|
@ -73,6 +73,29 @@ typedef struct clusterLink {
|
|||||||
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4
|
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4
|
||||||
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (60*5) /* seconds. */
|
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (60*5) /* seconds. */
|
||||||
|
|
||||||
|
/* clusterState todo_before_sleep flags. */
|
||||||
|
#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0)
|
||||||
|
#define CLUSTER_TODO_UPDATE_STATE (1<<1)
|
||||||
|
#define CLUSTER_TODO_SAVE_CONFIG (1<<2)
|
||||||
|
#define CLUSTER_TODO_FSYNC_CONFIG (1<<3)
|
||||||
|
|
||||||
|
/* Message types.
|
||||||
|
*
|
||||||
|
* Note that the PING, PONG and MEET messages are actually the same exact
|
||||||
|
* kind of packet. PONG is the reply to ping, in the exact format as a PING,
|
||||||
|
* while MEET is a special PING that forces the receiver to add the sender
|
||||||
|
* as a node (if it is not already in the list). */
|
||||||
|
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
|
||||||
|
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
|
||||||
|
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
|
||||||
|
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
|
||||||
|
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
|
||||||
|
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
|
||||||
|
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
|
||||||
|
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
|
||||||
|
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
|
||||||
|
#define CLUSTERMSG_TYPE_COUNT 9 /* Total number of message types. */
|
||||||
|
|
||||||
/* This structure represents elements of node->fail_reports. */
|
/* This structure represents elements of node->fail_reports. */
|
||||||
typedef struct clusterNodeFailReport {
|
typedef struct clusterNodeFailReport {
|
||||||
struct clusterNode *node; /* Node reporting the failure condition. */
|
struct clusterNode *node; /* Node reporting the failure condition. */
|
||||||
@ -139,32 +162,13 @@ typedef struct clusterState {
|
|||||||
/* The following fields are used by masters to take state on elections. */
|
/* The following fields are used by masters to take state on elections. */
|
||||||
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
|
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
|
||||||
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
|
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
|
||||||
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
|
/* Messages received and sent by type. */
|
||||||
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
|
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
|
||||||
|
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
|
||||||
} clusterState;
|
} clusterState;
|
||||||
|
|
||||||
/* clusterState todo_before_sleep flags. */
|
|
||||||
#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0)
|
|
||||||
#define CLUSTER_TODO_UPDATE_STATE (1<<1)
|
|
||||||
#define CLUSTER_TODO_SAVE_CONFIG (1<<2)
|
|
||||||
#define CLUSTER_TODO_FSYNC_CONFIG (1<<3)
|
|
||||||
|
|
||||||
/* Redis cluster messages header */
|
/* Redis cluster messages header */
|
||||||
|
|
||||||
/* Note that the PING, PONG and MEET messages are actually the same exact
|
|
||||||
* kind of packet. PONG is the reply to ping, in the exact format as a PING,
|
|
||||||
* while MEET is a special PING that forces the receiver to add the sender
|
|
||||||
* as a node (if it is not already in the list). */
|
|
||||||
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
|
|
||||||
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
|
|
||||||
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
|
|
||||||
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
|
|
||||||
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
|
|
||||||
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
|
|
||||||
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
|
|
||||||
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
|
|
||||||
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
|
|
||||||
|
|
||||||
/* Initially we don't know our "name", but we'll find it once we connect
|
/* Initially we don't know our "name", but we'll find it once we connect
|
||||||
* to the first node, using the getsockname() function. Then we'll use this
|
* to the first node, using the getsockname() function. Then we'll use this
|
||||||
* address for all the next messages. */
|
* address for all the next messages. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user