
Commit ba86b3a

Add a new insideout mode
The idea is that instead of saving all dependent cache keys in a conj set, we put a simple random stamp into each conj key and store a checksum of all related conj stamps along with the cache data. This makes cache reads more complicated: `MGET` the key plus its conj keys, then validate the stamps checksum. However, we no longer need to store potentially big conj sets, and invalidation becomes faster, including model-level invalidation.

It also removes the strong link between conj and cache keys, i.e. a loss of conj keys no longer leads to a stale cache; instead we simply drop the key on the next read. This opens an easier path to maxmemory and cluster support. So:

- more friendly to `maxmemory`, even assumes it, see #143
- eliminates issues with big conj sets and long invalidation, see #340
- `reapconjs` is not needed with it, see #323, #434

Followups:

- docs
- remove `CACHEOPS_LRU` as it's superseded by this mode
- make insideout the default, or even drop the old way?
Parent: 26fda80
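To make the scheme above concrete, here is a minimal sketch of the insideout read path, written against a plain `redis` client. The function and variable names are illustrative, not the actual cacheops API; the real implementation lives in cacheops/getset.py below.

# Illustrative sketch of the insideout read path, not the actual cacheops code.
import hashlib
import redis

r = redis.Redis()

def read_insideout(cache_key, conj_keys):
    # Fetch the cached value and all conj stamps in one round trip
    coded, *stamps = r.mget(cache_key, *conj_keys)
    if coded is None:
        return None  # plain cache miss
    if None in stamps:
        # A conj key was evicted or invalidated: drop the cache key lazily
        r.unlink(cache_key)
        return None
    checksum, data = coded.split(b':', 1)
    if checksum.decode() != hashlib.sha1(b' '.join(stamps)).hexdigest():
        r.unlink(cache_key)  # stamps changed since the write, data is stale
        return None
    return data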

File tree

12 files changed: +245 -106 lines changed

README.rst

Lines changed: 1 addition & 2 deletions
@@ -75,8 +75,7 @@ Setup redis connection and enable caching for desired models:
         ...  # everything else is passed to Sentinel()
     }
 
-    # To use your own redis client class,
-    # should be compatible or subclass cacheops.redis.CacheopsRedis
+    # Use your own redis client class, should be compatible or subclass redis.StrictRedis
     CACHEOPS_CLIENT_CLASS = 'your.redis.ClientClass'
 
     CACHEOPS = {
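For illustration, a compatible client class can be as simple as the following sketch. The module path mirrors the `your.redis.ClientClass` placeholder above, and the `execute_command` override is just an example hook.

# your/redis.py -- matches the 'your.redis.ClientClass' placeholder above
import redis

class ClientClass(redis.StrictRedis):
    """Compatible client: a redis.StrictRedis subclass with custom behavior."""
    def execute_command(self, *args, **options):
        # e.g. add metrics or logging around every command here
        return super().execute_command(*args, **options)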

bench.py

Lines changed: 4 additions & 1 deletion
@@ -16,7 +16,7 @@ def run_benchmarks(tests):
         if 'h' in flags:
             print(HEADER_TEMPLATE % name)
         time = bench_test(test)
-        print('%-18s time: %.2fms' % (name, time * 1000))
+        print('%-18s time: %.3fms' % (name, time * 1000))
 
 def bench_test(test):
     prepared = None
@@ -81,6 +81,9 @@ def bench_once(test, prepared=None):
 db_name = connection.creation.create_test_db(verbosity=verbosity, autoclobber=not interactive)
 call_command('loaddata', *fixtures, **{'verbosity': verbosity})
 
+from cacheops.redis import redis_client
+redis_client.flushdb()
+
 from tests.bench import TESTS  # import is here because it executes queries
 if selector:
     tests = [(name, test) for name, test in TESTS if select(name)]

cacheops/conf.py

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ class Defaults:
     CACHEOPS = {}
     CACHEOPS_PREFIX = lambda query: ''
     CACHEOPS_LRU = False
+    CACHEOPS_INSIDEOUT = False
     CACHEOPS_CLIENT_CLASS = None
     CACHEOPS_DEGRADE_ON_FAILURE = False
     CACHEOPS_SENTINEL = {}
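With the default set to False, a project opts in via its Django settings. A sketch, where the redis URL and the profile are example values:

# settings.py (example values)
CACHEOPS_REDIS = "redis://localhost:6379/1"
CACHEOPS_INSIDEOUT = True   # switch to the new stamp/checksum invalidation
CACHEOPS = {
    'auth.user': {'ops': 'get', 'timeout': 60 * 15},
}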

cacheops/getset.py

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
+from contextlib import contextmanager
+import hashlib
+import json
+
+from .conf import settings
+from .redis import redis_client, handle_connection_failure, load_script
+from .transaction import transaction_states
+
+
+LOCK_TIMEOUT = 60
+
+
+@handle_connection_failure
+def cache_thing(prefix, cache_key, data, cond_dnfs, timeout, dbs=(), precall_key='',
+                expected_checksum=''):
+    """
+    Writes data to cache and creates appropriate invalidators.
+
+    If precall_key is not the empty string, the data will only be cached if the
+    precall_key is set to avoid caching stale data.
+
+    If expected_checksum is set and does not match the actual one then cache won't be written.
+    """
+    # Could have changed after last check, sometimes superficially
+    if transaction_states.is_dirty(dbs):
+        return
+
+    if settings.CACHEOPS_INSIDEOUT:
+        schemes = dnfs_to_schemes(cond_dnfs)
+        conj_keys = dnfs_to_conj_keys(prefix, cond_dnfs)
+        return load_script('cache_thing_insideout')(
+            keys=[prefix, cache_key],
+            args=[
+                settings.CACHEOPS_SERIALIZER.dumps(data),
+                json.dumps(schemes),
+                json.dumps(conj_keys),
+                timeout,
+                expected_checksum,
+            ]
+        )
+    else:
+        if prefix and precall_key == "":
+            precall_key = prefix
+        load_script('cache_thing', settings.CACHEOPS_LRU)(
+            keys=[prefix, cache_key, precall_key],
+            args=[
+                settings.CACHEOPS_SERIALIZER.dumps(data),
+                json.dumps(cond_dnfs, default=str),
+                timeout
+            ]
+        )
+
+
+@contextmanager
+def getting(key, cond_dnfs, prefix, lock=False):
+    if not lock:
+        yield _read(key, cond_dnfs, prefix)
+    else:
+        locked = False
+        try:
+            data = _get_or_lock(key, cond_dnfs, prefix)
+            locked = data is None
+            yield data
+        finally:
+            if locked:
+                _release_lock(key)
+
+
+@handle_connection_failure
+def _read(key, cond_dnfs, prefix):
+    if not settings.CACHEOPS_INSIDEOUT:
+        return redis_client.get(key)
+
+    conj_keys = dnfs_to_conj_keys(prefix, cond_dnfs)
+    coded, *stamps = redis_client.mget(key, *conj_keys)
+    if coded is None or coded == b'LOCK':
+        return coded
+
+    if None in stamps:
+        redis_client.unlink(key)
+        return
+
+    stamp_checksum, data = coded.split(b':', 1)
+    if stamp_checksum.decode() != join_stamps(stamps):
+        redis_client.unlink(key)
+        return None
+
+    return data
+
+
+@handle_connection_failure
+def _get_or_lock(key, cond_dnfs, prefix):
+    _lock = redis_client.register_script("""
+        local locked = redis.call('set', KEYS[1], 'LOCK', 'nx', 'ex', ARGV[1])
+        if locked then
+            redis.call('del', KEYS[2])
+        end
+        return locked
+    """)
+    signal_key = key + ':signal'
+
+    while True:
+        data = _read(key, cond_dnfs, prefix)
+        if data is None:
+            if _lock(keys=[key, signal_key], args=[LOCK_TIMEOUT]):
+                return None
+        elif data != b'LOCK':
+            return data
+
+        # No data and not locked, wait
+        redis_client.brpoplpush(signal_key, signal_key, timeout=LOCK_TIMEOUT)
+
+
+@handle_connection_failure
+def _release_lock(key):
+    _unlock = redis_client.register_script("""
+        if redis.call('get', KEYS[1]) == 'LOCK' then
+            redis.call('del', KEYS[1])
+        end
+        redis.call('lpush', KEYS[2], 1)
+        redis.call('expire', KEYS[2], 1)
+    """)
+    signal_key = key + ':signal'
+    _unlock(keys=[key, signal_key])
+
+
+# Key manipulation helpers
+
+def join_stamps(stamps):
+    return hashlib.sha1(b' '.join(stamps)).hexdigest()
+
+
+def dnfs_to_conj_keys(prefix, cond_dnfs):
+    def _conj_cache_key(table, conj):
+        conj_str = '&'.join(f'{field}={val}' for field, val in sorted(conj.items()))
+        return f'{prefix}conj:{table}:{conj_str}'
+
+    return [_conj_cache_key(table, conj) for table, disj in cond_dnfs.items()
+            for conj in disj]
+
+def dnfs_to_schemes(cond_dnfs):
+    return {table: [",".join(sorted(conj)) for conj in disj]
+            for table, disj in cond_dnfs.items() if disj}
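To make the key layout concrete, here is how the two helpers above map a conditions DNF to conj keys and schemes. The table name, fields, and prefix are illustrative:

from cacheops.getset import dnfs_to_conj_keys, dnfs_to_schemes

cond_dnfs = {'myapp_post': [{'category': 'news', 'visible': 'True'}]}  # illustrative DNF

dnfs_to_conj_keys('p:', cond_dnfs)
# -> ['p:conj:myapp_post:category=news&visible=True']
dnfs_to_schemes(cond_dnfs)
# -> {'myapp_post': ['category,visible']}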

cacheops/invalidation.py

Lines changed: 16 additions & 8 deletions
@@ -1,6 +1,6 @@
 import json
 import threading
-from funcy import memoize, post_processing, ContextDecorator, decorator
+from funcy import memoize, post_processing, ContextDecorator, decorator, walk_values
 from django.db import DEFAULT_DB_ALIAS
 from django.db.models.expressions import F, Expression
 
@@ -27,12 +27,17 @@ def skip_on_no_invalidation(call):
 def invalidate_dict(model, obj_dict, using=DEFAULT_DB_ALIAS):
     if no_invalidation.active or not settings.CACHEOPS_ENABLED:
         return
+
     model = model._meta.concrete_model
     prefix = get_prefix(_cond_dnfs=[(model._meta.db_table, list(obj_dict.items()))], dbs=[using])
-    load_script('invalidate')(keys=[prefix], args=[
-        model._meta.db_table,
-        json.dumps(obj_dict, default=str)
-    ])
+
+    if settings.CACHEOPS_INSIDEOUT:
+        script = 'invalidate_insideout'
+        serialized_dict = json.dumps(walk_values(str, obj_dict))
+    else:
+        script = 'invalidate'
+        serialized_dict = json.dumps(obj_dict, default=str)
+    load_script(script)(keys=[prefix], args=[model._meta.db_table, serialized_dict])
     cache_invalidated.send(sender=model, obj_dict=obj_dict)
 
 
@@ -60,9 +65,12 @@ def invalidate_model(model, using=DEFAULT_DB_ALIAS):
     prefix = get_prefix(tables=[model._meta.db_table], dbs=[using])
     conjs_keys = redis_client.keys('%sconj:%s:*' % (prefix, model._meta.db_table))
     if conjs_keys:
-        cache_keys = redis_client.sunion(conjs_keys)
-        keys = list(cache_keys) + conjs_keys
-        redis_client.unlink(*keys)
+        if settings.CACHEOPS_INSIDEOUT:
+            redis_client.unlink(*conjs_keys)
+        else:
+            cache_keys = redis_client.sunion(conjs_keys)
+            keys = list(cache_keys) + conjs_keys
+            redis_client.unlink(*keys)
     cache_invalidated.send(sender=model, obj_dict=None)
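In insideout mode, dropping the conj keys alone is enough: any cache key whose checksum references a dropped stamp fails validation in `_read()` and is unlinked lazily on its next read, so no `SUNION` over potentially huge sets is needed. A sketch of what the new branch amounts to, with illustrative key names and an empty prefix:

# Sketch of invalidate_model() in insideout mode (illustrative names).
from cacheops.redis import redis_client

conjs_keys = redis_client.keys('conj:myapp_post:*')  # empty prefix assumed
if conjs_keys:
    redis_client.unlink(*conjs_keys)  # stamps gone; cache keys die lazily on read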

cacheops/lua/cache_thing_insideout.lua

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+local prefix = KEYS[1]
+local key = KEYS[2]
+local data = ARGV[1]
+local schemes = cjson.decode(ARGV[2])
+local conj_keys = cjson.decode(ARGV[3])
+local timeout = tonumber(ARGV[4])
+local expected_checksum = ARGV[5]
+
+-- Ensure schemes are known
+for db_table, _schemes in pairs(schemes) do
+    redis.call('sadd', prefix .. 'schemes:' .. db_table, unpack(_schemes))
+end
+
+-- Fill in invalidators and collect stamps
+local stamps = {}
+local rnd = tostring(math.random())  -- A new value for empty stamps
+for _, conj_key in ipairs(conj_keys) do
+    local stamp = redis.call('set', conj_key, rnd, 'nx', 'get') or rnd
+    table.insert(stamps, stamp)
+    -- NOTE: an invalidator should live longer than any key it references.
+    --       So we update its ttl on every key if needed.
+    redis.call('expire', conj_key, timeout, 'gt')
+end
+
+-- Write data to cache along with a checksum of the stamps to see if any of them changed
+local all_stamps = table.concat(stamps, ' ')
+local stamp_checksum = redis.sha1hex(all_stamps)
+
+if expected_checksum ~= '' and stamp_checksum ~= expected_checksum then
+    -- Cached data was invalidated during the function call. The data is
+    -- stale and should not be cached.
+    return stamp_checksum  -- This one is used for keep_fresh implementation
+end
+
+redis.call('set', key, stamp_checksum .. ':' .. data, 'ex', timeout)
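The checksum written by this script must agree with `join_stamps()` in cacheops/getset.py; both SHA-1 the space-joined stamps. A quick parity check on the Python side, with made-up stamp values:

import hashlib

stamps = [b'0.84147098480786', b'0.54030230586814']  # made-up stamp values
checksum = hashlib.sha1(b' '.join(stamps)).hexdigest()
# equals redis.sha1hex(table.concat(stamps, ' ')) for the same stamps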

cacheops/lua/invalidate_insideout.lua

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+local prefix = KEYS[1]
+local db_table = ARGV[1]
+local obj = cjson.decode(ARGV[2])
+
+local conj_cache_key = function (db_table, scheme, obj)
+    local parts = {}
+    for field in string.gmatch(scheme, "[^,]+") do
+        -- All obj values are strings, we still use tostring() in case obj does not contain field
+        table.insert(parts, field .. '=' .. tostring(obj[field]))
+    end
+
+    return prefix .. 'conj:' .. db_table .. ':' .. table.concat(parts, '&')
+end
+
+-- Drop conj keys
+local conj_keys = {}
+local schemes = redis.call('smembers', prefix .. 'schemes:' .. db_table)
+for _, scheme in ipairs(schemes) do
+    redis.call('unlink', conj_cache_key(db_table, scheme, obj))
+end
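A Python rendering of `conj_cache_key` above, showing how a stored scheme plus an invalidated object rebuilds exactly the conj key that cache_thing_insideout.lua stamped. The values are illustrative; note Lua's tostring(nil) would yield 'nil' where Python's .get() yields 'None' for a missing field.

# Illustrative Python equivalent of the Lua conj_cache_key() above.
def conj_cache_key(prefix, db_table, scheme, obj):
    parts = [f"{field}={obj.get(field)}" for field in scheme.split(',')]
    return f"{prefix}conj:{db_table}:{'&'.join(parts)}"

conj_cache_key('', 'myapp_post', 'category,visible',
               {'category': 'news', 'visible': 'True'})
# -> 'conj:myapp_post:category=news&visible=True'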

cacheops/query.py

Lines changed: 16 additions & 32 deletions
@@ -1,5 +1,4 @@
 import sys
-import json
 import threading
 from random import random
 
@@ -24,8 +23,8 @@
 from .conf import model_profile, settings, ALL_OPS
 from .utils import monkey_mix, stamp_fields, get_cache_key, cached_view_fab, family_has_profile
 from .utils import md5
+from .getset import cache_thing, getting
 from .sharding import get_prefix
-from .redis import redis_client, handle_connection_failure, load_script
 from .tree import dnfs
 from .invalidation import invalidate_obj, invalidate_dict, skip_on_no_invalidation
 from .transaction import transaction_states
@@ -37,29 +36,6 @@
 _local_get_cache = {}
 
 
-@handle_connection_failure
-def cache_thing(prefix, cache_key, data, cond_dnfs, timeout, dbs=(), precall_key=''):
-    """
-    Writes data to cache and creates appropriate invalidators.
-
-    If precall_key is not the empty string, the data will only be cached if the
-    precall_key is set to avoid caching stale data.
-    """
-    # Could have changed after last check, sometimes superficially
-    if transaction_states.is_dirty(dbs):
-        return
-    if prefix and precall_key == "":
-        precall_key = prefix
-    load_script('cache_thing', settings.CACHEOPS_LRU)(
-        keys=[prefix, cache_key, precall_key],
-        args=[
-            settings.CACHEOPS_SERIALIZER.dumps(data),
-            json.dumps(cond_dnfs, default=str),
-            timeout
-        ]
-    )
-
-
 def cached_as(*samples, timeout=None, extra=None, lock=None, keep_fresh=False):
     """
     Caches results of a function and invalidates them same way as given queryset(s).
@@ -91,7 +67,7 @@ def _get_queryset(sample):
 
     querysets = lmap(_get_queryset, samples)
     dbs = list({qs.db for qs in querysets})
-    cond_dnfs = join_with(lcat, map(dnfs, querysets))
+    cond_dnfs = join_with(lcat, map(dnfs, querysets))  # TODO: use cached version?
     qs_keys = [qs._cache_key(prefix=False) for qs in querysets]
     if timeout is None:
         timeout = min(qs._cacheprofile['timeout'] for qs in querysets)
@@ -108,12 +84,22 @@ def wrapper(*args, **kwargs):
             extra_val = extra(*args, **kwargs) if callable(extra) else extra
             cache_key = prefix + 'as:' + get_cache_key(func, args, kwargs, qs_keys, extra_val)
 
-            with redis_client.getting(cache_key, lock=lock) as cache_data:
+            with getting(cache_key, cond_dnfs, prefix, lock=lock) as cache_data:
                 cache_read.send(sender=None, func=func, hit=cache_data is not None)
                 if cache_data is not None:
                     return settings.CACHEOPS_SERIALIZER.loads(cache_data)
                 else:
-                    if keep_fresh:
+                    precall_key = ''
+                    expected_checksum = ''
+                    if keep_fresh and settings.CACHEOPS_INSIDEOUT:
+                        # The conj stamps should not be dropped while we calculate the function.
+                        # But being filled in concurrently is a normal concurrent cache write.
+                        # However, if they are filled in and then dropped, we cannot detect that.
+                        # Unless we fill them ourselves and get expected checksum now. We also need
+                        # to fill in schemes, so we just reuse the cache_thing().
+                        expected_checksum = cache_thing(prefix, cache_key, '', cond_dnfs, timeout,
                                                        dbs=dbs, expected_checksum='never match')
+                    elif keep_fresh:
                         # We call this "asp" for "as precall" because this key is
                         # cached before the actual function is called. We randomize
                         # the key to prevent falsely thinking the key was not
@@ -126,12 +112,10 @@ def wrapper(*args, **kwargs):
                         # only if it remains valid before, during, and after the
                         # call, the result can be cached and returned.
                         cache_thing(prefix, precall_key, 'PRECALL', cond_dnfs, timeout, dbs=dbs)
-                    else:
-                        precall_key = ''
 
                     result = func(*args, **kwargs)
                     cache_thing(prefix, cache_key, result, cond_dnfs, timeout, dbs=dbs,
-                                precall_key=precall_key)
+                                precall_key=precall_key, expected_checksum=expected_checksum)
                     return result
 
         return wrapper
@@ -275,7 +259,7 @@ def _fetch_all(self):
         cache_key = self._cache_key()
         lock = self._cacheprofile['lock']
 
-        with redis_client.getting(cache_key, lock=lock) as cache_data:
+        with getting(cache_key, self._cond_dnfs, self._prefix, lock=lock) as cache_data:
             cache_read.send(sender=self.model, func=None, hit=cache_data is not None)
             if cache_data is not None:
                 self._result_cache = settings.CACHEOPS_SERIALIZER.loads(cache_data)
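From the caller's side nothing changes: `keep_fresh` transparently takes the expected-checksum path when `CACHEOPS_INSIDEOUT` is enabled. For example, using the `cached_as` signature above (the model and queryset are illustrative):

from cacheops import cached_as
from myapp.models import Post  # illustrative model

@cached_as(Post.objects.filter(visible=True), timeout=60, keep_fresh=True)
def visible_post_count():
    return Post.objects.filter(visible=True).count()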
