mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Streams: propagate lastid in XCLAIM when it has effect
This commit is contained in:
parent
183ef7ae9b
commit
08ae522ff9
@ -2075,6 +2075,8 @@ void xclaimCommand(client *c) {
|
|||||||
/* If we stopped because some IDs cannot be parsed, perhaps they
|
/* If we stopped because some IDs cannot be parsed, perhaps they
|
||||||
* are trailing options. */
|
* are trailing options. */
|
||||||
time_t now = mstime();
|
time_t now = mstime();
|
||||||
|
streamID last_id = {0,0};
|
||||||
|
int lastid_updated = 0;
|
||||||
for (; j < c->argc; j++) {
|
for (; j < c->argc; j++) {
|
||||||
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
|
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
|
||||||
char *opt = c->argv[j]->ptr;
|
char *opt = c->argv[j]->ptr;
|
||||||
@ -2100,18 +2102,18 @@ void xclaimCommand(client *c) {
|
|||||||
!= C_OK) return;
|
!= C_OK) return;
|
||||||
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
|
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
streamID id;
|
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
|
||||||
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
|
||||||
/* Technically it could be more correct to update that only after
|
|
||||||
* checking for syntax errors, but this option is only used by
|
|
||||||
* the replication command that outputs correct syntax. */
|
|
||||||
if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id;
|
|
||||||
} else {
|
} else {
|
||||||
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
|
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (streamCompareID(&last_id,&group->last_id) > 0) {
|
||||||
|
group->last_id = last_id;
|
||||||
|
lastid_updated = 1;
|
||||||
|
}
|
||||||
|
|
||||||
if (deliverytime != -1) {
|
if (deliverytime != -1) {
|
||||||
/* If a delivery time was passed, either with IDLE or TIME, we
|
/* If a delivery time was passed, either with IDLE or TIME, we
|
||||||
* do some sanity check on it, and set the deliverytime to now
|
* do some sanity check on it, and set the deliverytime to now
|
||||||
@ -2132,6 +2134,7 @@ void xclaimCommand(client *c) {
|
|||||||
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
|
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
|
||||||
void *arraylenptr = addDeferredMultiBulkLength(c);
|
void *arraylenptr = addDeferredMultiBulkLength(c);
|
||||||
size_t arraylen = 0;
|
size_t arraylen = 0;
|
||||||
|
long long dirty = server.dirty;
|
||||||
for (int j = 5; j <= last_id_arg; j++) {
|
for (int j = 5; j <= last_id_arg; j++) {
|
||||||
streamID id;
|
streamID id;
|
||||||
unsigned char buf[sizeof(streamID)];
|
unsigned char buf[sizeof(streamID)];
|
||||||
@ -2198,6 +2201,10 @@ void xclaimCommand(client *c) {
|
|||||||
server.dirty++;
|
server.dirty++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (server.dirty == dirty && lastid_updated) {
|
||||||
|
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
|
||||||
|
server.dirty++;
|
||||||
|
}
|
||||||
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
|
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
|
||||||
preventCommandPropagation(c);
|
preventCommandPropagation(c);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user