diff --git a/src/t_stream.c b/src/t_stream.c index 1f904eff..04125bfa 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -41,6 +41,7 @@ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ void streamFreeCG(streamCG *cg); +void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); /* ----------------------------------------------------------------------- @@ -867,18 +868,41 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* If a group is passed, we need to create an entry in the * PEL (pending entries list) of this group *and* this consumer. - * Note that we are sure about the fact the message is not already - * associated with some other consumer, because if we reached this - * loop the IDs the user is requesting are greater than any message - * delivered for this group. */ + * + * Note that we cannot be sure about the fact the message is not + * already owned by another consumer, because the admin is able + * to change the consumer group last delivered ID using the + * XGROUP SETID command. So if we find that there is already + * a NACK for the entry, we need to associate it to the new + * consumer. */ if (group && !(flags & STREAM_RWR_NOACK)) { unsigned char buf[sizeof(streamID)]; streamEncodeID(buf,&id); + + /* Try to add a new NACK. Most of the time this will work and + * will not require extra lookups. We'll fix the problem later + * if we find that there is already a entry for this ID. */ streamNACK *nack = streamCreateNACK(consumer); int retval = 0; - retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL); - retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - serverAssert(retval == 2); /* Make sure entry was inserted. */ + retval += raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); + retval += raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + + /* Now we can check if the entry was already busy, and + * in that case reassign the entry to the new consumer. */ + if (retval == 0) { + streamFreeNACK(nack); + nack = raxFind(group->pel,buf,sizeof(buf)); + serverAssert(nack != raxNotFound); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + /* Update the consumer and idle time. */ + nack->consumer = consumer; + nack->delivery_time = mstime(); + nack->delivery_count++; + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + } else if (retval == 1) { + serverPanic("NACK half-created. Should not be possible."); + } /* Propagate as XCLAIM. */ if (spi) { @@ -1568,6 +1592,14 @@ NULL sdsnew("-BUSYGROUP Consumer Group name already exists\r\n")); } } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { + streamID id; + if (!strcmp(c->argv[4]->ptr,"$")) { + id = s->last_id; + } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { + return; + } + cg->last_id = id; + addReply(c,shared.ok); } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { if (cg) { raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); @@ -1976,7 +2008,7 @@ void xclaimCommand(client *c) { nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given. */ if (retrycount >= 0) nack->delivery_count = retrycount; - /* Add the entry in the new cosnumer local PEL. */ + /* Add the entry in the new consumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Send the reply for this entry. */ if (justid) {