250 changes: 218 additions & 32 deletions integration/test_fulltext.py
@@ -5,6 +5,8 @@
from valkeytestframework.conftest import resource_port_tracker
from ft_info_parser import FTInfoParser
from valkeytestframework.util import waiters
import threading
import time
from utils import IndexingTestHelper

"""
@@ -274,7 +276,7 @@ def test_ft_create_and_info(self):
assert tag_attr.get("SEPARATOR") == "|"

# Validate backfill fields
waiters.wait_for_equal(lambda: IndexingTestHelper.is_backfill_complete_on_node(client, "idx4"), True, timeout=5)
IndexingTestHelper.wait_for_backfill_complete_on_node(client, "idx4")

# Add validation checks for specific fields
assert parser.num_docs == 0, f"num_docs should be zero"
@@ -355,14 +357,13 @@ def test_text_per_field_search(self):
result = client.execute_command("FT.SEARCH", "products2", '@desc2:"1 2 3 4 5 6 7 8 9 10"')
assert result[0] == 0

def test_default_ingestion_pipeline(self):
def test_default_tokenization(self):
"""
Test comprehensive ingestion pipeline: FT.CREATE → HSET → FT.SEARCH with full tokenization
Test FT.CREATE → HSET → FT.SEARCH with full tokenization
"""
client: Valkey = self.server.get_new_client()
client.execute_command("FT.CREATE idx ON HASH SCHEMA content TEXT")
client.execute_command("HSET", "doc:1", "content", "The quick-running searches are finding EFFECTIVE results!")
client.execute_command("HSET", "doc:2", "content", "But slow searches aren't working...")

# List of queries with pass/fail expectations
test_cases = [
@@ -383,36 +384,36 @@ def test_default_ingestion_pipeline(self):
else:
assert result[0] == 0, f"Failed: {description}"

def test_multi_text_field(self):
@pytest.mark.skip(reason="TODO: ingest original words when stemming enabled")
def test_stemming(self):
"""
Test different TEXT field configs in same index
Test text index NOSTEM option
"""
client: Valkey = self.server.get_new_client()
client.execute_command("FT.CREATE idx ON HASH SCHEMA title TEXT content TEXT NOSTEM")
client.execute_command("HSET", "doc:1", "title", "running fast", "content", "running quickly")

expected_value = {
b'title': b'running fast',
b'content': b'running quickly'
}
expected = [1, b'doc:1', [b'content', b'running quickly', b'title', b'running fast']]

result = client.execute_command("FT.SEARCH", "idx", '@title:"run"')
actual_fields = dict(zip(result[2][::2], result[2][1::2]))
assert actual_fields == expected_value
# We can find stems on 'title'
assert client.execute_command("FT.SEARCH", "idx", '@title:"run"') == expected

result = client.execute_command("FT.SEARCH", "idx", '@content:"run"')
assert result[0] == 0 # Should not find (NOSTEM)
# We cannot find stems on 'content' with NOSTEM
assert client.execute_command("FT.SEARCH", "idx", '@content:"run"') == [0]

# We can find original words in both cases
assert client.execute_command("FT.SEARCH", "idx", '@title:"running"') == expected # TODO: fails here
assert client.execute_command("FT.SEARCH", "idx", '@content:"running"') == expected

def test_custom_stopwords(self):
"""
End-to-end test: FT.CREATE STOPWORDS config actually filters custom stop words in search
Test FT.CREATE STOPWORDS option filters out custom stop words
"""
client: Valkey = self.server.get_new_client()
client.execute_command("FT.CREATE idx ON HASH STOPWORDS 2 the and SCHEMA content TEXT")
client.execute_command("HSET", "doc:1", "content", "the cat and dog are good")

# Stop words should not be findable

result = client.execute_command("FT.SEARCH", "idx", '@content:"and"')
assert result[0] == 0 # Stop word "and" filtered out

@@ -422,23 +423,9 @@ def test_custom_stopwords(self):
assert result[1] == b'doc:1'
assert result[2] == [b'content', b"the cat and dog are good"]

def test_nostem(self):
"""
End-to-end test: FT.CREATE NOSTEM config actually affects stemming in search
"""
client: Valkey = self.server.get_new_client()
client.execute_command("FT.CREATE idx ON HASH NOSTEM SCHEMA content TEXT")
client.execute_command("HSET", "doc:1", "content", "running quickly")

# With NOSTEM, exact forms should be findable
result = client.execute_command("FT.SEARCH", "idx", '@content:"running"')
assert result[0] == 1 # Exact form "running" found
assert result[1] == b'doc:1'
assert result[2] == [b'content', b"running quickly"]

def test_custom_punctuation(self):
"""
Test PUNCTUATION directive configures custom tokenization separators
Test FT.CREATE PUNCTUATION directive configures custom tokenization separators
"""
client: Valkey = self.server.get_new_client()
client.execute_command("FT.CREATE idx ON HASH PUNCTUATION . SCHEMA content TEXT")
@@ -453,3 +440,202 @@ def test_custom_punctuation(self):
# @ is NOT configured as a separator - words are not split on it, so 'test' alone is not findable
result = client.execute_command("FT.SEARCH", "idx", '@content:"test"')
assert result[0] == 0

def test_add_update_delete_documents_single_client(self):
"""
Tests that we properly ingest added, updated, and deleted documents from a single client
"""
client: Valkey = self.server.get_new_client()
client.execute_command("FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "content", "TEXT")
num_docs = 5

# Add
for i in range(num_docs):
client.execute_command("HSET", f"doc:{i}", "content", f"What a cool document{i}")
result = client.execute_command("FT.SEARCH", "idx", "@content:document*")
assert result[0] == num_docs

# Update
for i in range(num_docs):
client.execute_command("HSET", f"doc:{i}", "content", f"What a cool doc{i}")
result = client.execute_command("FT.SEARCH", "idx", "@content:document*")
assert result[0] == 0
result = client.execute_command("FT.SEARCH", "idx", "@content:doc*")
assert result[0] == num_docs

# Delete
for i in range(num_docs):
client.execute_command("DEL", f"doc:{i}")
result = client.execute_command("FT.SEARCH", "idx", "@content:doc*")
assert result[0] == 0

def test_add_update_delete_documents_multi_client(self):
"""
Tests that we properly ingest added, updated, and deleted documents from multiple clients

TODO: To guarantee concurrent ingestion, add a debug config that pauses updates in the
waiting room and then releases them to index in batches. Otherwise we depend on this
Python test sending updates faster than the server cuts indexing batches.
"""

def perform_concurrent_searches(clients, num_clients, searches, phase_name):
"""
Helper function to perform concurrent searches across multiple clients and validate consistency

Args:
clients: List of client connections
num_clients: Number of clients to use
searches: List of (query, description) tuples to execute
phase_name: Name of the phase for error reporting (ADD/UPDATE/DELETE)
"""
search_results = {}
def concurrent_search(client_id):
client = clients[client_id]
client_results = []
for query, desc in searches:
result = client.execute_command("FT.SEARCH", "idx", query)
client_results.append((desc, result[0])) # Store description and count
search_results[client_id] = client_results

threads = []
for client_id in range(num_clients):
thread = threading.Thread(target=concurrent_search, args=(client_id,))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

# Validate concurrent search results are consistent
expected_results = search_results[0] # Use first client as reference
for client_id in range(1, num_clients):
assert search_results[client_id] == expected_results, f"{phase_name}: Search results inconsistent between clients 0 and {client_id}"

# Setup
num_clients = 50
docs_per_client = 50
clients = [self.server.get_new_client() for _ in range(num_clients)]

# Create the index
clients[0].execute_command("FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "content", "TEXT")
IndexingTestHelper.wait_for_backfill_complete_on_node(clients[0], "idx")

# Phase 1: Concurrent ADD
def add_documents(client_id):
client = clients[client_id]
for i in range(docs_per_client):
# Longer content to increase ingestion processing time
content = f"client{client_id} document doc{i} original content with many additional words to process during indexing. " \
f"This extended text includes various terms like analysis, processing, indexing, searching, and retrieval. " \
f"The purpose is to create substantial content that requires more computational effort during text analysis. " \
f"Additional keywords include: database, storage, performance, optimization, concurrent, threading, synchronization. " \
f"More descriptive text about document {i} from client {client_id} with original data and content."
client.execute_command("HSET", f"doc:{client_id}_{i}", "content", content)

threads = []
for client_id in range(num_clients):
thread = threading.Thread(target=add_documents, args=(client_id,))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

# Validate ADD phase with concurrent searching
client = clients[0]
total_docs = num_clients * docs_per_client

result = client.execute_command("FT.SEARCH", "idx", "@content:document")
assert result[0] == total_docs, f"ADD: Expected {total_docs} documents with 'document', got {result[0]}"

result = client.execute_command("FT.SEARCH", "idx", "@content:origin") # "original" stems to "origin"
assert result[0] == total_docs, f"ADD: Expected {total_docs} documents with 'origin', got {result[0]}"

# Concurrent search phase after ADD
add_searches = [
("@content:document", "document"),
("@content:origin", "origin"),
("@content:analysis", "analysis"),
("@content:process*", "process*"),
("@content:database", "database"),
("@content:concurrent", "concurrent")
]
perform_concurrent_searches(clients, num_clients, add_searches, "ADD")

# Phase 2: Concurrent UPDATE
def update_documents(client_id):
client = clients[client_id]
for i in range(docs_per_client):
# Longer updated content to increase ingestion processing time
content = f"client{client_id} document doc{i} updated content with comprehensive text for thorough indexing analysis. " \
f"This modified version contains different terminology including: revision, modification, alteration, enhancement. " \
f"The updated document now features expanded vocabulary for testing concurrent update operations effectively. " \
f"Technical terms added: algorithm, computation, execution, validation, verification, testing, debugging. " \
f"Enhanced description of document {i} from client {client_id} with updated information and revised content."
client.execute_command("HSET", f"doc:{client_id}_{i}", "content", content)

threads = []
for client_id in range(num_clients):
thread = threading.Thread(target=update_documents, args=(client_id,))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

# Validate UPDATE phase with concurrent searching
result = client.execute_command("FT.SEARCH", "idx", "@content:origin") # "original" stems to "origin"
assert result[0] == 0, f"UPDATE: Expected 0 documents with 'origin', got {result[0]}"

result = client.execute_command("FT.SEARCH", "idx", "@content:updat") # "updated" stems to "updat"
assert result[0] == total_docs, f"UPDATE: Expected {total_docs} documents with 'updat', got {result[0]}"

# Concurrent search phase after UPDATE
update_searches = [
("@content:document", "document"),
("@content:updat", "updat"),
("@content:revision", "revision"),
("@content:modif*", "modif*"),
("@content:algorithm", "algorithm"),
("@content:validation", "validation")
]
perform_concurrent_searches(clients, num_clients, update_searches, "UPDATE")

# Phase 3: Concurrent DELETE
def delete_documents(client_id):
client = clients[client_id]
for i in range(docs_per_client // 2): # Delete half the documents
client.execute_command("DEL", f"doc:{client_id}_{i}")

threads = []
for client_id in range(num_clients):
thread = threading.Thread(target=delete_documents, args=(client_id,))
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

# Validate DELETE phase with concurrent searching
remaining_docs = total_docs // 2

result = client.execute_command("FT.SEARCH", "idx", "@content:updat") # "updated" stems to "updat"
assert result[0] == remaining_docs, f"DELETE: Expected {remaining_docs} documents with 'updat', got {result[0]}"

result = client.execute_command("FT.SEARCH", "idx", "@content:document")
assert result[0] == remaining_docs, f"DELETE: Expected {remaining_docs} documents with 'document', got {result[0]}"

# Concurrent search phase after DELETE
delete_searches = [
("@content:document", "document"),
("@content:updat", "updat"),
("@content:revision", "revision"),
("@content:algorithm", "algorithm"),
("@content:validation", "validation"),
("@content:enhanced", "enhanced")
]
perform_concurrent_searches(clients, num_clients, delete_searches, "DELETE")

def test_suffix_search(self):
# TODO
pass
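For context on the stem assertions in these tests ("original" matching @content:origin, "updated" matching @content:updat, "running" matching @title:"run"): they line up with Snowball-style English stemming. A minimal sketch using the snowballstemmer package — an illustrative stand-in, not necessarily the stemmer this module actually uses:

import snowballstemmer  # pip install snowballstemmer

stemmer = snowballstemmer.stemmer("english")
print(stemmer.stemWords(["original", "updated", "running", "searches"]))
# -> ['origin', 'updat', 'run', 'search']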
6 changes: 5 additions & 1 deletion integration/utils.py
@@ -6,6 +6,7 @@
from valkey.client import Valkey
from valkey import ResponseError
from ft_info_parser import FTInfoParser
from valkeytestframework.util import waiters

class IndexingTestHelper:
"""
@@ -51,6 +52,10 @@ def is_backfill_complete_on_node(client: Valkey, index_name: str) -> bool:
parser = IndexingTestHelper.get_ft_info(client, index_name)
return parser.is_backfill_complete()

@staticmethod
def wait_for_backfill_complete_on_node(client: Valkey, index_name: str) -> None:
"""Wait for backfill to complete on a single node."""
waiters.wait_for_true(lambda: IndexingTestHelper.is_backfill_complete_on_node(client, index_name))

@staticmethod
def is_indexing_complete_cluster(client: Valkey, index_name: str) -> bool:
@@ -61,7 +66,6 @@ def is_indexing_complete_cluster(client: Valkey, index_name: str) -> bool:
@staticmethod
def wait_for_indexing_complete_on_all_nodes(clients: list, index_name: str):
"""Wait for indexing to complete on all provided nodes."""
from valkeytestframework.util.waiters import wait_for_true

def check_all_nodes_complete():
return all(
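The new wait_for_backfill_complete_on_node helper delegates to waiters.wait_for_true, which polls a predicate until it returns true or a timeout elapses. A minimal sketch of that polling pattern, with hypothetical timeout and interval defaults — the real valkeytestframework signature may differ:

import time

def wait_for_true(predicate, timeout=10, interval=0.1):
    # Poll until the predicate returns a truthy value or the deadline passes.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return
        time.sleep(interval)
    raise AssertionError("condition was not met within the timeout")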
5 changes: 5 additions & 0 deletions src/index_schema.cc
@@ -424,6 +424,11 @@ void IndexSchema::SyncProcessMutation(ValkeyModuleCtx *ctx,
MutatedAttributes &mutated_attributes,
const InternedStringPtr &key) {
vmsdk::WriterMutexLock lock(&time_sliced_mutex_);
if (text_index_schema_) {
// Always clean up indexed words from all text attributes of the key up
// front
text_index_schema_->DeleteKeyData(key);
}
for (auto &attribute_data_itr : mutated_attributes) {
const auto itr = attributes_.find(attribute_data_itr.first);
if (itr == attributes_.end()) {
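The hunk above applies a delete-then-reinsert pattern: before the mutated attributes of a key are re-indexed, every posting previously recorded for that key is dropped, so terms from a stale version of the document cannot survive an update. A minimal Python sketch of the idea, using an inverted index plus a reverse map — everything beyond the DeleteKeyData concept is illustrative, not the module's actual types:

from collections import defaultdict

class TextIndexSketch:
    def __init__(self):
        self.postings = defaultdict(set)      # term -> keys containing it
        self.terms_by_key = defaultdict(set)  # key -> terms indexed for it

    def delete_key_data(self, key):
        # Drop every posting previously recorded for this key.
        for term in self.terms_by_key.pop(key, set()):
            self.postings[term].discard(key)

    def index_key(self, key, terms):
        # Re-index after cleanup; stale terms are already gone.
        for term in terms:
            self.postings[term].add(key)
            self.terms_by_key[key].add(term)

Doing the cleanup up front makes re-indexing idempotent: an update that removes a word from a document also removes that word's posting from the index.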