|
| 1 | +from rediscluster import RedisCluster |
| 2 | +import threading |
| 3 | +from time import sleep |
| 4 | + |
| 5 | +""" |
| 6 | +This file will show the difference and how to use the READONLY feature to offload READ specific commands |
| 7 | +to replica nodes in your cluster. The script will do two runs with 10 sets of commands each in a threaded environment |
| 8 | +both with read_from_replica feature turned off and turned on so you can simulate both cases and test out your code |
| 9 | +and ensure that it works before opting in to that feature. |
| 10 | +
|
| 11 | +The absolute best way to show what node is used inside the pipeline is to add a print(node) here |
| 12 | +
|
| 13 | +# pipeline.py |
| 14 | +def _send_cluster_command(...): |
| 15 | + ... |
| 16 | + slot = self._determine_slot(*c.args) |
| 17 | + node = self.connection_pool.get_node_by_slot(slot, self.read_from_replicas and c.args[0] in READ_COMMANDS) |
| 18 | + print(node) |
| 19 | + ... |
| 20 | +
|
| 21 | +and when you run this test script it will show you what node is used in both cases and the first scenario it should show |
| 22 | +only "master" as the node type all commands will be sent to. In the second run with read_from_replica=True it should |
| 23 | +be a mix of "master" and "slave". |
| 24 | +""" |
| 25 | + |
| 26 | + |
| 27 | +def test_run(read_from_replica): |
| 28 | + print(f"########\nStarting test run with read_from_replica={read_from_replica}") |
| 29 | + rc = RedisCluster(host="127.0.0.1", port=7000, decode_responses=True, read_from_replicas=read_from_replica) |
| 30 | + |
| 31 | + print(rc.set("foo1", "bar")) |
| 32 | + print(rc.set("foo2", "bar")) |
| 33 | + print(rc.set("foo3", "bar")) |
| 34 | + print(rc.set("foo4", "bar")) |
| 35 | + print(rc.set("foo5", "bar")) |
| 36 | + print(rc.set("foo6", "bar")) |
| 37 | + print(rc.set("foo7", "bar")) |
| 38 | + print(rc.set("foo8", "bar")) |
| 39 | + print(rc.set("foo9", "bar")) |
| 40 | + |
| 41 | + print(rc.get("foo1")) |
| 42 | + print(rc.get("foo2")) |
| 43 | + print(rc.get("foo3")) |
| 44 | + print(rc.get("foo4")) |
| 45 | + print(rc.get("foo5")) |
| 46 | + print(rc.get("foo6")) |
| 47 | + print(rc.get("foo7")) |
| 48 | + print(rc.get("foo8")) |
| 49 | + print(rc.get("foo9")) |
| 50 | + |
| 51 | + def thread_func(num): |
| 52 | + # sleep(0.1) |
| 53 | + pipe = rc.pipeline(read_from_replicas=read_from_replica) |
| 54 | + pipe.set(f"foo{num}", "bar") |
| 55 | + pipe.get(f"foo{num}") |
| 56 | + pipe.get(f"foo{num}") |
| 57 | + pipe.get(f"foo{num}") |
| 58 | + pipe.get(f"foo{num}") |
| 59 | + pipe.get(f"foo{num}") |
| 60 | + pipe.get(f"foo{num}") |
| 61 | + pipe.get(f"foo{num}") |
| 62 | + pipe.get(f"foo{num}") |
| 63 | + print(threading.current_thread().getName(), pipe.execute()) |
| 64 | + |
| 65 | + for i in range(0, 15): |
| 66 | + x = threading.Thread(target=thread_func, args=(i,), name=f"{i}") |
| 67 | + x.start() |
| 68 | + |
| 69 | + |
| 70 | +test_run(False) |
| 71 | +sleep(2) |
| 72 | +test_run(True) |
0 commit comments