Tracking: BCAST: basic feature now works.

This commit is contained in:
antirez 2020-02-12 19:22:04 +01:00
parent 71f3f3f1af
commit 40194a2a68
3 changed files with 55 additions and 40 deletions

View File

@ -2224,7 +2224,7 @@ NULL
* [PREFIX second] ... */
long long redir = 0;
int bcast = 0;
robj **prefix;
robj **prefix = NULL;
size_t numprefix = 0;
/* Parse the options. */

View File

@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);

View File

@ -85,6 +85,8 @@ void disableTracking(client *c) {
}
}
raxStop(&ri);
raxFree(c->client_tracking_prefixes);
c->client_tracking_prefixes = NULL;
}
/* Clear flags and adjust the count. */
@ -108,6 +110,8 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
}
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
if (c->client_tracking_prefixes == NULL)
c->client_tracking_prefixes = raxNew();
raxInsert(c->client_tracking_prefixes,
(unsigned char*)prefix,plen,NULL,NULL);
}
@ -121,10 +125,10 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
c->flags |= CLIENT_TRACKING;
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
c->client_tracking_redirection = redirect_to;
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
c->client_tracking_redirection = redirect_to;
if (TrackingTable == NULL) {
TrackingTable = raxNew();
PrefixTable = raxNew();
@ -229,8 +233,9 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (keylen > ri.key_len) continue;
if (memcmp(ri.key,keyname,ri.key_len) != 0) continue;
if (ri.key_len > keylen) continue;
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
continue;
bcastState *bs = ri.data;
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
@ -362,10 +367,15 @@ void trackingLimitUsedSlots(void) {
* notifications to each client in each prefix. */
void trackingBroadcastInvalidationMessages(void) {
raxIterator ri, ri2;
/* Return ASAP if there is nothing to do here. */
if (TrackingTable == NULL || !server.tracking_clients) return;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
bcastState *bs = ri.data;
if (raxSize(bs->keys)) {
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */
char buf[32];
@ -379,7 +389,9 @@ void trackingBroadcastInvalidationMessages(void) {
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
len = ll2string(buf,sizeof(buf),ri2.key_len);
sds proto = sdsnewlen("$",1);
proto = sdscatlen(proto,"$",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
proto = sdscatlen(proto,ri2.key,ri2.key_len);
proto = sdscatlen(proto,"\r\n",2);
}
@ -399,9 +411,8 @@ void trackingBroadcastInvalidationMessages(void) {
* want to only track the new keys that will be accumulated starting
* from now. */
sdsfree(proto);
raxFree(bs->clients);
}
raxFree(bs->keys);
bs->clients = raxNew();
bs->keys = raxNew();
}
raxStop(&ri);