diff --git a/src/t_stream.c b/src/t_stream.c index c51dc94c..9835573d 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -149,6 +149,17 @@ void streamDecodeID(void *buf, streamID *id) { id->seq = ntohu64(e[1]); } +/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ +int streamCompareID(streamID *a, streamID *b) { + if (a->ms > b->ms) return 1; + else if (a->ms < b->ms) return -1; + /* The ms part is the same. Check the sequence part. */ + else if (a->seq > b->seq) return 1; + else if (a->seq < b->seq) return -1; + /* Everything is the same: IDs are equal. */ + return 0; +} + /* Adds a new item into the stream 's' having the specified number of * field-value pairs as specified in 'numfields' and stored into 'argv'. * Returns the new entry ID populating the 'added_id' structure. @@ -164,9 +175,7 @@ void streamDecodeID(void *buf, streamID *id) { int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, streamID *use_id) { /* If an ID was given, check that it's greater than the last entry ID * or return an error. */ - if (use_id && (use_id->ms < s->last_id.ms || - (use_id->ms == s->last_id.ms && - use_id->seq <= s->last_id.seq))) return C_ERR; + if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR; /* Add the new entry. */ raxIterator ri; @@ -679,8 +688,21 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end int64_t numfields; streamID id; + /* If a group was passed, as an optimization we check if the range + * specified is about messages that were never delivered. This is true if + * the 'start' range is greater than the current last_id in the stream. + * In that case, there is no need to check if the messages may already + * have another owner before delivering the message. This speeds up + * the processing significantly. */ + int newmessages = group != NULL && + streamCompareID(start,&group->last_id) > 0; + streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { + /* Update the group last_id if needed. */ + if (group && streamCompareID(&id,&group->last_id) > 0) + group->last_id = id; + /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq); @@ -1134,8 +1156,15 @@ void streamNotAckedFree(streamNotAcked *na) { zfree(na); } +/* Free a consumer and associated data structures. Note that this function + * will not reassign the pending messages associated with this consumer + * nor will delete them from the stream, so when this function is called + * to delete a consumer, and not when the whole stream is destroyed, the caller + * should do some work before. */ void streamConsumerFree(streamConsumer *sc) { - zfree(sc->name); + raxFree(sc->pel); /* No value free callback: the PEL entries are shared + between the consumer and the main stream PEL. */ + sdsfree(sc->name); zfree(sc); }