Streams: augment client.bpop with XREAD specific fields.

This commit is contained in:
antirez 2017-09-07 09:30:50 +02:00
parent f80dfbf464
commit 4086dff477
4 changed files with 18 additions and 0 deletions

View File

@ -384,6 +384,10 @@ void unblockClientWaitingData(client *c) {
decrRefCount(c->bpop.target);
c->bpop.target = NULL;
}
if (c->bpop.xread_group) {
decrRefCount(c->bpop.xread_group);
c->bpop.xread_group = NULL;
}
}
/* If the specified key has clients blocked waiting for list pushes, this

View File

@ -126,6 +126,7 @@ client *createClient(int fd) {
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;

View File

@ -648,6 +648,11 @@ typedef struct blockingState {
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
/* BLOCK_STREAM */
size_t xread_count; /* XREAD COUNT option. */
robj *xread_group; /* XREAD group name. */
mstime_t xread_retry_time, xread_retry_ttl;
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */

View File

@ -392,3 +392,11 @@ void xlenCommand(client *c) {
stream *s = o->ptr;
addReplyLongLong(c,s->length);
}
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ...
* key_N ID_N */
void xreadCommand(client *c) {
}