RESP3: t_stream.c updated.

This commit is contained in:
antirez 2018-12-03 16:24:04 +01:00
parent baf5b3f93a
commit 8a0391fbc9
2 changed files with 49 additions and 40 deletions

View File

@ -436,8 +436,12 @@ void handleClientsBlockedOnKeys(void) {
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
* array, since we have just one key. */
addReplyArrayLen(receiver,1);
addReplyArrayLen(receiver,2);
if (receiver->resp == 2) {
addReplyArrayLen(receiver,1);
addReplyArrayLen(receiver,2);
} else {
addReplyMapLen(receiver,1);
}
addReplyBulk(receiver,rl->key);
streamPropInfo pi = {

View File

@ -914,7 +914,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
}
if (!(flags & STREAM_RWR_RAWENTRIES))
arraylen_ptr = addDeferredMultiBulkLength(c);
arraylen_ptr = addReplyDeferredLen(c);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
@ -925,9 +925,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
addReplyMultiBulkLen(c,2);
addReplyArrayLen(c,2);
addReplyStreamID(c,&id);
addReplyMultiBulkLen(c,numfields*2);
addReplyMapLen(c,numfields);
/* Emit the field-value pairs. */
while(numfields--) {
@ -993,7 +994,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
if (count && count == arraylen) break;
}
streamIteratorStop(&si);
if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
}
@ -1018,7 +1019,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
if (end) streamEncodeID(endkey,end);
size_t arraylen = 0;
void *arraylen_ptr = addDeferredMultiBulkLength(c);
void *arraylen_ptr = addReplyDeferredLen(c);
raxStart(&ri,consumer->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
while(raxNext(&ri) && (!count || arraylen < count)) {
@ -1032,11 +1033,11 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
* about a message that's no longer here because was removed
* by the user by other means. In that case we signal it emitting
* the ID but then a NULL entry for the fields. */
addReplyMultiBulkLen(c,2);
addReplyArrayLen(c,2);
streamID id;
streamDecodeID(ri.key,&id);
addReplyStreamID(c,&id);
addReply(c,shared.nullmultibulk);
addReplyNullArray(c);
} else {
streamNACK *nack = ri.data;
nack->delivery_time = mstime();
@ -1045,7 +1046,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
arraylen++;
}
raxStop(&ri);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
}
@ -1286,12 +1287,13 @@ void xrangeGenericCommand(client *c, int rev) {
}
/* Return the specified range to the user. */
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||
checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
if (count == 0) {
addReply(c,shared.nullmultibulk);
addReplyNullArray(c);
} else {
if (count == -1) count = 0;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
@ -1505,7 +1507,7 @@ void xreadCommand(client *c) {
if (serve_synchronously) {
arraylen++;
if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
/* streamReplyWithRange() handles the 'start' ID as inclusive,
* so start from the next ID, since we want only messages with
* IDs greater than start. */
@ -1514,7 +1516,7 @@ void xreadCommand(client *c) {
/* Emit the two elements sub-array consisting of the name
* of the stream and the data we extracted from it. */
addReplyMultiBulkLen(c,2);
if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
@ -1532,7 +1534,10 @@ void xreadCommand(client *c) {
/* We replied synchronously! Set the top array len and return to caller. */
if (arraylen) {
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
if (c->resp == 2)
setDeferredArrayLen(c,arraylen_ptr,arraylen);
else
setDeferredMapLen(c,arraylen_ptr,arraylen);
goto cleanup;
}
@ -1541,7 +1546,7 @@ void xreadCommand(client *c) {
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
addReply(c,shared.nullmultibulk);
addReplyNullArray(c);
goto cleanup;
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
@ -1570,7 +1575,7 @@ void xreadCommand(client *c) {
/* No BLOCK option, nor any stream we can serve. Reply as with a
* timeout happened. */
addReply(c,shared.nullmultibulk);
addReplyNullArray(c);
/* Continue to cleanup... */
cleanup: /* Cleanup. */
@ -1960,14 +1965,14 @@ void xpendingCommand(client *c) {
/* XPENDING <key> <group> variant. */
if (justinfo) {
addReplyMultiBulkLen(c,4);
addReplyArrayLen(c,4);
/* Total number of messages in the PEL. */
addReplyLongLong(c,raxSize(group->pel));
/* First and last IDs. */
if (raxSize(group->pel) == 0) {
addReply(c,shared.nullbulk); /* Start. */
addReply(c,shared.nullbulk); /* End. */
addReply(c,shared.nullmultibulk); /* Clients. */
addReplyNull(c); /* Start. */
addReplyNull(c); /* End. */
addReplyNullArray(c); /* Clients. */
} else {
/* Start. */
raxIterator ri;
@ -1987,17 +1992,17 @@ void xpendingCommand(client *c) {
/* Consumers with pending messages. */
raxStart(&ri,group->consumers);
raxSeek(&ri,"^",NULL,0);
void *arraylen_ptr = addDeferredMultiBulkLength(c);
void *arraylen_ptr = addReplyDeferredLen(c);
size_t arraylen = 0;
while(raxNext(&ri)) {
streamConsumer *consumer = ri.data;
if (raxSize(consumer->pel) == 0) continue;
addReplyMultiBulkLen(c,2);
addReplyArrayLen(c,2);
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkLongLong(c,raxSize(consumer->pel));
arraylen++;
}
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
setDeferredArrayLen(c,arraylen_ptr,arraylen);
raxStop(&ri);
}
}
@ -2010,7 +2015,7 @@ void xpendingCommand(client *c) {
/* If a consumer name was mentioned but it does not exist, we can
* just return an empty array. */
if (consumername && consumer == NULL) {
addReplyMultiBulkLen(c,0);
addReplyArrayLen(c,0);
return;
}
@ -2024,7 +2029,7 @@ void xpendingCommand(client *c) {
streamEncodeID(endkey,&endid);
raxStart(&ri,pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
void *arraylen_ptr = addDeferredMultiBulkLength(c);
void *arraylen_ptr = addReplyDeferredLen(c);
size_t arraylen = 0;
while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
@ -2032,7 +2037,7 @@ void xpendingCommand(client *c) {
arraylen++;
count--;
addReplyMultiBulkLen(c,4);
addReplyArrayLen(c,4);
/* Entry ID. */
streamID id;
@ -2052,7 +2057,7 @@ void xpendingCommand(client *c) {
addReplyLongLong(c,nack->delivery_count);
}
raxStop(&ri);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
setDeferredArrayLen(c,arraylen_ptr,arraylen);
}
}
@ -2221,7 +2226,7 @@ void xclaimCommand(client *c) {
/* Do the actual claiming. */
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
void *arraylenptr = addDeferredMultiBulkLength(c);
void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
@ -2284,7 +2289,7 @@ void xclaimCommand(client *c) {
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
if (!emitted) addReply(c,shared.nullbulk);
if (!emitted) addReplyNull(c);
}
arraylen++;
@ -2298,7 +2303,7 @@ void xclaimCommand(client *c) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
setDeferredArrayLen(c,arraylenptr,arraylen);
preventCommandPropagation(c);
}
@ -2463,7 +2468,7 @@ NULL
return;
}
addReplyMultiBulkLen(c,raxSize(cg->consumers));
addReplyArrayLen(c,raxSize(cg->consumers));
raxIterator ri;
raxStart(&ri,cg->consumers);
raxSeek(&ri,"^",NULL,0);
@ -2473,7 +2478,7 @@ NULL
mstime_t idle = now - consumer->seen_time;
if (idle < 0) idle = 0;
addReplyMultiBulkLen(c,6);
addReplyMapLen(c,3);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
addReplyBulkCString(c,"pending");
@ -2485,17 +2490,17 @@ NULL
} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
/* XINFO GROUPS <key>. */
if (s->cgroups == NULL) {
addReplyMultiBulkLen(c,0);
addReplyArrayLen(c,0);
return;
}
addReplyMultiBulkLen(c,raxSize(s->cgroups));
addReplyArrayLen(c,raxSize(s->cgroups));
raxIterator ri;
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
addReplyMultiBulkLen(c,8);
addReplyMapLen(c,4);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkCString(c,"consumers");
@ -2508,7 +2513,7 @@ NULL
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
/* XINFO STREAM <key> (or the alias XINFO <key>). */
addReplyMultiBulkLen(c,14);
addReplyMapLen(c,7);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
@ -2529,11 +2534,11 @@ NULL
addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk);
if (!count) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk);
if (!count) addReplyNull(c);
} else {
addReplySubcommandSyntaxError(c);
}