diff --git a/rma/application.py b/rma/application.py index 6dc74ed..967db5c 100644 --- a/rma/application.py +++ b/rma/application.py @@ -124,7 +124,7 @@ def run(self): is_all = self.behaviour == 'all' with Scanner(redis=self.redis, match=self.match, accepted_types=self.types) as scanner: keys = defaultdict(list) - for v in scanner.scan(limit=self.limit): + for v in scanner.scan(limit=self.limit, calculate_sizes=(is_all or self.behaviour == 'ram')): keys[v["type"]].append(v) if self.isTextFormat: diff --git a/rma/redis.py b/rma/redis.py index 331b134..a317437 100644 --- a/rma/redis.py +++ b/rma/redis.py @@ -98,16 +98,16 @@ def ziplist_overhead(size): return Jemalloc.align(12 + 21 * size) -def size_of_ziplist_aligned_string(value): - # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string - # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5% - try: - num_value = int(value) - return Jemalloc.align(size_of_pointer_fn()) - except ValueError: - pass +def size_of_ziplist_aligned_string(value_length): +# # Looks like we need something more complex here. We use calculation as 21 bytes per entry + len of string +# # or len of pointer. Redis use more RAM saving policy but after aligning it has infelicity ~3-5% +# try: +# num_value = int(value) +# return Jemalloc.align(size_of_pointer_fn()) +# except ValueError: +# pass - return Jemalloc.align(len(value)) + return Jemalloc.align(value_length) def linkedlist_overhead(): @@ -121,9 +121,8 @@ def linkedlist_entry_overhead(): return 3*size_of_pointer_fn() -def size_of_linkedlist_aligned_string(value): - return Jemalloc.align(linkedlist_entry_overhead() + len(value)) - +def size_of_linkedlist_aligned_string(value_length): + return Jemalloc.align(linkedlist_entry_overhead() + value_length) def intset_overhead(size): # typedef struct intset { diff --git a/rma/rule/List.py b/rma/rule/List.py index b2ecadd..0a167aa 100644 --- a/rma/rule/List.py +++ b/rma/rule/List.py @@ -15,20 +15,20 @@ def __init__(self, info, redis): key_name = info["name"] self.encoding = info['encoding'] self.ttl = info['ttl'] + self.entry_lengths = info['len'] - self.values = redis.lrange(key_name, 0, -1) - self.count = len(self.values) + self.count = len(self.entry_lengths) import time time.sleep(0.001) - used_bytes_iter, min_iter, max_iter = tee((len(x) for x in self.values), 3) + used_bytes_iter, min_iter, max_iter = tee(self.entry_lengths, 3) if self.encoding == REDIS_ENCODING_ID_LINKEDLIST: self.system = dict_overhead(self.count) - self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.values)) + self.valueAlignedBytes = sum(map(size_of_linkedlist_aligned_string, self.entry_lengths)) elif self.encoding == REDIS_ENCODING_ID_ZIPLIST or self.encoding == REDIS_ENCODING_ID_QUICKLIST: # Undone `quicklist` self.system = ziplist_overhead(self.count) - self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.values)) + self.valueAlignedBytes = sum(map(size_of_ziplist_aligned_string, self.entry_lengths)) else: raise Exception('Panic', 'Unknown encoding %s in %s' % (self.encoding, key_name)) diff --git a/rma/rule/ValueString.py b/rma/rule/ValueString.py index 4e7affb..c8b4809 100644 --- a/rma/rule/ValueString.py +++ b/rma/rule/ValueString.py @@ -36,7 +36,7 @@ def __init__(self, redis, info, use_debug=True): self.logger = logging.getLogger(__name__) if self.encoding == REDIS_ENCODING_ID_INT: - self.useful_bytes = self.get_int_encoded_bytes(redis, key_name) + self.useful_bytes = info["len"] self.free_bytes = 0 self.aligned = size_of_aligned_string_by_size(self.useful_bytes, encoding=self.encoding) elif self.encoding == REDIS_ENCODING_ID_EMBSTR or self.encoding == REDIS_ENCODING_ID_RAW: @@ -45,7 +45,7 @@ def __init__(self, redis, info, use_debug=True): self.useful_bytes = sdslen_response['val_sds_len'] self.free_bytes = sdslen_response['val_sds_avail'] else: - self.useful_bytes = size_of_aligned_string_by_size(redis.strlen(key_name), self.encoding) + self.useful_bytes = info["len"] self.free_bytes = 0 # INFO Rewrite this to support Redis >= 3.2 sds dynamic header sds_len = 8 + self.useful_bytes + self.free_bytes + 1 diff --git a/rma/scanner.py b/rma/scanner.py index 79258c7..440ac0a 100644 --- a/rma/scanner.py +++ b/rma/scanner.py @@ -28,14 +28,42 @@ def __init__(self, redis, match="*", accepted_types=None): self.pipeline_mode = False self.resolve_types_script = self.redis.register_script(""" local ret = {} + for i = 1, #KEYS do local type = redis.call("TYPE", KEYS[i]) - local encoding = redis.call("OBJECT", "ENCODING",KEYS[i]) - local ttl = redis.call("TTL", KEYS[i]) - ret[i] = {type["ok"], encoding, ttl} + + if type["ok"] == 'none' then + ret[i] = {} + else + local encoding = redis.call("OBJECT", "ENCODING",KEYS[i]) + local ttl = redis.call("TTL", KEYS[i]) + local len = 0 + + -- "calculate_sizes" option + if ARGV[1] == '1' then + if encoding == 'embstr' or encoding == 'raw' then + len = redis.call("STRLEN", KEYS[i]) + elseif encoding == 'int' then + local val = redis.call("GET", KEYS[i]) + if tonumber(val) < %(REDIS_SHARED_INTEGERS)d then + len = 0 + else + len = %(SIZE_OF_POINTER_FN)d + end + elseif encoding == 'linkedlist' or encoding == 'quicklist' or encoding == 'skiplist' then + local items = redis.call("lrange", KEYS[i], 0, -1) + len = {} + for j = 1, #items do + len[j] = #items[j] + end + end + end + + ret[i] = {type["ok"], encoding, ttl, len} + end end return cmsgpack.pack(ret) - """) + """ % { "SIZE_OF_POINTER_FN": size_of_pointer_fn(), "REDIS_SHARED_INTEGERS": REDIS_SHARED_INTEGERS }) def __enter__(self): return self @@ -43,7 +71,7 @@ def __enter__(self): def __exit__(self, *exc): return False - def batch_scan(self, count=1000, batch_size=3000): + def batch_scan(self, count=1000, batch_size=3000, calculate_sizes=True): ret = [] for key in self.redis.scan_iter(self.match, count=count): ret.append(key) @@ -51,12 +79,12 @@ def batch_scan(self, count=1000, batch_size=3000): yield from self.resolve_types(ret) if len(ret): - yield from self.resolve_types(ret) + yield from self.resolve_types(ret, calculate_sizes) - def resolve_types(self, ret): + def resolve_types(self, ret, calculate_sizes=True): if not self.pipeline_mode: try: - key_with_types = msgpack.unpackb(self.resolve_types_script(ret)) + key_with_types = msgpack.unpackb(self.resolve_types_script(ret, [( '1' if calculate_sizes else '0' )] )) except ResponseError as e: if "CROSSSLOT" not in repr(e): raise e @@ -79,14 +107,16 @@ def resolve_with_pipe(self, ret): key_with_types = [{'type': x, 'encoding': y, 'ttl': z} for x, y, z in chunker(pipe.execute(), 3)] return key_with_types - def scan(self, limit=1000): + def scan(self, limit=1000, calculate_sizes=True): with tqdm(total=min(limit, self.redis.dbsize()), desc="Match {0}".format(self.match), miniters=1000) as progress: total = 0 for key_tuple in self.batch_scan(): key_info, key_name = key_tuple - key_type, key_encoding, key_ttl = key_info + if len(key_info) == 0: + continue # key deleted between scan and check + key_type, key_encoding, key_ttl, key_len = key_info if not key_name: self.logger.warning( '\r\nWarning! Scan iterator return key with empty name `` and type %s', key_type) @@ -98,7 +128,8 @@ def scan(self, limit=1000): 'name': key_name.decode("utf-8", "replace"), 'type': to_id, 'encoding': redis_encoding_str_to_id(key_encoding), - 'ttl': key_ttl + 'ttl': key_ttl, + 'len': key_len } yield key_info_obj