diff --git a/src/notify.c b/src/notify.c index 94a1f2e7..9bbeb142 100644 --- a/src/notify.c +++ b/src/notify.c @@ -54,6 +54,7 @@ int keyspaceEventsStringToFlags(char *classes) { case 'e': flags |= NOTIFY_EVICTED; break; case 'K': flags |= NOTIFY_KEYSPACE; break; case 'E': flags |= NOTIFY_KEYEVENT; break; + case 't': flags |= NOTIFY_STREAM; break; default: return -1; } } @@ -79,6 +80,7 @@ sds keyspaceEventsFlagsToString(int flags) { if (flags & NOTIFY_ZSET) res = sdscatlen(res,"z",1); if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); + if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); } if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); diff --git a/src/server.c b/src/server.c index 38f16179..f1fd06ca 100644 --- a/src/server.c +++ b/src/server.c @@ -302,7 +302,7 @@ struct redisCommand redisCommandTable[] = { {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0}, {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0}, {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}, - {"xadd",xaddCommand,-4,"wmF",0,NULL,1,1,1,0,0}, + {"xadd",xaddCommand,-5,"wmF",0,NULL,1,1,1,0,0}, {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, {"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0}, diff --git a/src/server.h b/src/server.h index 2d98b6f1..37df429b 100644 --- a/src/server.h +++ b/src/server.h @@ -427,7 +427,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define NOTIFY_ZSET (1<<7) /* z */ #define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EVICTED (1<<9) /* e */ -#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED) /* A */ +#define NOTIFY_STREAM (1<<10) /* t */ +#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ /* Get the first bind addr or NULL */ #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) diff --git a/src/t_stream.c b/src/t_stream.c index 1836ae73..0921a54b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -115,8 +115,24 @@ void streamDecodeID(void *buf, streamID *id) { /* Adds a new item into the stream 's' having the specified number of * field-value pairs as specified in 'numfields' and stored into 'argv'. - * Returns the new entry ID populating the 'added_id' structure. */ -void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) { + * Returns the new entry ID populating the 'added_id' structure. + * + * If 'use_id' is not NULL, the ID is not auto-generated by the function, + * but instead the passed ID is uesd to add the new entry. In this case + * adding the entry may fail as specified later in this comment. + * + * The function returns C_OK if the item was added, this is always true + * if the ID was generated by the function. However the function may return + * C_ERR if an ID was given via 'use_id', but adding it failed since the + * current top ID is greater or equal. */ +int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, streamID *use_id) { + /* If an ID was given, check that it's greater than the last entry ID + * or return an error. */ + if (use_id && (use_id->ms < s->last_id.ms || + (use_id->ms == s->last_id.ms && + use_id->seq <= s->last_id.seq))) return C_ERR; + + /* Add the new entry. */ raxIterator ri; raxStart(&ri,s->rax); raxSeek(&ri,"$",NULL,0); @@ -133,7 +149,10 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) /* Generate the new entry ID. */ streamID id; - streamNextID(&s->last_id,&id); + if (use_id) + id = *use_id; + else + streamNextID(&s->last_id,&id); /* We have to add the key into the radix tree in lexicographic order, * to do so we consider the ID as a single 128 bit number written in @@ -173,6 +192,7 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) s->length++; s->last_id = id; if (added_id) *added_id = id; + return C_OK; } /* Send the specified range to the client 'c'. The range the client will @@ -299,7 +319,9 @@ int string2ull(const char *s, unsigned long long *value) { * form, just stating the milliseconds time part of the stream. In such a case * the missing part is set according to the value of 'missing_seq' parameter. * The IDs "-" and "+" specify respectively the minimum and maximum IDs - * that can be represented. */ + * that can be represented. + * + * If 'c' is set to NULL, no reply is sent to the client. */ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { char buf[128]; if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; @@ -328,13 +350,45 @@ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) return C_OK; invalid: - addReplyError(c,"Invalid stream ID specified as stream command argument"); + if (c) addReplyError(c,"Invalid stream ID specified as stream " + "command argument"); return C_ERR; } -/* XADD key [field value] [field value] ... */ +/* XADD key [MAXLEN ] [field value] [field value] ... */ void xaddCommand(client *c) { - if ((c->argc % 2) == 1) { + streamID id; + int id_given = 0; /* Was an ID different than "*" specified? */ + + /* Parse options. */ + int i = 2; /* This is the first argument position where we could + find an option, or the ID. */ + for (; i < c->argc; i++) { + int moreargs = i != c->argc-1; + char *opt = c->argv[i]->ptr; + if (opt[0] == '*' && opt[1] == '\0') { + /* This is just a fast path for the common case of auto-ID + * creation. */ + break; + } else if (!strcasecmp(opt,"maxlen") && moreargs) { + addReplyError(c,"Sorry, MAXLEN is still not implemented"); + i++; + return; + } else { + /* If we are here is a syntax error or a valid ID. */ + if (streamParseIDOrReply(NULL,c->argv[i],&id,0) == C_OK) { + id_given = 1; + break; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + } + int field_pos = i+1; + + /* Check arity. */ + if ((c->argc - field_pos) < 2 || (c->argc-field_pos % 2) == 1) { addReplyError(c,"wrong number of arguments for XADD"); return; } @@ -346,13 +400,19 @@ void xaddCommand(client *c) { s = o->ptr; /* Append using the low level function and return the ID. */ - streamID id; - streamAppendItem(s,c->argv+2,(c->argc-2)/2,&id); + if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, + &id, id_given ? &id : NULL) + == C_ERR) + { + addReplyError(c,"The ID specified in XADD is smaller than the " + "target stream top item"); + return; + } sds reply = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq); addReplySds(c,reply); signalModifiedKey(c->db,c->argv[1]); - notifyKeyspaceEvent(NOTIFY_HASH,"xadd",c->argv[1],c->db->id); + notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; if (server.blocked_clients_by_type[BLOCKED_STREAM]) signalKeyAsReady(c->db, c->argv[1]);