Geo: GEOADD implementation improved, replication fixed

1. We no longer use a fake client but just rewriting.
2. We group all the inserts into a single ZADD dispatch (big speed win).
3. As a side effect of the correct implementation, replication works.
4. The return value of the command is now correct.
This commit is contained in:
antirez 2015-06-23 10:18:23 +02:00
parent ae5fd11563
commit bb3284563c
3 changed files with 38 additions and 36 deletions

View File

@ -318,8 +318,6 @@ void geoAddCommand(redisClient *c) {
/* args 0-4: [cmd, key, lat, lng, val]; optional 5-6: [radius, units] /* args 0-4: [cmd, key, lat, lng, val]; optional 5-6: [radius, units]
* - OR - * - OR -
* args 0-N: [cmd, key, lat, lng, val, lat2, lng2, val2, ...] */ * args 0-N: [cmd, key, lat, lng, val, lat2, lng2, val2, ...] */
robj *cmd = c->argv[0];
robj *key = c->argv[1];
/* Prepare for the three different forms of the add command. */ /* Prepare for the three different forms of the add command. */
double radius_meters = 0; double radius_meters = 0;
@ -338,56 +336,49 @@ void geoAddCommand(redisClient *c) {
return; return;
} }
redisClient *client = c;
int elements = (c->argc - 2) / 3; int elements = (c->argc - 2) / 3;
/* elements will always be correct size (integer math floors for us if we int argc = 2+elements*2; /* ZADD key score ele ... */
* have 6 or 7 total arguments) */ robj **argv = zcalloc(argc*sizeof(robj*));
if (elements > 1) { argv[0] = createRawStringObject("zadd",4);
/* We should probably use a static client and not create/free it argv[1] = c->argv[1]; /* key */
* for every multi-add */ incrRefCount(argv[1]);
client = createClient(-1); /* fake client for multi-zadd */
/* Tell fake client to use the same DB as our calling client. */ /* Create the argument vector to call ZADD in order to add all
selectDb(client, c->db->id); * the score,value pairs to the requested zset, where score is actually
} * an encoded version of lat,long. */
uint8_t step = geohashEstimateStepsByRadius(radius_meters);
int i;
for (i = 0; i < elements; i++) {
double latlong[elements * 2];
/* Capture all lat/long components up front so if we encounter an error we if (!extractLatLongOrReply(c, (c->argv+2)+(i*3),latlong)) {
* return before making any changes to the database. */ for (i = 0; i < argc; i++)
double latlong[elements * 2]; if (argv[i]) decrRefCount(argv[i]);
for (int i = 0; i < elements; i++) { zfree(argv);
if (!extractLatLongOrReply(c, (c->argv + 2) + (i * 3),
latlong + (i * 2)))
return; return;
} }
/* Add all (lat, long, value) triples to the requested zset */
for (int i = 0; i < elements; i++) {
uint8_t step = geohashEstimateStepsByRadius(radius_meters);
#ifdef DEBUG #ifdef DEBUG
printf("Adding with step size: %d\n", step); printf("Adding with step size: %d\n", step);
#endif #endif
/* Turn the coordinates into the score of the element. */
GeoHashBits hash; GeoHashBits hash;
int ll_offset = i * 2; double latitude = latlong[0];
double latitude = latlong[ll_offset]; double longitude = latlong[1];
double longitude = latlong[ll_offset + 1];
geohashEncodeWGS84(latitude, longitude, step, &hash); geohashEncodeWGS84(latitude, longitude, step, &hash);
GeoHashFix52Bits bits = geohashAlign52Bits(hash); GeoHashFix52Bits bits = geohashAlign52Bits(hash);
robj *score = createObject(REDIS_STRING, sdsfromlonglong(bits)); robj *score = createObject(REDIS_STRING, sdsfromlonglong(bits));
robj *val = c->argv[2 + i * 3 + 2]; robj *val = c->argv[2 + i * 3 + 2];
/* (base args) + (offset for this triple) + (offset of value arg) */ argv[2+i*2] = score;
argv[3+i*2] = val;
rewriteClientCommandVector(client, 4, cmd, key, score, val); incrRefCount(val);
decrRefCount(score);
zaddCommand(client);
} }
/* If we used a fake client, return a real reply then free fake client. */ /* Finally call ZADD that will do the work for us. */
if (client != c) { replaceClientCommandVector(c,argc,argv);
addReplyLongLong(c, elements); zaddCommand(c);
freeClient(client);
}
} }
#define SORT_NONE 0 #define SORT_NONE 0

View File

@ -1527,6 +1527,16 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
va_end(ap); va_end(ap);
} }
/* Completely replace the client command vector with the provided one. */
void replaceClientCommandVector(redisClient *c, int argc, robj **argv) {
freeClientArgv(c);
zfree(c->argv);
c->argv = argv;
c->argc = argc;
c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
redisAssertWithInfo(c,NULL,c->cmd != NULL);
}
/* Rewrite a single item in the command vector. /* Rewrite a single item in the command vector.
* The new val ref count is incremented, and the old decremented. */ * The new val ref count is incremented, and the old decremented. */
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) { void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {

View File

@ -1074,6 +1074,7 @@ sds catClientInfoString(sds s, redisClient *client);
sds getAllClientsInfoString(void); sds getAllClientsInfoString(void);
void rewriteClientCommandVector(redisClient *c, int argc, ...); void rewriteClientCommandVector(redisClient *c, int argc, ...);
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval); void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
void replaceClientCommandVector(redisClient *c, int argc, robj **argv);
unsigned long getClientOutputBufferMemoryUsage(redisClient *c); unsigned long getClientOutputBufferMemoryUsage(redisClient *c);
void freeClientsInAsyncFreeQueue(void); void freeClientsInAsyncFreeQueue(void);
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c); void asyncCloseClientOnOutputBufferLimitReached(redisClient *c);