diff --git a/src/blocked.c b/src/blocked.c index 2de79837..23142d1d 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -348,9 +348,14 @@ void handleClientsBlockedOnKeys(void) { addReplyMultiBulkLen(receiver,1); addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,rl->key); + + streamPropInfo pi = { + rl->key, + receiver->bpop.xread_group + }; streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, - 0, group, consumer, 0); + 0, group, consumer, 0, &pi); } } } diff --git a/src/server.c b/src/server.c index 6f06a33a..803d609c 100644 --- a/src/server.c +++ b/src/server.c @@ -1538,6 +1538,7 @@ void initServerConfig(void) { server.execCommand = lookupCommandByCString("exec"); server.expireCommand = lookupCommandByCString("expire"); server.pexpireCommand = lookupCommandByCString("pexpire"); + server.xclaimCommand = lookupCommandByCString("xclaim"); /* Slow log */ server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; diff --git a/src/server.h b/src/server.h index 046e84b4..abfdfaa0 100644 --- a/src/server.h +++ b/src/server.h @@ -59,7 +59,6 @@ typedef long long mstime_t; /* millisecond time type. */ #include "anet.h" /* Networking the easy way */ #include "ziplist.h" /* Compact list data structure */ #include "intset.h" /* Compact integer set structure */ -#include "stream.h" /* Stream data type header file. */ #include "version.h" /* Version macro */ #include "util.h" /* Misc functions useful in many places */ #include "latency.h" /* Latency monitor API */ @@ -944,8 +943,8 @@ struct redisServer { off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, - *rpopCommand, *sremCommand, *execCommand, *expireCommand, - *pexpireCommand; + *rpopCommand, *sremCommand, *execCommand, + *expireCommand, *pexpireCommand, *xclaimCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -1298,6 +1297,8 @@ typedef struct { dictEntry *de; } hashTypeIterator; +#include "stream.h" /* Stream data type header file. */ + #define OBJ_HASH_KEY 1 #define OBJ_HASH_VALUE 2 diff --git a/src/stream.h b/src/stream.h index a759056e..7cc44ae7 100644 --- a/src/stream.h +++ b/src/stream.h @@ -84,12 +84,19 @@ typedef struct streamNACK { in the last delivery. */ } streamNACK; +/* Stream propagation informations, passed to functions in order to propagate + * XCLAIM commands to AOF and slaves. */ +typedef struct sreamPropInfo { + robj *keyname; + robj *groupname; +} streamPropInfo; + /* Prototypes of exported APIs. */ struct client; stream *streamNew(void); void freeStream(stream *s); -size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags); +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 3b2ddc2e..44b4912f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -672,6 +672,44 @@ void addReplyStreamID(client *c, streamID *id) { addReplySds(c,replyid); } +/* Similar to the above function, but just creates an object, usually useful + * for replication purposes to create arguments. */ +robj *createObjectFromStreamID(streamID *id) { + return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U", + id->ms,id->seq)); +} + +/* As a result of an explicit XCLAIM or XREADGROUP command, new entries + * are created in the pending list of the stream and consumers. We need + * to propagate this changes in the form of XCLAIM commands. */ +void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNACK *nack) { + /* We need to generate an XCLAIM that will work in a idempotent fashion: + * + * XCLAIM 0 TIME + * RETRYCOUNT [FORCE]. */ + robj *argv[11]; + argv[0] = createStringObject("XCLAIM",6); + argv[1] = key; + argv[2] = group; + argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); + argv[4] = createStringObjectFromLongLong(0); + argv[5] = id; + argv[6] = createStringObject("TIME",4); + argv[7] = createStringObjectFromLongLong(nack->delivery_time); + argv[8] = createStringObject("RETRYCOUNT",10); + argv[9] = createStringObjectFromLongLong(nack->delivery_count); + argv[10] = createStringObject("FORCE",5); + propagate(server.xclaimCommand,c->db->id,argv,10,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[0]); + decrRefCount(argv[3]); + decrRefCount(argv[4]); + decrRefCount(argv[6]); + decrRefCount(argv[7]); + decrRefCount(argv[8]); + decrRefCount(argv[9]); + decrRefCount(argv[10]); +} + /* Send the specified range to the client 'c'. The range the client will * receive is between start and end inclusive, if 'count' is non zero, no more * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that @@ -695,6 +733,15 @@ void addReplyStreamID(client *c, streamID *id) { * This is used when the function is just used in order * to emit data and there is some higher level logic. * + * The final argument 'spi' (stream propagatino info pointer) is a structure + * filled with information needed to propagte the command execution to AOF + * and slaves, in the case a consumer group was passed: we need to generate + * XCLAIM commands to create the pending list into AOF/slaves in that case. + * + * If 'spi' is set to NULL no propagation will happen even if the group was + * given, but currently such a feature is never used by the code base that + * will always pass 'spi' and propagate when a group is passed. + * * Note that this function is recursive in certian cases. When it's called * with a non NULL group and consumer argument, it may call * streamReplyWithRangeFromConsumerPEL() in order to get entries from the @@ -706,7 +753,7 @@ void addReplyStreamID(client *c, streamID *id) { #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags) { +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { void *arraylen_ptr = NULL; size_t arraylen = 0; streamIterator si; @@ -763,6 +810,13 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL); retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); serverAssert(retval == 2); /* Make sure entry was inserted. */ + + /* Propagate as XCLAIM. */ + if (spi) { + robj *idarg = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,spi->keyname,spi->groupname,idarg,nack); + decrRefCount(idarg); + } } arraylen++; @@ -802,7 +856,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start streamID thisid; streamDecodeID(ri.key,&thisid); if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES) == 0) + STREAM_RWR_RAWENTRIES,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL * about a message that's no longer here because was removed @@ -992,8 +1046,7 @@ void xaddCommand(client *c) { /* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ - robj *idarg = createObject(OBJ_STRING, - sdscatfmt(sdsempty(),"%U-%U",id.ms,id.seq)); + robj *idarg = createObjectFromStreamID(&id); rewriteClientCommandArgument(c,i,idarg); decrRefCount(idarg); @@ -1035,7 +1088,7 @@ void xrangeGenericCommand(client *c, int rev) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; - streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); } /* XRANGE key start end [COUNT ] */ @@ -1220,9 +1273,11 @@ void xreadCommand(client *c) { streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], consumername->ptr,1); + streamPropInfo spi = {c->argv[i+streams_arg],groupname}; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, noack); + consumer, noack, &spi); + if (groups) server.dirty++; } } @@ -1268,8 +1323,11 @@ void xreadCommand(client *c) { addReply(c,shared.nullmultibulk); /* Continue to cleanup... */ -cleanup: - /* Cleanup. */ +cleanup: /* Cleanup. */ + + /* The command is propagated (in the READGROUP form) as a side effect + * of calling lower level APIs. So stop any implicit propagation. */ + preventCommandPropagation(c); if (ids != static_ids) zfree(ids); zfree(groups); } @@ -1849,12 +1907,17 @@ void xclaimCommand(client *c) { addReplyStreamID(c,&id); } else { streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES); + STREAM_RWR_RAWENTRIES,NULL); } arraylen++; + + /* Propagate this change. */ + streamPropagateXCLAIM(c,c->argv[1],c->argv[3],c->argv[j],nack); + server.dirty++; } } setDeferredMultiBulkLength(c,arraylenptr,arraylen); + preventCommandPropagation(c); } /* XINFO [CONSUMERS group|GROUPS|STREAM]. STREAM is the default */ @@ -1951,11 +2014,11 @@ NULL end.ms = end.seq = UINT64_MAX; addReplyStatus(c,"first-entry"); count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES); + STREAM_RWR_RAWENTRIES,NULL); if (!count) addReply(c,shared.nullbulk); addReplyStatus(c,"last-entry"); count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES); + STREAM_RWR_RAWENTRIES,NULL); if (!count) addReply(c,shared.nullbulk); } else if (!strcasecmp(opt,"HELP")) { addReplyHelp(c, help);