diff --git a/src/t_stream.c b/src/t_stream.c index c47c5dde..1836ae73 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -399,6 +399,7 @@ void xlenCommand(client *c) { /* XREAD [BLOCK ] [COUNT ] [GROUP ] * [RETRY ] STREAMS key_1 key_2 ... key_N * ID_1 ID_2 ... ID_N */ +#define XREAD_BLOCKED_DEFAULT_COUNT 1000 void xreadCommand(client *c) { long long timeout = -1; /* -1 means, no BLOCK argument given. */ long long count = 0; @@ -510,6 +511,11 @@ void xreadCommand(client *c) { } blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, NULL, ids); + /* If no COUNT is given and we block, set a relatively small count: + * in case the ID provided is too low, we do not want the server to + * block just to serve this client a huge stream of messages. */ + c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; + c->bpop.xread_group = NULL; /* Not used for now. */ goto cleanup; }