XGROUP SETID implemented + consumer groups core fixes.

Now that we have SETID, the inetrnals of consumer groups should be able
to handle the case of the same message delivered multiple times just
as a side effect of calling XREADGROUP. Normally this should never
happen but if the admin manually "XGROUP SETID mykey mygroup 0",
messages will get re-delivered to clients waiting for the ">" special
ID. The consumer groups internals were not able to handle the case of a
message re-delivered in this circumstances that was already assigned to
another owner.
This commit is contained in:
antirez 2018-06-04 17:28:03 +02:00
parent 05a2996641
commit 36b392a0b2

View File

@ -41,6 +41,7 @@
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
@ -867,18 +868,41 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* If a group is passed, we need to create an entry in the /* If a group is passed, we need to create an entry in the
* PEL (pending entries list) of this group *and* this consumer. * PEL (pending entries list) of this group *and* this consumer.
* Note that we are sure about the fact the message is not already *
* associated with some other consumer, because if we reached this * Note that we cannot be sure about the fact the message is not
* loop the IDs the user is requesting are greater than any message * already owned by another consumer, because the admin is able
* delivered for this group. */ * to change the consumer group last delivered ID using the
* XGROUP SETID command. So if we find that there is already
* a NACK for the entry, we need to associate it to the new
* consumer. */
if (group && !(flags & STREAM_RWR_NOACK)) { if (group && !(flags & STREAM_RWR_NOACK)) {
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
streamEncodeID(buf,&id); streamEncodeID(buf,&id);
/* Try to add a new NACK. Most of the time this will work and
* will not require extra lookups. We'll fix the problem later
* if we find that there is already a entry for this ID. */
streamNACK *nack = streamCreateNACK(consumer); streamNACK *nack = streamCreateNACK(consumer);
int retval = 0; int retval = 0;
retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL); retval += raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); retval += raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
serverAssert(retval == 2); /* Make sure entry was inserted. */
/* Now we can check if the entry was already busy, and
* in that case reassign the entry to the new consumer. */
if (retval == 0) {
streamFreeNACK(nack);
nack = raxFind(group->pel,buf,sizeof(buf));
serverAssert(nack != raxNotFound);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */
nack->consumer = consumer;
nack->delivery_time = mstime();
nack->delivery_count++;
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
} else if (retval == 1) {
serverPanic("NACK half-created. Should not be possible.");
}
/* Propagate as XCLAIM. */ /* Propagate as XCLAIM. */
if (spi) { if (spi) {
@ -1568,6 +1592,14 @@ NULL
sdsnew("-BUSYGROUP Consumer Group name already exists\r\n")); sdsnew("-BUSYGROUP Consumer Group name already exists\r\n"));
} }
} else if (!strcasecmp(opt,"SETID") && c->argc == 5) { } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
} else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return;
}
cg->last_id = id;
addReply(c,shared.ok);
} else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
if (cg) { if (cg) {
raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
@ -1976,7 +2008,7 @@ void xclaimCommand(client *c) {
nack->delivery_time = deliverytime; nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given. */ /* Set the delivery attempts counter if given. */
if (retrycount >= 0) nack->delivery_count = retrycount; if (retrycount >= 0) nack->delivery_count = retrycount;
/* Add the entry in the new cosnumer local PEL. */ /* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Send the reply for this entry. */ /* Send the reply for this entry. */
if (justid) { if (justid) {