fix: prevent crash on KEYLOCK_ACQUIRED check for NO_KEY_TRANSACTIONAL commands #5185

Merged: 10 commits, May 29, 2025
242 changes: 242 additions & 0 deletions tests/dragonfly/search_test.py
@@ -3,6 +3,10 @@
Search correctness should be ensured with unit tests.
"""
import pytest
import random
import redis
import threading
import time
from redis import asyncio as aioredis
from .utility import *
from . import dfly_args
@@ -558,3 +562,241 @@ def make_car(producer, description, speed):

    for index in client.execute_command("FT._LIST"):
        client.ft(index.decode()).dropindex()


@dfly_args({"proactor_threads": 4})
async def test_search_race_condition_threaded_issue_5173(async_client: aioredis.Redis, df_server):
    """
    Alternative test using threading for true parallelism to reproduce a race condition.
    This version uses actual OS threads instead of asyncio tasks for maximum concurrency.
    """
    # Index name for the test
    index_name = "myRaceIdx"

    # Shared flags for coordinating the worker threads
    server_crashed = threading.Event()
    stop_threads = threading.Event()

    def writer_thread():
        """Thread that writes documents"""
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            i = 1
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    client.hset(
                        f"doc:{i}",
                        mapping={
                            "content": f"text data item {i} for race condition test",
                            "tags": f"tagA,tagB,{i % 10}",
                            "numeric_field": i,
                        },
                    )
                    i += 1
                    # Reset counter to avoid huge numbers
                    if i > 100000:
                        i = 1
                    # Small delay to prevent CPU overload
                    time.sleep(0.001)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    # Expected during index operations
                    time.sleep(0.001)
        finally:
            client.close()

    def reindexer_thread():
        """Thread that drops and recreates the index"""
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    # Drop index
                    client.execute_command("FT.DROPINDEX", index_name)

                    # Recreate index
                    client.execute_command(
                        "FT.CREATE", index_name,
                        "ON", "HASH",
                        "PREFIX", "1", "doc:",
                        "SCHEMA",
                        "content", "TEXT", "SORTABLE",
                        "tags", "TAG", "SORTABLE",
                        "numeric_field", "NUMERIC", "SORTABLE",
                    )
                    # Small delay between index operations
                    time.sleep(0.01)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    # Expected during concurrent operations
                    time.sleep(0.01)
        finally:
            client.close()
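For readability, the raw FT.CREATE above could also be expressed through redis-py's search helper. A minimal sketch, assuming a redis-py version that ships redis.commands.search; recreate_index is a hypothetical helper, not part of the PR:

    from redis.commands.search.field import NumericField, TagField, TextField
    from redis.commands.search.indexDefinition import IndexDefinition, IndexType

    def recreate_index(client):
        # Same schema as the raw FT.CREATE call in reindexer_thread.
        client.ft(index_name).create_index(
            [
                TextField("content", sortable=True),
                TagField("tags", sortable=True),
                NumericField("numeric_field", sortable=True),
            ],
            definition=IndexDefinition(prefix=["doc:"], index_type=IndexType.HASH),
        )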

    def searcher_thread():
        """Thread that performs search operations"""
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    random_val = random.randint(1, 20000)
                    random_tag_val = random.randint(0, 20)

                    # Various search operations
                    client.execute_command("FT.SEARCH", index_name, f"@content:item{random_val}")
                    client.execute_command(
                        "FT.SEARCH", index_name, f"@tags:{{tagA|tagC|tag{random_tag_val}}}"
                    )
                    client.execute_command(
                        "FT.SEARCH", index_name, f"@numeric_field:[{random_val} {random_val + 100}]"
                    )

                    # Small delay between search operations
                    time.sleep(0.005)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    # Expected during index operations
                    time.sleep(0.005)
        finally:
            client.close()

Collaborator: you do not need to clone the same function twice, you can just launch a second thread with the same function.

    def writer2_thread():
        """Thread that writes additional documents (like writer2.sh)"""
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            i = 50001  # Start from a different range to avoid conflicts
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    client.hset(
                        f"doc:{i}",
                        mapping={
                            "content": f"additional text data item {i} for race condition test",
                            "tags": f"tagC,tagD,{i % 15}",
                            "numeric_field": i + 1000,
                        },
                    )
                    i += 1
                    # Reset counter to avoid huge numbers
                    if i > 150000:
                        i = 50001
                    # Small delay to prevent CPU overload
                    time.sleep(0.001)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    # Expected during index operations
                    time.sleep(0.001)
        finally:
            client.close()
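A minimal sketch of the reviewer's suggestion above: factor the duplicated body into one function parameterized by its id range (the writer(start, limit) signature is an assumption, not part of the PR) and launch both writers from it:

    def writer(start, limit):
        # Shared writer body replacing writer_thread/writer2_thread.
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            i = start
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    client.hset(
                        f"doc:{i}",
                        mapping={
                            "content": f"text data item {i} for race condition test",
                            "tags": f"tagA,tagB,{i % 10}",
                            "numeric_field": i,
                        },
                    )
                    i = start if i >= limit else i + 1  # keep ids bounded
                    time.sleep(0.001)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    time.sleep(0.001)
        finally:
            client.close()

    # Two threads, one function:
    #   threading.Thread(target=writer, args=(1, 100000), name="writer")
    #   threading.Thread(target=writer, args=(50001, 150000), name="writer2")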

    def deleter_thread():
        """Thread that deletes documents (like deleter.sh)"""
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    # Delete random documents
                    doc_id = random.randint(1, 100000)
                    client.delete(f"doc:{doc_id}")

                    # Small delay between deletions
                    time.sleep(0.01)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    # Expected during index operations
                    time.sleep(0.01)
        finally:
            client.close()

    def health_monitor():
        """Monitor server health"""
        client = redis.Redis(port=df_server.port, decode_responses=True)
        try:
            while not stop_threads.is_set() and not server_crashed.is_set():
                try:
                    client.ping()
                    time.sleep(1.0)
                except (redis.ConnectionError, OSError):
                    server_crashed.set()
                    break
                except Exception:
                    time.sleep(0.5)
        finally:
            client.close()

    # Start all threads
    threads = [
        threading.Thread(target=health_monitor, name="health"),
        threading.Thread(target=writer_thread, name="writer"),
        threading.Thread(target=writer2_thread, name="writer2"),
        threading.Thread(target=deleter_thread, name="deleter"),
        threading.Thread(target=reindexer_thread, name="reindexer"),
        threading.Thread(target=searcher_thread, name="searcher"),
    ]

    for thread in threads:
        thread.start()
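An optional hardening, not in the original test: build the same list with daemon=True so a worker stuck in a blocking call cannot keep the pytest process alive if a join() below times out:

    # Hypothetical variant of the list above; daemon threads never block
    # interpreter shutdown.
    threads = [
        threading.Thread(target=target, name=name, daemon=True)
        for target, name in [
            (health_monitor, "health"),
            (writer_thread, "writer"),
            (writer2_thread, "writer2"),
            (deleter_thread, "deleter"),
            (reindexer_thread, "reindexer"),
            (searcher_thread, "searcher"),
        ]
    ]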

    try:
        # Run for 30 seconds, stopping early if the server crashes
        # or any worker thread dies unexpectedly.
        start_time = time.time()
        while time.time() - start_time < 30:
            if server_crashed.is_set():
                break
            if any(not t.is_alive() for t in threads):
                break
            time.sleep(0.1)
    finally:
        # Stop all threads and wait for them to finish
        stop_threads.set()
        for thread in threads:
            thread.join(timeout=5.0)

    # Check if the server crashed
    if server_crashed.is_set():
        pytest.fail(
            "Dragonfly server crashed during threaded race condition test - issue #5173 reproduced!"
        )

Collaborator: if the server crashed, the testing framework should identify it automatically.
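A minimal sketch of the reviewer's point, using only the sync client already imported above: drop the explicit flag check and issue one final command, letting any connection error propagate so the framework reports the dead server itself:

    # Hypothetical alternative to the pytest.fail() above: a final PING either
    # succeeds or raises redis.ConnectionError when the server is gone, which
    # pytest reports as the failure.
    probe = redis.Redis(port=df_server.port)
    probe.ping()
    probe.close()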