From 4086dff477cc3d979d39c6c4ba9457575fc67d3e Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 7 Sep 2017 09:30:50 +0200 Subject: [PATCH] Streams: augment client.bpop with XREAD specific fields. --- src/blocked.c | 4 ++++ src/networking.c | 1 + src/server.h | 5 +++++ src/t_stream.c | 8 ++++++++ 4 files changed, 18 insertions(+) diff --git a/src/blocked.c b/src/blocked.c index 74dab0c1..376b343d 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -384,6 +384,10 @@ void unblockClientWaitingData(client *c) { decrRefCount(c->bpop.target); c->bpop.target = NULL; } + if (c->bpop.xread_group) { + decrRefCount(c->bpop.xread_group); + c->bpop.xread_group = NULL; + } } /* If the specified key has clients blocked waiting for list pushes, this diff --git a/src/networking.c b/src/networking.c index d672ec32..f0bdacfa 100644 --- a/src/networking.c +++ b/src/networking.c @@ -126,6 +126,7 @@ client *createClient(int fd) { c->bpop.timeout = 0; c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); c->bpop.target = NULL; + c->bpop.xread_group = NULL; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; diff --git a/src/server.h b/src/server.h index 2c69a94c..34c5fb06 100644 --- a/src/server.h +++ b/src/server.h @@ -648,6 +648,11 @@ typedef struct blockingState { robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ + /* BLOCK_STREAM */ + size_t xread_count; /* XREAD COUNT option. */ + robj *xread_group; /* XREAD group name. */ + mstime_t xread_retry_time, xread_retry_ttl; + /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ long long reploffset; /* Replication offset to reach. */ diff --git a/src/t_stream.c b/src/t_stream.c index 52b0e105..66c6cb89 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -392,3 +392,11 @@ void xlenCommand(client *c) { stream *s = o->ptr; addReplyLongLong(c,s->length); } + +/* XREAD [BLOCK ] [COUNT ] [GROUP ] + * [RETRY ] STREAMS key_1 ID_1 key_2 ID_2 ... + * key_N ID_N */ +void xreadCommand(client *c) { +} + +