Streams: XREAD ability to block fixed.

This commit is contained in:
antirez 2017-09-08 12:25:06 +02:00
parent 6a1c92d52d
commit 0adb43b68f
2 changed files with 4 additions and 5 deletions

View File

@ -134,7 +134,7 @@ void processUnblockedClients(void) {
/* Unblock a client calling the right function depending on the kind /* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */ * of operation the client is blocking for. */
void unblockClient(client *c) { void unblockClient(client *c) {
if (c->btype == BLOCKED_LIST) { if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c); unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) { } else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c); unblockClientWaitingReplicas(c);
@ -160,7 +160,7 @@ void unblockClient(client *c) {
* send it a reply of some kind. After this function is called, * send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */ * unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) { void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST) { if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
addReply(c,shared.nullmultibulk); addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) { } else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));

View File

@ -412,9 +412,8 @@ void xreadCommand(client *c) {
char *o = c->argv[i]->ptr; char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) { if (!strcasecmp(o,"BLOCK") && moreargs) {
i++; i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&timeout,NULL) if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
!= C_OK) return; UNIT_MILLISECONDS) != C_OK) return;
if (timeout < 0) timeout = 0;
} else if (!strcasecmp(o,"COUNT") && moreargs) { } else if (!strcasecmp(o,"COUNT") && moreargs) {
i++; i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)