diff --git a/src/blocked.c b/src/blocked.c index 376b343d..fccce35d 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -134,7 +134,7 @@ void processUnblockedClients(void) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST) { + if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); @@ -160,7 +160,7 @@ void unblockClient(client *c) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->btype == BLOCKED_LIST) { + if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); diff --git a/src/t_stream.c b/src/t_stream.c index 92c62077..0358e644 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -412,9 +412,8 @@ void xreadCommand(client *c) { char *o = c->argv[i]->ptr; if (!strcasecmp(o,"BLOCK") && moreargs) { i++; - if (getLongLongFromObjectOrReply(c,c->argv[i],&timeout,NULL) - != C_OK) return; - if (timeout < 0) timeout = 0; + if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, + UNIT_MILLISECONDS) != C_OK) return; } else if (!strcasecmp(o,"COUNT") && moreargs) { i++; if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)