diff --git a/src/t_stream.c b/src/t_stream.c index ea1290fa..f233785a 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1060,14 +1060,18 @@ int string2ull(const char *s, unsigned long long *value) { * form, just stating the milliseconds time part of the stream. In such a case * the missing part is set according to the value of 'missing_seq' parameter. * The IDs "-" and "+" specify respectively the minimum and maximum IDs - * that can be represented. + * that can be represented. If 'mmid_supp' is set to 0, "-" and "+" will be + * treated as an invalid ID. * * If 'c' is set to NULL, no reply is sent to the client. */ -int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { +int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int mmid_supp) { char buf[128]; if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; memcpy(buf,o->ptr,sdslen(o->ptr)+1); + if (!mmid_supp && (buf[0] == '-' || buf[0] == '+') && + buf[1] == '\0') goto invalid; + /* Handle the "-" and "+" special cases. */ if (buf[0] == '-' && buf[1] == '\0') { id->ms = 0; @@ -1133,7 +1137,7 @@ void xaddCommand(client *c) { maxlen_arg_idx = i; } else { /* If we are here is a syntax error or a valid ID. */ - if (streamParseIDOrReply(c,c->argv[i],&id,0) != C_OK) return; + if (streamParseIDOrReply(c,c->argv[i],&id,0,0) != C_OK) return; id_given = 1; break; } @@ -1202,8 +1206,8 @@ void xrangeGenericCommand(client *c, int rev) { robj *startarg = rev ? c->argv[3] : c->argv[2]; robj *endarg = rev ? c->argv[2] : c->argv[3]; - if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return; - if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return; + if (streamParseIDOrReply(c,startarg,&startid,0,1) == C_ERR) return; + if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX,1) == C_ERR) return; /* Parse the COUNT option if any. */ if (c->argc > 4) { @@ -1389,7 +1393,7 @@ void xreadCommand(client *c) { ids[id_idx].seq = UINT64_MAX; continue; } - if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) + if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0,0) != C_OK) goto cleanup; } @@ -1663,7 +1667,7 @@ NULL streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { id = s->last_id; - } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { + } else if (streamParseIDOrReply(c,c->argv[4],&id,0,0) != C_OK) { return; } streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); @@ -1680,7 +1684,7 @@ NULL streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { id = s->last_id; - } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { + } else if (streamParseIDOrReply(c,c->argv[4],&id,0,1) != C_OK) { return; } cg->last_id = id; @@ -1740,7 +1744,7 @@ void xackCommand(client *c) { for (int j = 3; j < c->argc; j++) { streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return; streamEncodeID(buf,&id); /* Lookup the ID in the group PEL: it will have a reference to the @@ -1787,9 +1791,9 @@ void xpendingCommand(client *c) { if (c->argc >= 6) { if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR) return; - if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR) + if (streamParseIDOrReply(c,c->argv[3],&startid,0,1) == C_ERR) return; - if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR) + if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX,1) == C_ERR) return; } @@ -1998,7 +2002,7 @@ void xclaimCommand(client *c) { int j; for (j = 4; j < c->argc; j++) { streamID id; - if (streamParseIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break; + if (streamParseIDOrReply(NULL,c->argv[j],&id,0,0) != C_OK) break; } int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ @@ -2057,7 +2061,7 @@ void xclaimCommand(client *c) { for (int j = 5; j <= last_id_arg; j++) { streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return; streamEncodeID(buf,&id); /* Lookup the ID in the group PEL. */ @@ -2140,13 +2144,13 @@ void xdelCommand(client *c) { * executed because at some point an invalid ID is parsed. */ streamID id; for (int j = 2; j < c->argc; j++) { - if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return; } /* Actually apply the command. */ int deleted = 0; for (int j = 2; j < c->argc; j++) { - streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */ + streamParseIDOrReply(c,c->argv[j],&id,0,0); /* Retval already checked. */ deleted += streamDeleteItem(s,&id); }