mirror of
https://github.com/fluencelabs/redis
synced 2025-03-30 22:31:03 +00:00
Merge pull request #5184 from 0xtonyxia/streams-id-opt
Streams: rearrange the usage of '-' and '+' IDs in stream commands.
This commit is contained in:
commit
187fa78637
@ -1060,14 +1060,18 @@ int string2ull(const char *s, unsigned long long *value) {
|
|||||||
* form, just stating the milliseconds time part of the stream. In such a case
|
* form, just stating the milliseconds time part of the stream. In such a case
|
||||||
* the missing part is set according to the value of 'missing_seq' parameter.
|
* the missing part is set according to the value of 'missing_seq' parameter.
|
||||||
* The IDs "-" and "+" specify respectively the minimum and maximum IDs
|
* The IDs "-" and "+" specify respectively the minimum and maximum IDs
|
||||||
* that can be represented.
|
* that can be represented. If 'mmid_supp' is set to 0, "-" and "+" will be
|
||||||
|
* treated as an invalid ID.
|
||||||
*
|
*
|
||||||
* If 'c' is set to NULL, no reply is sent to the client. */
|
* If 'c' is set to NULL, no reply is sent to the client. */
|
||||||
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
|
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int mmid_supp) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
|
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
|
||||||
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
|
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
|
||||||
|
|
||||||
|
if (!mmid_supp && (buf[0] == '-' || buf[0] == '+') &&
|
||||||
|
buf[1] == '\0') goto invalid;
|
||||||
|
|
||||||
/* Handle the "-" and "+" special cases. */
|
/* Handle the "-" and "+" special cases. */
|
||||||
if (buf[0] == '-' && buf[1] == '\0') {
|
if (buf[0] == '-' && buf[1] == '\0') {
|
||||||
id->ms = 0;
|
id->ms = 0;
|
||||||
@ -1133,7 +1137,7 @@ void xaddCommand(client *c) {
|
|||||||
maxlen_arg_idx = i;
|
maxlen_arg_idx = i;
|
||||||
} else {
|
} else {
|
||||||
/* If we are here is a syntax error or a valid ID. */
|
/* If we are here is a syntax error or a valid ID. */
|
||||||
if (streamParseIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
|
if (streamParseIDOrReply(c,c->argv[i],&id,0,0) != C_OK) return;
|
||||||
id_given = 1;
|
id_given = 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1202,8 +1206,8 @@ void xrangeGenericCommand(client *c, int rev) {
|
|||||||
robj *startarg = rev ? c->argv[3] : c->argv[2];
|
robj *startarg = rev ? c->argv[3] : c->argv[2];
|
||||||
robj *endarg = rev ? c->argv[2] : c->argv[3];
|
robj *endarg = rev ? c->argv[2] : c->argv[3];
|
||||||
|
|
||||||
if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
|
if (streamParseIDOrReply(c,startarg,&startid,0,1) == C_ERR) return;
|
||||||
if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;
|
if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX,1) == C_ERR) return;
|
||||||
|
|
||||||
/* Parse the COUNT option if any. */
|
/* Parse the COUNT option if any. */
|
||||||
if (c->argc > 4) {
|
if (c->argc > 4) {
|
||||||
@ -1389,7 +1393,7 @@ void xreadCommand(client *c) {
|
|||||||
ids[id_idx].seq = UINT64_MAX;
|
ids[id_idx].seq = UINT64_MAX;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
|
if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0,0) != C_OK)
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1663,7 +1667,7 @@ NULL
|
|||||||
streamID id;
|
streamID id;
|
||||||
if (!strcmp(c->argv[4]->ptr,"$")) {
|
if (!strcmp(c->argv[4]->ptr,"$")) {
|
||||||
id = s->last_id;
|
id = s->last_id;
|
||||||
} else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
|
} else if (streamParseIDOrReply(c,c->argv[4],&id,0,0) != C_OK) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
|
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
|
||||||
@ -1680,7 +1684,7 @@ NULL
|
|||||||
streamID id;
|
streamID id;
|
||||||
if (!strcmp(c->argv[4]->ptr,"$")) {
|
if (!strcmp(c->argv[4]->ptr,"$")) {
|
||||||
id = s->last_id;
|
id = s->last_id;
|
||||||
} else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
|
} else if (streamParseIDOrReply(c,c->argv[4],&id,0,1) != C_OK) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
cg->last_id = id;
|
cg->last_id = id;
|
||||||
@ -1740,7 +1744,7 @@ void xackCommand(client *c) {
|
|||||||
for (int j = 3; j < c->argc; j++) {
|
for (int j = 3; j < c->argc; j++) {
|
||||||
streamID id;
|
streamID id;
|
||||||
unsigned char buf[sizeof(streamID)];
|
unsigned char buf[sizeof(streamID)];
|
||||||
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return;
|
||||||
streamEncodeID(buf,&id);
|
streamEncodeID(buf,&id);
|
||||||
|
|
||||||
/* Lookup the ID in the group PEL: it will have a reference to the
|
/* Lookup the ID in the group PEL: it will have a reference to the
|
||||||
@ -1787,9 +1791,9 @@ void xpendingCommand(client *c) {
|
|||||||
if (c->argc >= 6) {
|
if (c->argc >= 6) {
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
|
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
|
||||||
return;
|
return;
|
||||||
if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
|
if (streamParseIDOrReply(c,c->argv[3],&startid,0,1) == C_ERR)
|
||||||
return;
|
return;
|
||||||
if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
|
if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX,1) == C_ERR)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1998,7 +2002,7 @@ void xclaimCommand(client *c) {
|
|||||||
int j;
|
int j;
|
||||||
for (j = 4; j < c->argc; j++) {
|
for (j = 4; j < c->argc; j++) {
|
||||||
streamID id;
|
streamID id;
|
||||||
if (streamParseIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
|
if (streamParseIDOrReply(NULL,c->argv[j],&id,0,0) != C_OK) break;
|
||||||
}
|
}
|
||||||
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
|
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
|
||||||
|
|
||||||
@ -2057,7 +2061,7 @@ void xclaimCommand(client *c) {
|
|||||||
for (int j = 5; j <= last_id_arg; j++) {
|
for (int j = 5; j <= last_id_arg; j++) {
|
||||||
streamID id;
|
streamID id;
|
||||||
unsigned char buf[sizeof(streamID)];
|
unsigned char buf[sizeof(streamID)];
|
||||||
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return;
|
||||||
streamEncodeID(buf,&id);
|
streamEncodeID(buf,&id);
|
||||||
|
|
||||||
/* Lookup the ID in the group PEL. */
|
/* Lookup the ID in the group PEL. */
|
||||||
@ -2140,13 +2144,13 @@ void xdelCommand(client *c) {
|
|||||||
* executed because at some point an invalid ID is parsed. */
|
* executed because at some point an invalid ID is parsed. */
|
||||||
streamID id;
|
streamID id;
|
||||||
for (int j = 2; j < c->argc; j++) {
|
for (int j = 2; j < c->argc; j++) {
|
||||||
if (streamParseIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
if (streamParseIDOrReply(c,c->argv[j],&id,0,0) != C_OK) return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Actually apply the command. */
|
/* Actually apply the command. */
|
||||||
int deleted = 0;
|
int deleted = 0;
|
||||||
for (int j = 2; j < c->argc; j++) {
|
for (int j = 2; j < c->argc; j++) {
|
||||||
streamParseIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */
|
streamParseIDOrReply(c,c->argv[j],&id,0,0); /* Retval already checked. */
|
||||||
deleted += streamDeleteItem(s,&id);
|
deleted += streamDeleteItem(s,&id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user