diff --git a/src/blocked.c b/src/blocked.c index 84d74f24..3cf661aa 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -102,7 +102,8 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void blockClient(client *c, int btype) { c->flags |= CLIENT_BLOCKED; c->btype = btype; - server.bpop_blocked_clients++; + server.blocked_clients++; + server.blocked_clients_by_type[btype]++; } /* This function is called in the beforeSleep() function of the event loop @@ -145,9 +146,10 @@ void unblockClient(client *c) { } /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ + server.blocked_clients--; + server.blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; - server.bpop_blocked_clients--; /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ if (!(c->flags & CLIENT_UNBLOCKED)) { diff --git a/src/db.c b/src/db.c index e422d4b8..74c2be62 100644 --- a/src/db.c +++ b/src/db.c @@ -169,8 +169,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST || val->type == OBJ_STREAM) - signalKeyAsReady(db, key); + if (val->type == OBJ_LIST) signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); } diff --git a/src/server.c b/src/server.c index e1d9abef..38f16179 100644 --- a/src/server.c +++ b/src/server.c @@ -1426,7 +1426,9 @@ void initServerConfig(void) { server.active_defrag_running = 0; server.notify_keyspace_events = 0; server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS; - server.bpop_blocked_clients = 0; + server.blocked_clients = 0; + memset(server.blocked_clients_by_type,0, + sizeof(server.blocked_clients_by_type)); server.maxmemory = CONFIG_DEFAULT_MAXMEMORY; server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY; server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES; @@ -2929,7 +2931,7 @@ sds genRedisInfoString(char *section) { "blocked_clients:%d\r\n", listLength(server.clients)-listLength(server.slaves), lol, bib, - server.bpop_blocked_clients); + server.blocked_clients); } /* Memory */ diff --git a/src/server.h b/src/server.h index 8fa7380e..2d98b6f1 100644 --- a/src/server.h +++ b/src/server.h @@ -257,6 +257,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_STREAM 4 /* XREAD. */ +#define BLOCKED_NUM 5 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -1130,7 +1131,8 @@ struct redisServer { int lfu_log_factor; /* LFU logarithmic counter factor. */ int lfu_decay_time; /* LFU counter decay factor. */ /* Blocked clients */ - unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ + unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/ + unsigned int blocked_clients_by_type[BLOCKED_NUM]; list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we diff --git a/src/t_stream.c b/src/t_stream.c index afa8224c..c47c5dde 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -354,6 +354,8 @@ void xaddCommand(client *c) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"xadd",c->argv[1],c->db->id); server.dirty++; + if (server.blocked_clients_by_type[BLOCKED_STREAM]) + signalKeyAsReady(c->db, c->argv[1]); } /* XRANGE key start end [COUNT ] */