diff --git a/src/server.c b/src/server.c index 3229907d03..189a7eb516 100644 --- a/src/server.c +++ b/src/server.c @@ -889,6 +889,266 @@ void trackInstantaneousMetric(int metric, long long current_value, long long cur server.inst_metric[metric].last_sample_value = current_value; } +void displayUpdate(int pre_value, int current_value) { + serverLog(LL_WARNING, "This is for testing, previous item number is %d, and current item number is %d", pre_value, current_value); +} + +void displayDataTypeArray(keysizeInfo *keysize_array, int length) { + serverLog(LL_WARNING, "Current array length is %d", length); + for (int i = 0; i < length; i++) { + serverLog(LL_WARNING, "Item %ld and value is %ld", keysize_array[i].element_size, keysize_array[i].num); + } +} + +void decreaseDataTypeArrayPreviousValue(keysizeInfo *keysize_array, int low, int high, int value) { + if (keysize_array[low].element_size == value) { + keysize_array[low].num--; + } else if (keysize_array[high].element_size == value) { + keysize_array[high].num--; + } else { + while (low + 1 < high) { + int mid = low + (high - low) / 2; + if (value > keysize_array[mid].element_size) { + low = mid; + } else { + high = mid; + } + } + if (value == keysize_array[high].element_size) { + keysize_array[high].num--; + } else { + keysize_array[low].num--; + } + } +} + +void increaseDataTypeArrayCurrentValue(keysizeInfo *keysize_array, int low, int high, int value) { + if (keysize_array[low].element_size == value) { + keysize_array[low].num++; + } else if (keysize_array[high].element_size == value) { + keysize_array[high].num++; + } else { + while (low + 1 < high) { + int mid = low + (high - low) / 2; + if (value > keysize_array[mid].element_size) { + low = mid; + } else { + high = mid; + } + } + if (value == keysize_array[high].element_size) { + keysize_array[high].num++; + } else { + keysize_array[low].num++; + } + } +} + +void scaleHashKeySizeArray(client *c, long value) { + int length = c->db->hashes_array_length; + int high_bound = c->db->hashes_array[length - 1].element_size; + int base = high_bound; + int count = 0; + while (high_bound < value) { + count++; + high_bound = high_bound * 2; + } + keysizeInfo *new_array = zmalloc(sizeof(keysizeInfo) * (count + length)); + for (int i = 0; i < length; i++) { + new_array[i].element_size = c->db->hashes_array[i].element_size; + new_array[i].num = c->db->hashes_array[i].num; + } + for (int i = length; i < (count + length); i++) { + base *= 2; + new_array[i].element_size = base; + new_array[i].num = 0; + } + keysizeInfo *old_array = c->db->hashes_array; + zfree(old_array); + c->db->hashes_array = new_array; + c->db->hashes_array_length = count + length; +} + +void updateHashKeySizeArray(client *c, long previous, long curr) { + int low = 0; + int high = c->db->hashes_array_length - 1; + if (curr > c->db->hashes_array[high].element_size) { + scaleHashKeySizeArray(c, curr); + } + + high = c->db->hashes_array_length - 1; + if (previous != 0) { + decreaseDataTypeArrayPreviousValue(c->db->hashes_array, low, high, previous); + } + if (curr != 0) { + increaseDataTypeArrayCurrentValue(c->db->hashes_array, low, high, curr); + } +} + +void scaleListKeySizeArray(client *c, long value) { + int length = c->db->lists_array_length; + int high_bound = c->db->lists_array[length - 1].element_size; + int base = high_bound; + int count = 0; + while (high_bound < value) { + count++; + high_bound = high_bound * 2; + } + keysizeInfo *new_array = zmalloc(sizeof(keysizeInfo) * (count + length)); + for (int i = 0; i < length; i++) { + new_array[i].element_size = c->db->lists_array[i].element_size; + new_array[i].num = c->db->lists_array[i].num; + } + for (int i = length; i < (count + length); i++) { + base *= 2; + new_array[i].element_size = base; + new_array[i].num = 0; + } + keysizeInfo *old_array = c->db->lists_array; + zfree(old_array); + c->db->lists_array = new_array; + c->db->lists_array_length = count + length; +} + +void updateListKeySizeArray(client *c, long previous, long curr) { + int low = 0; + int high = c->db->lists_array_length - 1; + if (curr > c->db->lists_array[high].element_size) { + scaleListKeySizeArray(c, curr); + } + + high = c->db->lists_array_length - 1; + if (previous != 0) { + decreaseDataTypeArrayPreviousValue(c->db->lists_array, low, high, previous); + } + if (curr != 0) { + increaseDataTypeArrayCurrentValue(c->db->lists_array, low, high, curr); + } +} + +void scaleSetKeySizeArray(client *c, long value) { + int length = c->db->sets_array_length; + int high_bound = c->db->sets_array[length - 1].element_size; + int base = high_bound; + int count = 0; + while (high_bound < value) { + count++; + high_bound = high_bound * 2; + } + keysizeInfo *new_array = zmalloc(sizeof(keysizeInfo) * (count + length)); + for (int i = 0; i < length; i++) { + new_array[i].element_size = c->db->sets_array[i].element_size; + new_array[i].num = c->db->sets_array[i].num; + } + for (int i = length; i < (count + length); i++) { + base *= 2; + new_array[i].element_size = base; + new_array[i].num = 0; + } + keysizeInfo *old_array = c->db->sets_array; + zfree(old_array); + c->db->sets_array = new_array; + c->db->sets_array_length = count + length; +} + +void updateSetKeySizeArray(client *c, long previous, long curr) { + int low = 0; + int high = c->db->sets_array_length - 1; + if (curr > c->db->sets_array[high].element_size) { + scaleSetKeySizeArray(c, curr); + } + + high = c->db->sets_array_length - 1; + if (previous != 0) { + decreaseDataTypeArrayPreviousValue(c->db->sets_array, low, high, previous); + } + if (curr != 0) { + increaseDataTypeArrayCurrentValue(c->db->sets_array, low, high, curr); + } +} + +void scaleStringKeySizeArray(client *c, long value) { + int length = c->db->strings_array_length; + int high_bound = c->db->strings_array[length - 1].element_size; + int base = high_bound; + int count = 0; + while (high_bound < value) { + count++; + high_bound = high_bound * 2; + } + keysizeInfo *new_array = zmalloc(sizeof(keysizeInfo) * (count + length)); + for (int i = 0; i < length; i++) { + new_array[i].element_size = c->db->strings_array[i].element_size; + new_array[i].num = c->db->strings_array[i].num; + } + for (int i = length; i < (count + length); i++) { + base *= 2; + new_array[i].element_size = base; + new_array[i].num = 0; + } + keysizeInfo *old_array = c->db->strings_array; + zfree(old_array); + c->db->strings_array = new_array; + c->db->strings_array_length = count + length; +} + +void updateStringKeySizeArray(client *c, long previous, long curr) { + int low = 0; + int high = c->db->strings_array_length - 1; + if (curr > c->db->strings_array[high].element_size) { + scaleStringKeySizeArray(c, curr); + } + + high = c->db->strings_array_length - 1; + if (previous != 0) { + decreaseDataTypeArrayPreviousValue(c->db->strings_array, low, high, previous); + } + if (curr != 0) { + increaseDataTypeArrayCurrentValue(c->db->strings_array, low, high, curr); + } +} + +void scaleZsetKeySizeArray(client *c, long value) { + int length = c->db->zsets_array_length; + int high_bound = c->db->zsets_array[length - 1].element_size; + int base = high_bound; + int count = 0; + while (high_bound < value) { + count++; + high_bound = high_bound * 2; + } + keysizeInfo *new_array = zmalloc(sizeof(keysizeInfo) * (count + length)); + for (int i = 0; i < length; i++) { + new_array[i].element_size = c->db->zsets_array[i].element_size; + new_array[i].num = c->db->zsets_array[i].num; + } + for (int i = length; i < (count + length); i++) { + base *= 2; + new_array[i].element_size = base; + new_array[i].num = 0; + } + keysizeInfo *old_array = c->db->zsets_array; + zfree(old_array); + c->db->zsets_array = new_array; + c->db->zsets_array_length = count + length; +} + +void updateZsetKeySizeArray(client *c, long previous, long curr) { + int low = 0; + int high = c->db->zsets_array_length - 1; + if (curr > c->db->zsets_array[high].element_size) { + scaleZsetKeySizeArray(c, curr); + } + + high = c->db->zsets_array_length - 1; + if (previous != 0) { + decreaseDataTypeArrayPreviousValue(c->db->zsets_array, low, high, previous); + } + if (curr != 0) { + increaseDataTypeArrayCurrentValue(c->db->zsets_array, low, high, curr); + } +} + /* Return the mean of all the samples. */ long long getInstantaneousMetric(int metric) { int j; @@ -2740,6 +3000,7 @@ void makeThreadKillable(void) { pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); } +#define INIT_ARRAY_SIZE 5 void initServer(void) { int j; @@ -2834,6 +3095,35 @@ void initServer(void) { server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; server.db[j].avg_ttl = 0; + server.db[j].lists_array_length = INIT_ARRAY_SIZE; + server.db[j].lists_number_of_elements = 0; + server.db[j].lists_array = zmalloc(sizeof(keysizeInfo) * INIT_ARRAY_SIZE); + server.db[j].sets_array_length = INIT_ARRAY_SIZE; + server.db[j].sets_number_of_elements = 0; + server.db[j].sets_array = zmalloc(sizeof(keysizeInfo) * INIT_ARRAY_SIZE); + server.db[j].hashes_array_length = INIT_ARRAY_SIZE; + server.db[j].hashes_number_of_elements = 0; + server.db[j].hashes_array = zmalloc(sizeof(keysizeInfo) * INIT_ARRAY_SIZE); + server.db[j].zsets_array_length = INIT_ARRAY_SIZE; + server.db[j].zsets_number_of_elements = 0; + server.db[j].zsets_array = zmalloc(sizeof(keysizeInfo) * INIT_ARRAY_SIZE); + server.db[j].strings_array_length = INIT_ARRAY_SIZE; + server.db[j].strings_number_of_elements = 0; + server.db[j].strings_array = zmalloc(sizeof(keysizeInfo) * INIT_ARRAY_SIZE); + int i = 1; + for (int count = 0; count < INIT_ARRAY_SIZE; count++) { + server.db[j].lists_array[count].element_size = i; + server.db[j].lists_array[count].num = 0; + server.db[j].sets_array[count].element_size = i; + server.db[j].sets_array[count].num = 0; + server.db[j].hashes_array[count].element_size = i; + server.db[j].hashes_array[count].num = 0; + server.db[j].zsets_array[count].element_size = i; + server.db[j].zsets_array[count].num = 0; + server.db[j].strings_array[count].element_size = i; + server.db[j].strings_array[count].num = 0; + i *= 2; + } } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which @@ -5560,6 +5850,7 @@ dict *genInfoSectionDict(robj **argv, int argc, char **defaults, int *out_all, i "errorstats", "cluster", "keyspace", + "keysizes", NULL, }; if (!defaults) defaults = default_sections; @@ -6214,6 +6505,39 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { } } + /* Key size distribution*/ + if (all_sections || (dictFind(section_dict, "keysizes") != NULL)) { + if (sections++) info = sdscat(info, "\r\n"); + info = sdscatprintf(info, "# Keysizes\r\n"); + for (j = 0; j < server.dbnum; j++) { + info = sdscatprintf(info, "db%d_distrib_strings_sizes:", j); + for (int l = 0; l < server.db[j].strings_array_length; l++) { + info = sdscatprintf(info, "%ld=%ld,", server.db[j].strings_array[l].element_size, server.db[j].strings_array[l].num); + } + info = sdscatprintf(info, "\r\n"); + info = sdscatprintf(info, "db%d_distrib_lists_items:", j); + for (int l = 0; l < server.db[j].lists_array_length; l++) { + info = sdscatprintf(info, "%ld=%ld,", server.db[j].lists_array[l].element_size, server.db[j].lists_array[l].num); + } + info = sdscatprintf(info, "\r\n"); + info = sdscatprintf(info, "db%d_distrib_sets_items:", j); + for (int l = 0; l < server.db[j].sets_array_length; l++) { + info = sdscatprintf(info, "%ld=%ld,", server.db[j].sets_array[l].element_size, server.db[j].sets_array[l].num); + } + info = sdscatprintf(info, "\r\n"); + info = sdscatprintf(info, "db%d_distrib_hashes_items:", j); + for (int l = 0; l < server.db[j].hashes_array_length; l++) { + info = sdscatprintf(info, "%ld=%ld,", server.db[j].hashes_array[l].element_size, server.db[j].hashes_array[l].num); + } + info = sdscatprintf(info, "\r\n"); + info = sdscatprintf(info, "db%d_distrib_zsets_items:", j); + for (int l = 0; l < server.db[j].zsets_array_length; l++) { + info = sdscatprintf(info, "%ld=%ld,", server.db[j].zsets_array[l].element_size, server.db[j].zsets_array[l].num); + } + info = sdscatprintf(info, "\r\n"); + } + } + /* Get info from modules. * Returned when the user asked for "everything", "modules", or a specific module section. * We're not aware of the module section names here, and we rather avoid the search when we can. diff --git a/src/server.h b/src/server.h index 246b31e9ff..90af95fa32 100644 --- a/src/server.h +++ b/src/server.h @@ -819,6 +819,11 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; +typedef struct keysizeInfo { + long element_size; + long num; +} keysizeInfo; + /* Database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ @@ -834,6 +839,21 @@ typedef struct serverDb { int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ + keysizeInfo *lists_array; + int lists_array_length; + unsigned long long lists_number_of_elements; + keysizeInfo *sets_array; + int sets_array_length; + unsigned long long sets_number_of_elements; + keysizeInfo *hashes_array; + int hashes_array_length; + unsigned long long hashes_number_of_elements; + keysizeInfo *zsets_array; + int zsets_array_length; + unsigned long long zsets_number_of_elements; + keysizeInfo *strings_array; + int strings_array_length; + unsigned long long strings_number_of_elements; } serverDb; /* forward declaration for functions ctx */ @@ -3229,6 +3249,15 @@ void *activeDefragAlloc(void *ptr); robj *activeDefragStringOb(robj *ob); void dismissSds(sds s); void dismissMemoryInChild(void); +void displayUpdate(int pre_value, int current_value); +void displayDataTypeArray(keysizeInfo *keysize_array, int length); +void decreaseDataTypeArrayPreviousValue(keysizeInfo *keysize_array, int low, int high, int value); +void increaseDataTypeArrayCurrentValue(keysizeInfo *keysize_array, int low, int high, int value); +void updateHashKeySizeArray(client *c, long previous, long curr); +void updateStringKeySizeArray(client *c, long previous, long curr); +void updateListKeySizeArray(client *c, long previous, long curr); +void updateZsetKeySizeArray(client *c, long previous, long curr); +void updateSetKeySizeArray(client *c, long previous, long curr); #define RESTART_SERVER_NONE 0 #define RESTART_SERVER_GRACEFULLY (1 << 0) /* Do proper shutdown. */ diff --git a/src/t_hash.c b/src/t_hash.c index 5a8c17e90c..cef2c5f8e3 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -788,14 +788,25 @@ static void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpac } } - /*----------------------------------------------------------------------------- * Hash type commands *----------------------------------------------------------------------------*/ void hsetnxCommand(client *c) { robj *o; - if ((o = hashTypeLookupWriteOrCreate(c, c->argv[1])) == NULL) return; + long previous_element_number; + long current_element_number; + + o = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, o, OBJ_HASH)) return; + if (o == NULL) { + o = createHashObject(); + dbAdd(c->db, c->argv[1], &o); + previous_element_number = 0; + c->db->lists_number_of_elements++; + } else { + previous_element_number = hashTypeLength(o); + } if (hashTypeExists(o, c->argv[2]->ptr)) { addReply(c, shared.czero); @@ -806,19 +817,33 @@ void hsetnxCommand(client *c) { notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); server.dirty++; addReply(c, shared.cone); + current_element_number = previous_element_number + 1; + /* TO DO: update INFO KEYSIZES */ + updateHashKeySizeArray(c, previous_element_number, current_element_number); } } void hsetCommand(client *c) { int i, created = 0; robj *o; + long previous_element_number; + long current_element_number; if ((c->argc % 2) == 1) { addReplyErrorArity(c); return; } - if ((o = hashTypeLookupWriteOrCreate(c, c->argv[1])) == NULL) return; + o = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, o, OBJ_HASH)) return; + if (o == NULL) { + o = createHashObject(); + dbAdd(c->db, c->argv[1], &o); + previous_element_number = 0; + c->db->hashes_number_of_elements++; + } else { + previous_element_number = hashTypeLength(o); + } hashTypeTryConversion(o, c->argv, 2, c->argc - 1); for (i = 2; i < c->argc; i += 2) created += !hashTypeSet(o, c->argv[i]->ptr, c->argv[i + 1]->ptr, HASH_SET_COPY); @@ -827,6 +852,10 @@ void hsetCommand(client *c) { notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id); server.dirty += (c->argc - 2) / 2; + if (created > 0) { + current_element_number = previous_element_number + created; + updateHashKeySizeArray(c, previous_element_number, current_element_number); + } /* HMSET (deprecated) and HSET return value is different. */ char *cmdname = c->argv[0]->ptr; if (cmdname[1] == 's' || cmdname[1] == 'S') { @@ -844,9 +873,20 @@ void hincrbyCommand(client *c) { sds new; unsigned char *vstr; unsigned int vlen; + long previous_element_number; + long current_element_number; if (getLongLongFromObjectOrReply(c, c->argv[3], &incr, NULL) != C_OK) return; - if ((o = hashTypeLookupWriteOrCreate(c, c->argv[1])) == NULL) return; + o = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, o, OBJ_HASH)) return; + if (o == NULL) { + o = createHashObject(); + dbAdd(c->db, c->argv[1], &o); + previous_element_number = 0; + c->db->hashes_number_of_elements++; + } else { + previous_element_number = hashTypeLength(o); + } if (hashTypeGetValue(o, c->argv[2]->ptr, &vstr, &vlen, &value) == C_OK) { if (vstr) { if (string2ll((char *)vstr, vlen, &value) == 0) { @@ -871,6 +911,9 @@ void hincrbyCommand(client *c) { notifyKeyspaceEvent(NOTIFY_HASH, "hincrby", c->argv[1], c->db->id); server.dirty++; addReplyLongLong(c, value); + current_element_number = hashTypeLength(o); + /* TO DO: update INFO KEYSIZES */ + updateHashKeySizeArray(c, previous_element_number, current_element_number); } void hincrbyfloatCommand(client *c) { @@ -880,13 +923,25 @@ void hincrbyfloatCommand(client *c) { sds new; unsigned char *vstr; unsigned int vlen; + long previous_element_number; + long current_element_number; if (getLongDoubleFromObjectOrReply(c, c->argv[3], &incr, NULL) != C_OK) return; if (isnan(incr) || isinf(incr)) { addReplyError(c, "value is NaN or Infinity"); return; } - if ((o = hashTypeLookupWriteOrCreate(c, c->argv[1])) == NULL) return; + + o = lookupKeyWrite(c->db, c->argv[1]); + if (checkType(c, o, OBJ_HASH)) return; + if (o == NULL) { + o = createHashObject(); + dbAdd(c->db, c->argv[1], &o); + previous_element_number = 0; + c->db->hashes_number_of_elements++; + } else { + previous_element_number = hashTypeLength(o); + } if (hashTypeGetValue(o, c->argv[2]->ptr, &vstr, &vlen, &ll) == C_OK) { if (vstr) { if (string2ld((char *)vstr, vlen, &value) == 0) { @@ -914,6 +969,9 @@ void hincrbyfloatCommand(client *c) { notifyKeyspaceEvent(NOTIFY_HASH, "hincrbyfloat", c->argv[1], c->db->id); server.dirty++; addReplyBulkCBuffer(c, buf, len); + current_element_number = hashTypeLength(o); + /* TO DO: update INFO KEYSIZES */ + updateHashKeySizeArray(c, previous_element_number, current_element_number); /* Always replicate HINCRBYFLOAT as an HSET command with the final value * in order to make sure that differences in float precision or formatting @@ -972,9 +1030,12 @@ void hmgetCommand(client *c) { void hdelCommand(client *c) { robj *o; int j, deleted = 0, keyremoved = 0; + long previous_element_number; + long current_element_number; if ((o = lookupKeyWriteOrReply(c, c->argv[1], shared.czero)) == NULL || checkType(c, o, OBJ_HASH)) return; + previous_element_number = hashTypeLength(o); for (j = 2; j < c->argc; j++) { if (hashTypeDelete(o, c->argv[j]->ptr)) { deleted++; @@ -988,7 +1049,13 @@ void hdelCommand(client *c) { if (deleted) { signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH, "hdel", c->argv[1], c->db->id); - if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + if (keyremoved) { + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); + c->db->hashes_number_of_elements--; + } + current_element_number = previous_element_number - deleted; + /* TO DO: update INFO KEYSIZES */ + updateHashKeySizeArray(c, previous_element_number, current_element_number); server.dirty += deleted; } addReplyLongLong(c, deleted); diff --git a/src/t_list.c b/src/t_list.c index 9cafd60cf6..22d5b0cdaf 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -461,6 +461,8 @@ void listTypeDelRange(robj *subject, long start, long count) { * 'xx': push if key exists. */ void pushGenericCommand(client *c, int where, int xx) { int j; + long previous_element_number; + long current_element_number; robj *lobj = lookupKeyWrite(c->db, c->argv[1]); if (checkType(c, lobj, OBJ_LIST)) return; @@ -472,6 +474,10 @@ void pushGenericCommand(client *c, int where, int xx) { lobj = createListListpackObject(); dbAdd(c->db, c->argv[1], &lobj); + previous_element_number = 0; + c->db->lists_number_of_elements++; + } else { + previous_element_number = listTypeLength(lobj); } listTypeTryConversionAppend(lobj, c->argv, 2, c->argc - 1, NULL, NULL); @@ -483,6 +489,9 @@ void pushGenericCommand(client *c, int where, int xx) { signalModifiedKey(c, c->db, c->argv[1]); char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id); + current_element_number = listTypeLength(lobj); + // displayUpdate(previous_element_number, current_element_number); + updateListKeySizeArray(c, previous_element_number, current_element_number); addReplyLongLong(c, listTypeLength(lobj)); } @@ -514,6 +523,8 @@ void linsertCommand(client *c) { listTypeIterator *iter; listTypeEntry entry; int inserted = 0; + long previous_element_number; + long current_element_number; if (strcasecmp(c->argv[2]->ptr, "after") == 0) { where = LIST_TAIL; @@ -532,8 +543,10 @@ void linsertCommand(client *c) { * the list twice (once to see if the value can be inserted and once * to do the actual insert), so we assume this value can be inserted * and convert the listpack to a regular list if necessary. */ + previous_element_number = listTypeLength(subject); listTypeTryConversionAppend(subject, c->argv, 4, 4, NULL, NULL); + /* Seek pivot from head to tail */ iter = listTypeInitIterator(subject, 0, LIST_TAIL); while (listTypeNext(iter, &entry)) { @@ -549,6 +562,8 @@ void linsertCommand(client *c) { signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST, "linsert", c->argv[1], c->db->id); server.dirty++; + current_element_number = previous_element_number + 1; + updateListKeySizeArray(c, previous_element_number, current_element_number); } else { /* Notify client of a failed insert */ addReplyLongLong(c, -1); @@ -634,7 +649,10 @@ void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long long rangestart = (where == LIST_HEAD) ? 0 : -rangelen; long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1; int reverse = (where == LIST_HEAD) ? 0 : 1; + long previous_element_number; + long current_element_number; + previous_element_number = llen; initDeferredReplyBuffer(c); /* We return key-name just once, and an array of elements */ addReplyArrayLen(c, 2); @@ -646,6 +664,8 @@ void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long /* Maintain the notifications and dirty. */ listElementsRemoved(c, key, where, o, rangelen, deleted); commitDeferredReplyBuffer(c, 1); + current_element_number = previous_element_number - rangelen; + updateListKeySizeArray(c, previous_element_number, current_element_number); } /* Extracted from `addListRangeReply()` to reply with a quicklist list. @@ -756,6 +776,8 @@ void popGenericCommand(client *c, int where) { int hascount = (c->argc == 3); long count = 0; robj *value; + long previous_element_number; + long current_element_number; if (c->argc > 3) { addReplyErrorArity(c); @@ -774,6 +796,7 @@ void popGenericCommand(client *c, int where) { return; } + previous_element_number = listTypeLength(o); if (!count) { /* Pop a single element. This is POP's original behavior that replies * with a bulk string. */ @@ -782,10 +805,11 @@ void popGenericCommand(client *c, int where) { serverAssert(value != NULL); addReplyBulk(c, value); decrRefCount(value); + current_element_number = previous_element_number - 1; } else { /* Pop a range of elements. An addition to the original POP command, * which replies with a multi-bulk. */ - long llen = listTypeLength(o); + long llen = previous_element_number; long rangelen = (count > llen) ? llen : count; long rangestart = (where == LIST_HEAD) ? 0 : -rangelen; long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1; @@ -796,7 +820,9 @@ void popGenericCommand(client *c, int where) { listTypeDelRange(o, rangestart, rangelen); listElementsRemoved(c, c->argv[1], where, o, rangelen, NULL); commitDeferredReplyBuffer(c, 1); + current_element_number = previous_element_number - rangelen; } + updateListKeySizeArray(c, previous_element_number, current_element_number); } /* Like popGenericCommand but work with multiple keys. @@ -866,6 +892,8 @@ void lrangeCommand(client *c) { void ltrimCommand(client *c) { robj *o; long start, end, llen, ltrim, rtrim; + long previous_element_number; + long current_element_number; if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) @@ -873,6 +901,7 @@ void ltrimCommand(client *c) { if ((o = lookupKeyWriteOrReply(c, c->argv[1], shared.ok)) == NULL || checkType(c, o, OBJ_LIST)) return; llen = listTypeLength(o); + previous_element_number = llen; /* convert negative indexes */ if (start < 0) start = llen + start; @@ -902,13 +931,15 @@ void ltrimCommand(client *c) { serverPanic("Unknown list encoding"); } + current_element_number = listTypeLength(o); notifyKeyspaceEvent(NOTIFY_LIST, "ltrim", c->argv[1], c->db->id); - if (listTypeLength(o) == 0) { + if (current_element_number == 0) { dbDelete(c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } else { listTypeTryConversion(o, LIST_CONV_SHRINKING, NULL, NULL); } + updateListKeySizeArray(c, previous_element_number, current_element_number); signalModifiedKey(c, c->db, c->argv[1]); server.dirty += (ltrim + rtrim); addReply(c, shared.ok); @@ -1027,6 +1058,8 @@ void lremCommand(client *c) { obj = c->argv[3]; long toremove; long removed = 0; + long previous_element_number; + long current_element_number; if (getRangeLongFromObjectOrReply(c, c->argv[2], -LONG_MAX, LONG_MAX, &toremove, NULL) != C_OK) return; @@ -1041,6 +1074,8 @@ void lremCommand(client *c) { li = listTypeInitIterator(subject, 0, LIST_TAIL); } + previous_element_number = listTypeLength(subject); + current_element_number = previous_element_number; listTypeEntry entry; while (listTypeNext(li, &entry)) { if (listTypeEqual(&entry, obj)) { @@ -1054,6 +1089,7 @@ void lremCommand(client *c) { if (removed) { notifyKeyspaceEvent(NOTIFY_LIST, "lrem", c->argv[1], c->db->id); + current_element_number = listTypeLength(subject); if (listTypeLength(subject) == 0) { dbDelete(c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); @@ -1061,6 +1097,7 @@ void lremCommand(client *c) { listTypeTryConversion(subject, LIST_CONV_SHRINKING, NULL, NULL); } signalModifiedKey(c, c->db, c->argv[1]); + updateListKeySizeArray(c, previous_element_number, current_element_number); } addReplyLongLong(c, removed); @@ -1103,6 +1140,9 @@ robj *getStringObjectFromListPosition(int position) { void lmoveGenericCommand(client *c, int wherefrom, int whereto) { robj *sobj, *value; + long s_previous_element_number; + long d_previous_element_number; + if ((sobj = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp])) == NULL || checkType(c, sobj, OBJ_LIST)) return; @@ -1116,12 +1156,22 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) { if (checkType(c, dobj, OBJ_LIST)) return; value = listTypePop(sobj, wherefrom); + s_previous_element_number = listTypeLength(sobj); + serverAssert(value); /* assertion for valgrind (avoid NPD) */ + if (dobj) { + d_previous_element_number = listTypeLength(dobj); + } else { + d_previous_element_number = 0; + } lmoveHandlePush(c, c->argv[2], dobj, value, whereto); listElementsRemoved(c, touchedkey, wherefrom, sobj, 1, NULL); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); + updateListKeySizeArray(c, s_previous_element_number, s_previous_element_number - 1); + updateListKeySizeArray(c, d_previous_element_number, d_previous_element_number + 1); + if (c->cmd->proc == blmoveCommand) { rewriteClientCommandVector(c, 5, shared.lmove, c->argv[1], c->argv[2], c->argv[3], c->argv[4]); @@ -1172,6 +1222,8 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i robj *key; mstime_t timeout; int j; + long previous_element_number; + long current_element_number; if (getTimeoutFromObjectOrReply(c, c->argv[timeout_idx], &timeout, UNIT_SECONDS) != C_OK) return; @@ -1201,9 +1253,12 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i return; } + previous_element_number = llen; /* Non empty list, this is like a normal [LR]POP. */ robj *value = listTypePop(o, where); serverAssert(value != NULL); + current_element_number = previous_element_number - 1; + updateListKeySizeArray(c, previous_element_number, current_element_number); listElementsRemoved(c, key, where, o, 1, NULL); addReplyArrayLen(c, 2); diff --git a/src/t_set.c b/src/t_set.c index 8eac0cd9e6..de0bad2fa2 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -597,6 +597,8 @@ robj *setTypeDup(robj *o) { void saddCommand(client *c) { robj *set; int j, added = 0; + long previous_element_number; + long current_element_number; set = lookupKeyWrite(c->db, c->argv[1]); if (checkType(c, set, OBJ_SET)) return; @@ -604,8 +606,11 @@ void saddCommand(client *c) { if (set == NULL) { set = setTypeCreate(c->argv[2]->ptr, c->argc - 2); dbAdd(c->db, c->argv[1], &set); + previous_element_number = 0; + c->db->sets_number_of_elements++; } else { setTypeMaybeConvert(set, c->argc - 2); + previous_element_number = setTypeSize(set); } for (j = 2; j < c->argc; j++) { @@ -615,6 +620,9 @@ void saddCommand(client *c) { signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET, "sadd", c->argv[1], c->db->id); server.dirty += added; + current_element_number = previous_element_number + added; + /* TO DO: update INFO KEYSIZES */ + updateSetKeySizeArray(c, previous_element_number, current_element_number); } addReplyLongLong(c, added); } @@ -622,9 +630,12 @@ void saddCommand(client *c) { void sremCommand(client *c) { robj *set; int j, deleted = 0, keyremoved = 0; + long previous_element_number; + long current_element_number; if ((set = lookupKeyWriteOrReply(c, c->argv[1], shared.czero)) == NULL || checkType(c, set, OBJ_SET)) return; + previous_element_number = setTypeSize(set); for (j = 2; j < c->argc; j++) { if (setTypeRemove(set, c->argv[j]->ptr)) { deleted++; @@ -641,6 +652,9 @@ void sremCommand(client *c) { if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); server.dirty += deleted; } + current_element_number = previous_element_number - deleted; + /* TO DO: update INFO KEYSIZES */ + updateSetKeySizeArray(c, previous_element_number, current_element_number); addReplyLongLong(c, deleted); } @@ -949,6 +963,8 @@ void spopWithCountCommand(client *c) { void spopCommand(client *c) { robj *set, *ele; + long previous_element_number; + long current_element_number; if (c->argc == 3) { spopWithCountCommand(c); @@ -963,8 +979,12 @@ void spopCommand(client *c) { if ((set = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp])) == NULL || checkType(c, set, OBJ_SET)) return; + previous_element_number = setTypeSize(set); /* Pop a random element from the set */ ele = setTypePopRandom(set); + current_element_number = setTypeSize(set); + /* TO DO: update INFO KEYSIZES */ + updateSetKeySizeArray(c, previous_element_number, current_element_number); notifyKeyspaceEvent(NOTIFY_SET, "spop", c->argv[1], c->db->id); @@ -1606,6 +1626,27 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke setTypeReleaseIterator(si); server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) : decrRefCount(dstset); } else { + robj *t_obj = lookupKeyWrite(c->db, dstkey); + if (t_obj) { + if (t_obj->type == OBJ_STRING) { + updateStringKeySizeArray(c, stringObjectLen(t_obj), 0); + c->db->strings_number_of_elements--; + } else if (t_obj->type == OBJ_LIST) { + updateListKeySizeArray(c, listTypeLength(t_obj), 0); + c->db->lists_number_of_elements--; + } else if (t_obj->type == OBJ_SET) { + updateSetKeySizeArray(c, setTypeSize(t_obj), 0); + c->db->sets_number_of_elements--; + } else if (t_obj->type == OBJ_ZSET) { + updateZsetKeySizeArray(c, zsetLength(t_obj), 0); + c->db->zsets_number_of_elements--; + } else if (t_obj->type == OBJ_HASH) { + updateHashKeySizeArray(c, hashTypeLength(t_obj), 0); + c->db->hashes_number_of_elements--; + } else if (t_obj->type == OBJ_STREAM) { + } + } + /* If we have a target key where to store the resulting set * create this key with the result set inside */ if (setTypeSize(dstset) > 0) { @@ -1613,6 +1654,8 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke notifyKeyspaceEvent(NOTIFY_SET, op == SET_OP_UNION ? "sunionstore" : "sdiffstore", dstkey, c->db->id); server.dirty++; addReplyLongLong(c, setTypeSize(dstset)); + updateSetKeySizeArray(c, 0, setTypeSize(dstset)); + c->db->sets_number_of_elements++; } else { if (dbDelete(c->db, dstkey)) { server.dirty++; diff --git a/src/t_string.c b/src/t_string.c index ef3e4bccde..688c37b32b 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -100,6 +100,8 @@ void setGenericCommand(client *c, long long milliseconds = 0; /* initialized to avoid any harmness warning */ int found = 0; int setkey_flags = 0; + long previous_str_len; + long curr_str_len; if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK) { return; @@ -152,9 +154,20 @@ void setGenericCommand(client *c, * created again. */ setkey_flags |= ((flags & OBJ_KEEPTTL) || expire) ? SETKEY_KEEPTTL : 0; setkey_flags |= found ? SETKEY_ALREADY_EXIST : SETKEY_DOESNT_EXIST; + if (found) { + // previous_str_len = stringObjectLen(existing_value); + previous_str_len = 0; + } else { + previous_str_len = 0; + c->db->strings_number_of_elements++; + } + setKey(c, c->db, key, &val, setkey_flags); if (expire) val = setExpire(c, c->db, key, milliseconds); + // curr_str_len = stringObjectLen(val); + curr_str_len = 0; + updateStringKeySizeArray(c, previous_str_len, curr_str_len); /* By setting the reallocated value back into argv, we can avoid duplicating * a large string value when adding it to the db. */ @@ -498,8 +511,19 @@ void getexCommand(client *c) { } void getdelCommand(client *c) { + robj *o; + long previous_str_len; + initDeferredReplyBuffer(c); - if (getGenericCommand(c) == C_ERR) return; + o = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]); + if (checkType(c, o, OBJ_STRING)) return; + if (o != NULL) { + addReplyBulk(c, o); + previous_str_len = stringObjectLen(o); + } else { + previous_str_len = 0; + } + if (dbSyncDelete(c->db, c->argv[1])) { /* Propagate as DEL command */ rewriteClientCommandVector(c, 2, shared.del, c->argv[1]); @@ -507,18 +531,33 @@ void getdelCommand(client *c) { notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); server.dirty++; } + updateStringKeySizeArray(c, previous_str_len, 0); commitDeferredReplyBuffer(c, 1); } void getsetCommand(client *c) { + robj *o; + long previous_str_len; + long curr_str_len; + initDeferredReplyBuffer(c); - if (getGenericCommand(c) == C_ERR) return; + o = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]); + if (checkType(c, o, OBJ_STRING)) return; + if (o != NULL) { + addReplyBulk(c, o); + previous_str_len = stringObjectLen(o); + } else { + previous_str_len = 0; + } + c->argv[2] = tryObjectEncoding(c->argv[2]); setKey(c, c->db, c->argv[1], &c->argv[2], 0); incrRefCount(c->argv[2]); notifyKeyspaceEvent(NOTIFY_STRING, "set", c->argv[1], c->db->id); server.dirty++; + curr_str_len = stringObjectLen(c->argv[2]); + updateStringKeySizeArray(c, previous_str_len, curr_str_len); commitDeferredReplyBuffer(c, 1); /* Propagate as SET command */ rewriteClientCommandArgument(c, 0, shared.set); @@ -528,6 +567,8 @@ void setrangeCommand(client *c) { robj *o; long offset; sds value = c->argv[3]->ptr; + long previous_str_len; + long curr_str_len; if (getLongFromObjectOrReply(c, c->argv[2], &offset, NULL) != C_OK) return; @@ -548,8 +589,9 @@ void setrangeCommand(client *c) { /* Return when the resulting string exceeds allowed size */ if (checkStringLength(c, offset, sdslen(value)) != C_OK) return; - + previous_str_len = 0; o = createObject(OBJ_STRING, sdsnewlen(NULL, offset + sdslen(value))); + c->db->strings_number_of_elements++; dbAdd(c->db, c->argv[1], &o); } else { size_t olen; @@ -564,6 +606,7 @@ void setrangeCommand(client *c) { addReplyLongLong(c, olen); return; } + previous_str_len = olen; /* Return when the resulting string exceeds allowed size */ if (checkStringLength(c, offset, sdslen(value)) != C_OK) @@ -578,6 +621,8 @@ void setrangeCommand(client *c) { signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING, "setrange", c->argv[1], c->db->id); server.dirty++; + curr_str_len = sdslen(o->ptr); + updateStringKeySizeArray(c, previous_str_len, curr_str_len); addReplyLongLong(c, sdslen(o->ptr)); } @@ -779,6 +824,8 @@ void incrbyfloatCommand(client *c) { void appendCommand(client *c) { size_t totlen; robj *o, *append; + long previous_str_len; + long curr_str_len; o = lookupKeyWrite(c->db, c->argv[1]); if (o == NULL) { @@ -787,6 +834,8 @@ void appendCommand(client *c) { dbAdd(c->db, c->argv[1], &c->argv[2]); incrRefCount(c->argv[2]); totlen = stringObjectLen(c->argv[2]); + previous_str_len = 0; + c->db->strings_number_of_elements++; } else { /* Key exists, check type */ if (checkType(c, o, OBJ_STRING)) @@ -797,6 +846,7 @@ void appendCommand(client *c) { if (checkStringLength(c, stringObjectLen(o), sdslen(append->ptr)) != C_OK) return; + previous_str_len = stringObjectLen(o); /* Append the value */ o = dbUnshareStringValue(c->db, c->argv[1], o); o->ptr = sdscatlen(o->ptr, append->ptr, sdslen(append->ptr)); @@ -805,6 +855,9 @@ void appendCommand(client *c) { signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING, "append", c->argv[1], c->db->id); server.dirty++; + curr_str_len = totlen; + /* TO DO: update INFO KEYSIZES */ + updateStringKeySizeArray(c, previous_str_len, curr_str_len); addReplyLongLong(c, totlen); } diff --git a/src/t_zset.c b/src/t_zset.c index 3930a7181d..6ddc9bc513 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1754,6 +1754,8 @@ static void zaddGenericCommand(client *c, int flags) { robj *key = c->argv[1]; robj *zobj; sds ele; + long previous_element_number = 0; + long current_element_number = 0; double score = 0, *scores = NULL; int j, elements, ch = 0; size_t maxelelen = 0; @@ -1841,8 +1843,10 @@ static void zaddGenericCommand(client *c, int flags) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ zobj = zsetTypeCreate(elements, maxelelen); dbAdd(c->db, key, &zobj); + previous_element_number = 0; } else { zsetTypeMaybeConvert(zobj, elements, maxelelen); + previous_element_number = zsetLength(zobj); } for (j = 0; j < elements; j++) { @@ -1868,6 +1872,10 @@ static void zaddGenericCommand(client *c, int flags) { signalModifiedKey(c, c->db, key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } + server.dirty += (added + updated); + current_element_number = zsetLength(zobj); + /* TO DO: update INFO KEYSIZES */ + updateZsetKeySizeArray(c, previous_element_number, current_element_number); reply_to_client: if (reply_err) { @@ -1896,9 +1904,12 @@ void zremCommand(client *c) { robj *key = c->argv[1]; robj *zobj; int deleted = 0, keyremoved = 0, j; + long previous_element_number; + long current_element_number; if ((zobj = lookupKeyWriteOrReply(c, key, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) return; + previous_element_number = zsetLength(zobj); for (j = 2; j < c->argc; j++) { if (zsetDel(zobj, c->argv[j]->ptr)) deleted++; if (zsetLength(zobj) == 0) { @@ -1910,9 +1921,14 @@ void zremCommand(client *c) { if (deleted) { notifyKeyspaceEvent(NOTIFY_ZSET, "zrem", key, c->db->id); - if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + if (keyremoved) { + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + c->db->zsets_number_of_elements--; + } signalModifiedKey(c, c->db, key); server.dirty += deleted; + current_element_number = previous_element_number - deleted; + updateZsetKeySizeArray(c, previous_element_number, current_element_number); } addReplyLongLong(c, deleted); } @@ -1934,6 +1950,8 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) { zlexrangespec lexrange; long start, end, llen; char *notify_type = NULL; + long previous_element_number; + long current_element_number; /* Step 1: Parse the range. */ if (rangetype == ZRANGE_RANK) { @@ -1960,6 +1978,7 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) { /* Step 2: Lookup & range sanity checks if needed. */ if ((zobj = lookupKeyWriteOrReply(c, key, shared.czero)) == NULL || checkType(c, zobj, OBJ_ZSET)) goto cleanup; + previous_element_number = zsetLength(zobj); if (rangetype == ZRANGE_RANK) { /* Sanitize indexes. */ llen = zsetLength(zobj); @@ -2010,8 +2029,13 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) { if (deleted) { signalModifiedKey(c, c->db, key); notifyKeyspaceEvent(NOTIFY_ZSET, notify_type, key, c->db->id); - if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + if (keyremoved) { + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + c->db->zsets_number_of_elements--; + } server.dirty += deleted; + current_element_number = previous_element_number - deleted; + updateZsetKeySizeArray(c, previous_element_number, current_element_number); } addReplyLongLong(c, deleted);