Merge pull request #5426 from soloestoy/feature-xstream

Bugfix data inconsistency after aof rewrite, and add XSTREAM command.
This commit is contained in:
Salvatore Sanfilippo 2018-10-16 13:10:36 +02:00 committed by GitHub
commit af09df08d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 164 additions and 15 deletions

View File

@ -1139,23 +1139,39 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
streamID id;
int64_t numfields;
/* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
if (s->length) {
/* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
}
}
/* Append XSTREAM SETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',4) == 0) return 0;
if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
if (rioWriteBulkString(r,"SETID",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
} else {
/* Using XSTREAM CREATE if the stream is empty. */
if (rioWriteBulkCount(r,'*',4) == 0) return 0;
if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
}
/* Create all the stream consumer groups. */

View File

@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = {
{"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"xstream",xstreamCommand,-2,"wmFR",0,NULL,2,2,1,0,0},
{"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0},

View File

@ -2111,6 +2111,7 @@ void xrevrangeCommand(client *c);
void xlenCommand(client *c);
void xreadCommand(client *c);
void xgroupCommand(client *c);
void xstreamCommand(client *c);
void xackCommand(client *c);
void xpendingCommand(client *c);
void xclaimCommand(client *c);

View File

@ -1775,6 +1775,75 @@ NULL
}
}
/* XSTREAM CREATE <key> <id or *>
* XSTREAM SETID <key> <id or $> */
void xstreamCommand(client *c) {
const char *help[] = {
"CREATE <key> <id or *> -- Create a new empty stream.",
"SETID <key> <id or $> -- Set the current stream ID.",
"HELP -- Prints this help.",
NULL
};
stream *s = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 4) {
robj *o = lookupKeyWrite(c->db,c->argv[2]);
if (o) {
addReplySds(c,
sdsnew("-BUSYSTREAM Stream already exists\r\n"));
return;
} else {
streamID id;
if (!strcmp(c->argv[3]->ptr,"*")) {
id.ms = mstime();
id.seq = 0;
} else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
return;
}
o = createStreamObject();
s = o->ptr;
s->last_id = id;
dbAdd(c->db,c->argv[2],o);
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,3,idarg);
decrRefCount(idarg);
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-create",
c->argv[2],c->db->id);
}
} else if (!strcasecmp(opt,"SETID") && c->argc == 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
streamID id;
if (!strcmp(c->argv[3]->ptr,"$")) {
id = s->last_id;
} else if (streamParseStrictIDOrReply(c,c->argv[3],&id,0) != C_OK) {
return;
}
if (streamCompareID(&id,&s->last_id) < 0) {
addReplyError(c,"The ID specified in XSTREAM SETID is smaller than the "
"target stream top item");
return;
}
s->last_id = id;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xstream-setid",
c->argv[2],c->db->id);
} else if (!strcasecmp(opt,"HELP")) {
addReplyHelp(c, help);
} else {
addReplySubcommandSyntaxError(c);
}
}
/* XACK <key> <group> <id> <id> ... <id>
*
* Acknowledge a message as processed. In practical terms we just check the

View File

@ -363,3 +363,65 @@ start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries
assert {[r xlen mystream] == 90}
}
}
start_server {tags {"xstream command"}} {
test {XSTREAM can CREATE an empty stream} {
r XSTREAM CREATE mystream *
assert {[dict get [r xinfo stream mystream] length] == 0}
}
test {XSTREAM cannot CREATE on an busy stream} {
catch {r XSTREAM CREATE mystream *} err
set _ $err
} {BUSY*}
test {XSTREAM can CREATE an empty stream with specific ID} {
r del mystream
r XSTREAM CREATE mystream "100-100"
assert {[dict get [r xinfo stream mystream] length] == 0}
assert {[dict get [r xinfo stream mystream] last-generated-id] == "100-100"}
}
test {XSTREAM can SETID with $} {
r XSTREAM SETID mystream $
assert {[dict get [r xinfo stream mystream] last-generated-id] == "100-100"}
}
test {XSTREAM can SETID with specific ID} {
r XSTREAM SETID mystream "200-0"
assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
}
test {XSTREAM cannot SETID with smaller ID} {
catch {r XSTREAM SETID mystream "0-0"} err
set _ $err
} {ERR*smaller*}
test {XSTREAM cannot SETID on non-existent key} {
catch {r XSTREAM SETID stream $} err
set _ $err
} {ERR no such key}
}
start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
test {Empty stream can be rewrite into AOF correctly} {
r XSTREAM CREATE mystream 0
assert {[dict get [r xinfo stream mystream] length] == 0}
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
assert {[dict get [r xinfo stream mystream] length] == 0}
}
test {Stream can be rewrite into AOF correctly after XDEL lastid} {
r XADD mystream 1-1 a b
r XADD mystream 2-2 a b
assert {[dict get [r xinfo stream mystream] length] == 2}
r XDEL mystream 2-2
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
assert {[dict get [r xinfo stream mystream] length] == 1}
assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
}
}