Streams IDs parsing refactoring.

Related to #5184.
This commit is contained in:
antirez 2018-07-31 18:08:52 +02:00
parent 187fa78637
commit 2f2987ffc5

View File

@ -1059,18 +1059,19 @@ int string2ull(const char *s, unsigned long long *value) {
* to the client, otherwise C_OK is returned. The ID may be in incomplete * to the client, otherwise C_OK is returned. The ID may be in incomplete
* form, just stating the milliseconds time part of the stream. In such a case * form, just stating the milliseconds time part of the stream. In such a case
* the missing part is set according to the value of 'missing_seq' parameter. * the missing part is set according to the value of 'missing_seq' parameter.
*
* The IDs "-" and "+" specify respectively the minimum and maximum IDs * The IDs "-" and "+" specify respectively the minimum and maximum IDs
* that can be represented. If 'mmid_supp' is set to 0, "-" and "+" will be * that can be represented. If 'strict' is set to 1, "-" and "+" will be
* treated as an invalid ID. * treated as an invalid ID.
* *
* If 'c' is set to NULL, no reply is sent to the client. */ * If 'c' is set to NULL, no reply is sent to the client. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int mmid_supp) { int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
char buf[128]; char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
memcpy(buf,o->ptr,sdslen(o->ptr)+1); memcpy(buf,o->ptr,sdslen(o->ptr)+1);
if (!mmid_supp && (buf[0] == '-' || buf[0] == '+') && if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
buf[1] == '\0') goto invalid; goto invalid;
/* Handle the "-" and "+" special cases. */ /* Handle the "-" and "+" special cases. */
if (buf[0] == '-' && buf[1] == '\0') { if (buf[0] == '-' && buf[1] == '\0') {
@ -1100,6 +1101,20 @@ invalid:
return C_ERR; return C_ERR;
} }
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 0, to be used when - and + are accetable IDs. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 1, to be used when we want to return an error if the special IDs + or -
* are provided. */
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */ /* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) { void xaddCommand(client *c) {
streamID id; streamID id;
@ -1137,7 +1152,7 @@ void xaddCommand(client *c) {
maxlen_arg_idx = i; maxlen_arg_idx = i;
} else { } else {
/* If we are here is a syntax error or a valid ID. */ /* If we are here is a syntax error or a valid ID. */
if (streamParseIDOrReply(c,c->argv[i],&id,0,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
id_given = 1; id_given = 1;
break; break;
} }
@ -1206,8 +1221,8 @@ void xrangeGenericCommand(client *c, int rev) {
robj *startarg = rev ? c->argv[3] : c->argv[2]; robj *startarg = rev ? c->argv[3] : c->argv[2];
robj *endarg = rev ? c->argv[2] : c->argv[3]; robj *endarg = rev ? c->argv[2] : c->argv[3];
if (streamParseIDOrReply(c,startarg,&startid,0,1) == C_ERR) return; if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX,1) == C_ERR) return; if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;
/* Parse the COUNT option if any. */ /* Parse the COUNT option if any. */
if (c->argc > 4) { if (c->argc > 4) {
@ -1393,7 +1408,7 @@ void xreadCommand(client *c) {
ids[id_idx].seq = UINT64_MAX; ids[id_idx].seq = UINT64_MAX;
continue; continue;
} }
if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0,0) != C_OK) if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
goto cleanup; goto cleanup;
} }
@ -1667,7 +1682,7 @@ NULL
streamID id; streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) { if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id; id = s->last_id;
} else if (streamParseIDOrReply(c,c->argv[4],&id,0,0) != C_OK) { } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return; return;
} }
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
@ -1684,7 +1699,7 @@ NULL
streamID id; streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) { if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id; id = s->last_id;
} else if (streamParseIDOrReply(c,c->argv[4],&id,0,1) != C_OK) { } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return; return;
} }
cg->last_id = id; cg->last_id = id;
@ -1744,7 +1759,7 @@ void xackCommand(client *c) {
for (int j = 3; j < c->argc; j++) { for (int j = 3; j < c->argc; j++) {
streamID id; streamID id;
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
streamEncodeID(buf,&id); streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL: it will have a reference to the /* Lookup the ID in the group PEL: it will have a reference to the
@ -1791,9 +1806,9 @@ void xpendingCommand(client *c) {
if (c->argc >= 6) { if (c->argc >= 6) {
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR) if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
return; return;
if (streamParseIDOrReply(c,c->argv[3],&startid,0,1) == C_ERR) if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
return; return;
if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX,1) == C_ERR) if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
return; return;
} }
@ -2002,7 +2017,7 @@ void xclaimCommand(client *c) {
int j; int j;
for (j = 4; j < c->argc; j++) { for (j = 4; j < c->argc; j++) {
streamID id; streamID id;
if (streamParseIDOrReply(NULL,c->argv[j],&id,0,0) != C_OK) break; if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
} }
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
@ -2061,7 +2076,7 @@ void xclaimCommand(client *c) {
for (int j = 5; j <= last_id_arg; j++) { for (int j = 5; j <= last_id_arg; j++) {
streamID id; streamID id;
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
streamEncodeID(buf,&id); streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL. */ /* Lookup the ID in the group PEL. */
@ -2144,13 +2159,13 @@ void xdelCommand(client *c) {
* executed because at some point an invalid ID is parsed. */ * executed because at some point an invalid ID is parsed. */
streamID id; streamID id;
for (int j = 2; j < c->argc; j++) { for (int j = 2; j < c->argc; j++) {
if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return; if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
} }
/* Actually apply the command. */ /* Actually apply the command. */
int deleted = 0; int deleted = 0;
for (int j = 2; j < c->argc; j++) { for (int j = 2; j < c->argc; j++) {
streamParseIDOrReply(c,c->argv[j],&id,0,0); /* Retval already checked. */ streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
deleted += streamDeleteItem(s,&id); deleted += streamDeleteItem(s,&id);
} }