-
Notifications
You must be signed in to change notification settings - Fork 1k
fix: prevent crash on KEYLOCK_ACQUIRED check for NO_KEY_TRANSACTIONAL commands #5185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
974d172
ea887a9
56d84e1
fa049fb
29e9778
e68c027
9c3b773
2aecceb
17bba4e
e66d563
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,10 @@ | |
Search correctness should be ensured with unit tests. | ||
""" | ||
import pytest | ||
import random | ||
import redis | ||
import threading | ||
import time | ||
from redis import asyncio as aioredis | ||
from .utility import * | ||
from . import dfly_args | ||
|
@@ -558,3 +562,241 @@ def make_car(producer, description, speed): | |
|
||
for index in client.execute_command("FT._LIST"): | ||
client.ft(index.decode()).dropindex() | ||
|
||
|
||
@dfly_args({"proactor_threads": 4}) | ||
async def test_search_race_condition_threaded_issue_5173(async_client: aioredis.Redis, df_server): | ||
""" | ||
Alternative test using threading for true parallelism to reproduce race condition. | ||
This version uses actual threads instead of asyncio tasks for maximum concurrency. | ||
""" | ||
import threading | ||
import time | ||
import redis # Sync redis client for threading | ||
romange marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Index name for the test | ||
index_name = "myRaceIdx" | ||
|
||
# Shared flags | ||
server_crashed = threading.Event() | ||
romange marked this conversation as resolved.
Show resolved
Hide resolved
|
||
stop_threads = threading.Event() | ||
|
||
def writer_thread(): | ||
"""Thread that writes documents""" | ||
client = redis.Redis(port=df_server.port, decode_responses=True) | ||
romange marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
i = 1 | ||
while not stop_threads.is_set() and not server_crashed.is_set(): | ||
try: | ||
client.hset( | ||
f"doc:{i}", | ||
mapping={ | ||
"content": f"text data item {i} for race condition test", | ||
"tags": f"tagA,tagB,{i % 10}", | ||
"numeric_field": i, | ||
}, | ||
) | ||
i += 1 | ||
# Reset counter to avoid huge numbers | ||
if i > 100000: | ||
i = 1 | ||
# Small delay to prevent CPU overload | ||
time.sleep(0.001) | ||
except (redis.ConnectionError, OSError): | ||
server_crashed.set() | ||
break | ||
except Exception: | ||
# Expected during index operations | ||
time.sleep(0.001) | ||
finally: | ||
client.close() | ||
|
||
def reindexer_thread(): | ||
"""Thread that drops and recreates index""" | ||
client = redis.Redis(port=df_server.port, decode_responses=True) | ||
try: | ||
while not stop_threads.is_set() and not server_crashed.is_set(): | ||
try: | ||
# Drop index | ||
client.execute_command("FT.DROPINDEX", index_name) | ||
|
||
# Recreate index | ||
client.execute_command( | ||
"FT.CREATE", | ||
index_name, | ||
"ON", | ||
"HASH", | ||
"PREFIX", | ||
"1", | ||
"doc:", | ||
"SCHEMA", | ||
"content", | ||
"TEXT", | ||
"SORTABLE", | ||
"tags", | ||
"TAG", | ||
"SORTABLE", | ||
"numeric_field", | ||
"NUMERIC", | ||
"SORTABLE", | ||
) | ||
# Small delay between index operations | ||
time.sleep(0.01) | ||
except (redis.ConnectionError, OSError): | ||
server_crashed.set() | ||
break | ||
except Exception: | ||
# Expected during concurrent operations | ||
time.sleep(0.01) | ||
finally: | ||
client.close() | ||
|
||
def searcher_thread(): | ||
"""Thread that performs search operations""" | ||
client = redis.Redis(port=df_server.port, decode_responses=True) | ||
try: | ||
while not stop_threads.is_set() and not server_crashed.is_set(): | ||
try: | ||
random_val = random.randint(1, 20000) | ||
random_tag_val = random.randint(0, 20) | ||
|
||
# Various search operations | ||
client.execute_command("FT.SEARCH", index_name, f"@content:item{random_val}") | ||
|
||
client.execute_command( | ||
"FT.SEARCH", index_name, f"@tags:{{tagA|tagC|tag{random_tag_val}}}" | ||
) | ||
|
||
client.execute_command( | ||
"FT.SEARCH", index_name, f"@numeric_field:[{random_val} {random_val + 100}]" | ||
) | ||
|
||
# Small delay between search operations | ||
time.sleep(0.005) | ||
|
||
except (redis.ConnectionError, OSError): | ||
server_crashed.set() | ||
break | ||
except Exception: | ||
# Expected during index operations | ||
time.sleep(0.005) | ||
finally: | ||
client.close() | ||
|
||
def writer2_thread(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you do not need to clone the same function twice, you can just launch a second thread with the same function. |
||
"""Thread that writes additional documents (like writer2.sh)""" | ||
client = redis.Redis(port=df_server.port, decode_responses=True) | ||
try: | ||
i = 50001 # Start from different range to avoid conflicts | ||
while not stop_threads.is_set() and not server_crashed.is_set(): | ||
try: | ||
client.hset( | ||
f"doc:{i}", | ||
mapping={ | ||
"content": f"additional text data item {i} for race condition test", | ||
"tags": f"tagC,tagD,{i % 15}", | ||
"numeric_field": i + 1000, | ||
}, | ||
) | ||
i += 1 | ||
# Reset counter to avoid huge numbers | ||
if i > 150000: | ||
i = 50001 | ||
# Small delay to prevent CPU overload | ||
time.sleep(0.001) | ||
except (redis.ConnectionError, OSError): | ||
server_crashed.set() | ||
break | ||
except Exception: | ||
# Expected during index operations | ||
time.sleep(0.001) | ||
finally: | ||
client.close() | ||
|
||
def deleter_thread(): | ||
"""Thread that deletes documents (like deleter.sh)""" | ||
client = redis.Redis(port=df_server.port, decode_responses=True) | ||
try: | ||
while not stop_threads.is_set() and not server_crashed.is_set(): | ||
try: | ||
# Delete random documents | ||
doc_id = random.randint(1, 100000) | ||
client.delete(f"doc:{doc_id}") | ||
|
||
# Small delay between deletions | ||
time.sleep(0.01) | ||
except (redis.ConnectionError, OSError): | ||
server_crashed.set() | ||
break | ||
except Exception: | ||
# Expected during index operations | ||
time.sleep(0.01) | ||
finally: | ||
client.close() | ||
|
||
def health_monitor(): | ||
"""Monitor server health""" | ||
client = redis.Redis(port=df_server.port, decode_responses=True) | ||
try: | ||
while not stop_threads.is_set() and not server_crashed.is_set(): | ||
try: | ||
client.ping() | ||
time.sleep(1.0) | ||
except (redis.ConnectionError, OSError): | ||
server_crashed.set() | ||
break | ||
except Exception: | ||
time.sleep(0.5) | ||
finally: | ||
client.close() | ||
|
||
# Start all threads | ||
threads = [ | ||
threading.Thread(target=health_monitor, name="health"), | ||
threading.Thread(target=writer_thread, name="writer"), | ||
threading.Thread(target=writer2_thread, name="writer2"), | ||
threading.Thread(target=deleter_thread, name="deleter"), | ||
threading.Thread(target=reindexer_thread, name="reindexer"), | ||
threading.Thread(target=searcher_thread, name="searcher"), | ||
] | ||
|
||
for thread in threads: | ||
thread.start() | ||
|
||
try: | ||
# Wait for threads to complete or server to crash | ||
start_time = time.time() | ||
last_progress_time = start_time | ||
|
||
while time.time() - start_time < 30: # 30 seconds | ||
if server_crashed.is_set(): | ||
break | ||
|
||
# Check if any thread died unexpectedly | ||
alive_threads = [t for t in threads if t.is_alive()] | ||
if len(alive_threads) < len(threads): | ||
break | ||
|
||
current_time = time.time() | ||
if current_time - last_progress_time >= 30: | ||
elapsed = current_time - start_time | ||
remaining = 600 - elapsed | ||
last_progress_time = current_time | ||
|
||
time.sleep(0.1) | ||
|
||
# Stop all threads | ||
stop_threads.set() | ||
|
||
# Wait for threads to finish | ||
for thread in threads: | ||
thread.join(timeout=5.0) | ||
|
||
except Exception as e: | ||
stop_threads.set() | ||
|
||
# Check if server crashed | ||
if server_crashed.is_set(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the server crashed, the testing framework should identify it automatically. |
||
pytest.fail( | ||
"Dragonfly server crashed during threaded race condition test - issue #5173 reproduced!" | ||
) |
Uh oh!
There was an error while loading. Please reload this page.