From 2cacdcd6f8ee0af32618ceff2d303acaa61645ab Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Fri, 8 Sep 2017 16:57:32 +0200
Subject: [PATCH] Streams: XREAD related code to serve blocked clients.

---
 src/blocked.c | 50 +++++++++++++++++++++++++++++++++++++++++++++++++-
 src/server.h  |  1 +
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/src/blocked.c b/src/blocked.c
index fccce35d..84d74f24 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -240,7 +240,14 @@ void handleClientsBlockedOnKeys(void) {
                     while(numclients--) {
                         listNode *clientnode = listFirst(clients);
                         client *receiver = clientnode->value;
-                        if (receiver->btype != BLOCKED_LIST) continue;
+
+                        if (receiver->btype != BLOCKED_LIST) {
+                            /* Put on the tail, so that at the next call
+                             * we'll not run into it again. */
+                            listDelNode(clients,clientnode);
+                            listAddNodeTail(clients,receiver);
+                            continue;
+                        }
 
                         robj *dstkey = receiver->bpop.target;
                         int where = (receiver->lastcmd &&
@@ -279,6 +286,47 @@ void handleClientsBlockedOnKeys(void) {
                  * when an element was pushed on the list. */
             }
 
+            /* Serve clients blocked on stream key. */
+            else if (o != NULL && o->type == OBJ_STREAM) {
+                dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+                stream *s = o->ptr;
+
+                /* We need to provide the new data arrived on the stream
+                 * to all the clients that are waiting for an offset smaller
+                 * than the current top item. */
+                if (de) {
+                    list *clients = dictGetVal(de);
+                    listNode *ln;
+                    listIter li;
+                    listRewind(clients,&li);
+
+                    while((ln = listNext(&li))) {
+                        client *receiver = listNodeValue(ln);
+                        if (receiver->btype != BLOCKED_STREAM) continue;
+                        streamID *gt = dictFetchValue(receiver->bpop.keys,
+                                                      rl->key);
+                        if (s->last_id.ms > gt->ms ||
+                            (s->last_id.ms == gt->ms &&
+                             s->last_id.seq > gt->seq))
+                        {
+                            unblockClient(receiver);
+                            streamID start = *gt;
+                            start.seq++; /* Can't overflow, it's an uint64_t */
+
+                            /* Emit the two elements sub-array consisting of
+                             * the name of the stream and the data we
+                             * extracted from it. Wrapped in a single-item
+                             * array, since we have just one key. */
+                            addReplyMultiBulkLen(receiver,1);
+                            addReplyMultiBulkLen(receiver,2);
+                            addReplyBulk(receiver,rl->key);
+                            streamReplyWithRange(receiver,s,&start,NULL,
+                                                 receiver->bpop.xread_count);
+                        }
+                    }
+                }
+            }
+
             /* Free this item. */
             decrRefCount(rl->key);
             zfree(rl);
diff --git a/src/server.h b/src/server.h
index 4b84486e..8fa7380e 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1425,6 +1425,7 @@ void popGenericCommand(client *c, int where);
 /* Stream data type. */
 stream *streamNew(void);
 void freeStream(stream *s);
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count);
 
 /* MULTI/EXEC/WATCH... */
 void unwatchAllKeys(client *c);