Cluster: clusterReadHandler() fixed to work with new message header.

This commit is contained in:
antirez 2014-02-10 16:27:33 +01:00
parent 344a065d51
commit f885fa8bac
2 changed files with 13 additions and 11 deletions

View File

@ -1167,8 +1167,6 @@ int clusterProcessPacket(clusterLink *link) {
/* Perform sanity checks */ /* Perform sanity checks */
if (totlen < 16) return 1; /* At least signature, version, totlen, count. */ if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
if (hdr->sig[0] != 'R' || hdr->sig[1] != 'C' ||
hdr->sig[2] != 'm' || hdr->sig[3] != 'b') return 1; /* Bad signature. */
if (ntohs(hdr->ver) != 0) return 1; /* Can't handle versions other than 0. */ if (ntohs(hdr->ver) != 0) return 1; /* Can't handle versions other than 0. */
if (totlen > sdslen(link->rcvbuf)) return 1; if (totlen > sdslen(link->rcvbuf)) return 1;
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
@ -1582,18 +1580,22 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
while(1) { /* Read as long as there is data to read. */ while(1) { /* Read as long as there is data to read. */
rcvbuflen = sdslen(link->rcvbuf); rcvbuflen = sdslen(link->rcvbuf);
if (rcvbuflen < 4) { if (rcvbuflen < 8) {
/* First, obtain the first four bytes to get the full message /* First, obtain the first 8 bytes to get the full message
* length. */ * length. */
readlen = 4 - rcvbuflen; readlen = 8 - rcvbuflen;
} else { } else {
/* Finally read the full message. */ /* Finally read the full message. */
hdr = (clusterMsg*) link->rcvbuf; hdr = (clusterMsg*) link->rcvbuf;
if (rcvbuflen == 4) { if (rcvbuflen == 8) {
/* Perform some sanity check on the message length. */ /* Perform some sanity check on the message signature
if (ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) { * and length. */
if (memcmp(hdr->sig,"RCmb",4) != 0 ||
ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
{
redisLog(REDIS_WARNING, redisLog(REDIS_WARNING,
"Bad message length received from Cluster bus."); "Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link); handleLinkIOError(link);
return; return;
} }
@ -1619,7 +1621,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
/* Total length obtained? Process this packet. */ /* Total length obtained? Process this packet. */
if (rcvbuflen >= 4 && rcvbuflen == ntohl(hdr->totlen)) { if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
if (clusterProcessPacket(link)) { if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf); sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty(); link->rcvbuf = sdsempty();

View File

@ -196,9 +196,9 @@ union clusterMsgData {
typedef struct { typedef struct {
char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */ char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 0. */ uint16_t ver; /* Protocol version, currently set to 0. */
uint16_t notused0; /* 2 bytes not used. */ uint16_t notused0; /* 2 bytes not used. */
uint32_t totlen; /* Total length of this message */
uint16_t type; /* Message type */ uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */ uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ uint64_t currentEpoch; /* The epoch accordingly to the sending node. */