From b59d461df09bb692368f708ef85d553f95172772 Mon Sep 17 00:00:00 2001 From: Brennan Cathcart Date: Thu, 2 Oct 2025 20:22:25 +0000 Subject: [PATCH 1/7] In progress Signed-off-by: Brennan Cathcart --- src/index_schema.cc | 5 ++ src/indexes/text.cc | 96 ++++++++++++++++++++-------------- src/indexes/text/lexer.cc | 18 ++++--- src/indexes/text/lexer.h | 13 +++-- src/indexes/text/text_index.cc | 96 +++++++++++++++++++++++++++++++--- src/indexes/text/text_index.h | 21 ++++++-- testing/lexer_test.cc | 12 ++--- 7 files changed, 194 insertions(+), 67 deletions(-) diff --git a/src/index_schema.cc b/src/index_schema.cc index f86c471f..e9f792af 100644 --- a/src/index_schema.cc +++ b/src/index_schema.cc @@ -424,6 +424,11 @@ void IndexSchema::SyncProcessMutation(ValkeyModuleCtx *ctx, MutatedAttributes &mutated_attributes, const InternedStringPtr &key) { vmsdk::WriterMutexLock lock(&time_sliced_mutex_); + if (text_index_schema_) { + // Always clean up indexed words from all text attributes of the key up + // front + text_index_schema_->DeleteKeyData(key); + } for (auto &attribute_data_itr : mutated_attributes) { const auto itr = attributes_.find(attribute_data_itr.first); if (itr == attributes_.end()) { diff --git a/src/indexes/text.cc b/src/indexes/text.cc index bcbbd9b0..a0c61a0b 100644 --- a/src/indexes/text.cc +++ b/src/indexes/text.cc @@ -28,53 +28,73 @@ Text::Text(const data_model::TextIndex& text_index_proto, absl::StatusOr Text::AddRecord(const InternedStringPtr& key, absl::string_view data) { - valkey_search::indexes::text::Lexer lexer; - - auto tokens = - lexer.Tokenize(data, text_index_schema_->GetPunctuationBitmap(), - text_index_schema_->GetStemmer(), !no_stem_, - min_stem_size_, text_index_schema_->GetStopWordsSet()); - - if (!tokens.ok()) { - if (tokens.status().code() == absl::StatusCode::kInvalidArgument) { - return false; // UTF-8 errors → hash_indexing_failures - } - return tokens.status(); - } - - for (uint32_t position = 0; position < tokens->size(); ++position) { - const auto& token = (*tokens)[position]; - text_index_schema_->GetTextIndex()->prefix_.Mutate( - token, - [&](std::optional> existing) - -> std::optional> { - std::shared_ptr postings; - if (existing.has_value()) { - postings = existing.value(); - } else { - // Create new Postings object with schema configuration - bool save_positions = text_index_schema_->GetWithOffsets(); - uint8_t num_text_fields = text_index_schema_->GetNumTextFields(); - postings = std::make_shared(save_positions, - num_text_fields); - } - - postings->InsertPosting(key, text_field_number_, position); - return postings; - }); - } - return true; + // TODO: Key Tracking + + // valkey_search::indexes::text::Lexer lexer; + + // auto tokens = + // lexer.Tokenize(data, text_index_schema_->GetPunctuationBitmap(), + // text_index_schema_->GetStemmer(), !no_stem_, + // min_stem_size_, text_index_schema_->GetStopWordsSet()); + + // if (!tokens.ok()) { + // if (tokens.status().code() == absl::StatusCode::kInvalidArgument) { + // return false; // UTF-8 errors → hash_indexing_failures + // } + // return tokens.status(); + // } + + // for (uint32_t position = 0; position < tokens->size(); ++position) { + // const auto& token = (*tokens)[position]; + // text_index_schema_->GetTextIndex()->prefix_.Mutate( + // token, + // [&](std::optional> existing) + // -> std::optional> { + // std::shared_ptr postings; + // if (existing.has_value()) { + // postings = existing.value(); + // } else { + // // Create new Postings object with schema configuration + // bool 
save_positions = text_index_schema_->GetWithOffsets(); + // uint8_t num_text_fields = text_index_schema_->GetNumTextFields(); + // postings = std::make_shared(save_positions, + // num_text_fields); + // } + + // postings->InsertPosting(key, text_field_number_, position); + // return postings; + // }); + // } + + return text_index_schema_->IndexAttributeData(key, data, text_field_number_, !no_stem_, min_stem_size_, with_suffix_trie_); } absl::StatusOr Text::RemoveRecord(const InternedStringPtr& key, DeletionType deletion_type) { - throw std::runtime_error("Text::RemoveRecord not implemented"); + // if (text_field_number_ == 0 && deletion_type == DeletionType::kRecord) { + // // Call API to clean up the whole key + // } else { + // // Call API to clean up the key for this particular text attribute + // // - how do we know what words? Could search through whole tree to find the ones with this field + // // Can also think of a trick to remove whole tree and re-ingest it + // } + // throw std::runtime_error("Text::RemoveRecord not implemented"); + + // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey() at + // this point so don't need to touch the index structures + + // TODO: key tracking + return true; } absl::StatusOr Text::ModifyRecord(const InternedStringPtr& key, absl::string_view data) { - throw std::runtime_error("Text::ModifyRecord not implemented"); + // TODO: key tracking + + // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey() at + // this point so we simply add the new key data + return text_index_schema_->IndexAttributeData(key, data, text_field_number_, !no_stem_, min_stem_size_, with_suffix_trie_); } int Text::RespondWithInfo(ValkeyModuleCtx* ctx) const { diff --git a/src/indexes/text/lexer.cc b/src/indexes/text/lexer.cc index d7f825df..3701e5ac 100644 --- a/src/indexes/text/lexer.cc +++ b/src/indexes/text/lexer.cc @@ -14,9 +14,14 @@ namespace valkey_search::indexes::text { +Lexer::Lexer(const char* language) + : stemmer_(sb_stemmer_new(language, "UTF_8")) {} + +Lexer::~Lexer() { sb_stemmer_delete(stemmer_); } + absl::StatusOr> Lexer::Tokenize( absl::string_view text, const std::bitset<256>& punct_bitmap, - sb_stemmer* stemmer, bool stemming_enabled, uint32_t min_stem_size, + bool stemming_enabled, uint32_t min_stem_size, const absl::flat_hash_set& stop_words_set) const { if (!IsValidUtf8(text)) { return absl::InvalidArgumentError("Invalid UTF-8"); @@ -45,7 +50,7 @@ absl::StatusOr> Lexer::Tokenize( continue; // Skip stop words } - word = StemWord(word, stemmer, stemming_enabled, min_stem_size); + word = StemWord(word, stemming_enabled, min_stem_size); tokens.push_back(std::move(word)); } @@ -54,21 +59,20 @@ absl::StatusOr> Lexer::Tokenize( return tokens; } -std::string Lexer::StemWord(const std::string& word, sb_stemmer* stemmer, +std::string Lexer::StemWord(const std::string& word, bool stemming_enabled, uint32_t min_stem_size) const { if (word.empty() || !stemming_enabled || word.length() < min_stem_size) { return word; } - - CHECK(stemmer) << "Stemmer not initialized"; + std::lock_guard guard(stemmer_mutex_); const sb_symbol* stemmed = sb_stemmer_stem( - stemmer, reinterpret_cast(word.c_str()), word.length()); + stemmer_, reinterpret_cast(word.c_str()), word.length()); DCHECK(stemmed) << "Stemming failed for word: " + word; - int stemmed_length = sb_stemmer_length(stemmer); + int stemmed_length = sb_stemmer_length(stemmer_); return std::string(reinterpret_cast(stemmed), 
stemmed_length); } diff --git a/src/indexes/text/lexer.h b/src/indexes/text/lexer.h index 679a8eea..9420570d 100644 --- a/src/indexes/text/lexer.h +++ b/src/indexes/text/lexer.h @@ -37,9 +37,12 @@ struct sb_stemmer; namespace valkey_search::indexes::text { struct Lexer { + Lexer(const char *); + ~Lexer(); + absl::StatusOr> Tokenize( absl::string_view text, const std::bitset<256>& punct_bitmap, - sb_stemmer* stemmer, bool stemming_enabled, uint32_t min_stem_size, + bool stemming_enabled, uint32_t min_stem_size, const absl::flat_hash_set& stop_words_set) const; // Punctuation checking API @@ -55,8 +58,12 @@ struct Lexer { } private: - std::string StemWord(const std::string& word, sb_stemmer* stemmer, - bool stemming_enabled, uint32_t min_stem_size) const; + mutable sb_stemmer* stemmer_ = nullptr; + + // TODO: Create a little connection pool of stemmers protected by this mutex + mutable std::mutex stemmer_mutex_; + + std::string StemWord(const std::string& word, bool stemming_enabled, uint32_t min_stem_size) const; // UTF-8 processing helpers bool IsValidUtf8(absl::string_view text) const; diff --git a/src/indexes/text/text_index.cc b/src/indexes/text/text_index.cc index e334fe8b..f60f083b 100644 --- a/src/indexes/text/text_index.cc +++ b/src/indexes/text/text_index.cc @@ -7,6 +7,8 @@ #include "src/indexes/text/text_index.h" +#include "absl/log/check.h" +#include "absl/status/status.h" #include "absl/strings/ascii.h" #include "libstemmer.h" @@ -44,6 +46,33 @@ absl::flat_hash_set BuildStopWordsSet( return stop_words_set; } +std::optional> AddWordToPostings( + std::optional> existing, + const InternedStringPtr& key, size_t text_field_number, uint32_t position, + bool save_positions, uint8_t num_text_fields) { + std::shared_ptr postings; + if (existing.has_value()) { + postings = existing.value(); + } else { + postings = std::make_shared(save_positions, num_text_fields); + } + postings->InsertPosting(key, text_field_number, position); + return postings; +} + +std::optional> RemoveKeyFromPostings( + std::optional> existing, + const InternedStringPtr& key) { + CHECK(existing.has_value()) << "Per-key tree became unaligned"; + auto postings = existing.value(); + postings->RemoveKey(key); + if (!postings->IsEmpty()) { + return postings; + } else { + return std::nullopt; + } +} + } // namespace TextIndexSchema::TextIndexSchema(data_model::Language language, @@ -54,13 +83,7 @@ TextIndexSchema::TextIndexSchema(data_model::Language language, punct_bitmap_(BuildPunctuationBitmap(punctuation)), stop_words_set_(BuildStopWordsSet(stop_words)), with_offsets_(with_offsets), - stemmer_(sb_stemmer_new(GetLanguageString(), "UTF_8")) {} - -TextIndexSchema::~TextIndexSchema() { - if (stemmer_) { - sb_stemmer_delete(stemmer_); - } -} + lexer_(Lexer(GetLanguageString())) {} const char* TextIndexSchema::GetLanguageString() const { switch (language_) { @@ -71,4 +94,63 @@ const char* TextIndexSchema::GetLanguageString() const { } } +absl::StatusOr TextIndexSchema::IndexAttributeData(const InternedStringPtr& key, + absl::string_view data, size_t text_field_number, bool stem, + size_t min_stem_size, bool suffix) { + std::lock_guard per_key_guard(per_key_text_indexes_mutex_); + + auto tokens = + lexer_.Tokenize(data, GetPunctuationBitmap(), stem, min_stem_size, GetStopWordsSet()); + + if (!tokens.ok()) { + if (tokens.status().code() == absl::StatusCode::kInvalidArgument) { + return false; // UTF-8 errors → hash_indexing_failures + } + return tokens.status(); + } + + // Be smart about how we update in main trees since don't 
want to traverse + // twice to same PO + + for (uint32_t position = 0; position < tokens->size(); ++position) { + const auto& token = (*tokens)[position]; + text_index_->prefix_.Mutate( + token, + [&](auto existing) { + return AddWordToPostings(existing, key, text_field_number, position, + GetWithOffsets(), GetNumTextFields()); + }); + } + + return true; +} + +void TextIndexSchema::DeleteKeyData(const InternedStringPtr& key) { + std::optional key_index; + { + std::lock_guard per_key_guard(per_key_text_indexes_mutex_); + auto it = per_key_text_indexes_.find(key); + if (it != per_key_text_indexes_.end()) { + key_index.emplace(std::move(it->second)); + } + } + + if (key_index.has_value()) { + std::lock_guard main_tree_guard(text_index_->mutex_); + + // Cleanup prefix tree + auto iter = key_index->prefix_.GetWordIterator(""); + while (!iter.Done()) { + std::string_view word = iter.GetWord(); + text_index_->prefix_.Mutate(word, [&](auto existing) { + return RemoveKeyFromPostings(existing, key); + }); + } + + if (text_index_->suffix_.has_value()) { + // TODO: Cleanup suffix tree + } + } +} + } // namespace valkey_search::indexes::text diff --git a/src/indexes/text/text_index.h b/src/indexes/text/text_index.h index bb670bc7..98569ecf 100644 --- a/src/indexes/text/text_index.h +++ b/src/indexes/text/text_index.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "absl/container/flat_hash_map.h" @@ -20,6 +21,8 @@ #include "src/index_schema.pb.h" #include "src/indexes/text/posting.h" #include "src/indexes/text/radix_tree.h" +#include "src/indexes/text/lexer.h" + struct sb_stemmer; @@ -45,6 +48,9 @@ struct TextIndex { // RadixTree, false> prefix_; std::optional, true>> suffix_; + + // Dumb lock to prevent concurrent tree mutations for now + std::mutex mutex_; }; class TextIndexSchema { @@ -54,6 +60,10 @@ class TextIndexSchema { const std::vector& stop_words); ~TextIndexSchema(); + absl::StatusOr IndexAttributeData(const InternedStringPtr& key, absl::string_view data, size_t text_field_number, + bool stem, size_t min_stem_size, bool suffix); + void DeleteKeyData(const InternedStringPtr& key); + uint8_t AllocateTextFieldNumber() { return num_text_fields_++; } uint8_t GetNumTextFields() const { return num_text_fields_; } @@ -65,7 +75,6 @@ class TextIndexSchema { return stop_words_set_; } data_model::Language GetLanguage() const { return language_; } - sb_stemmer* GetStemmer() const { return stemmer_; }; bool GetWithOffsets() const { return with_offsets_; } private: @@ -83,7 +92,12 @@ class TextIndexSchema { // This object must also ensure that updates of this object are multi-thread // safe. 
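
The per-key bookkeeping here is what keeps DeleteKeyData cheap: because each key remembers exactly which words it contributed, deletion only visits those words' postings instead of scanning the entire schema-level tree (that is the role of the per-key map declared just below). A standalone sketch of the idea, not part of the patch; std::map and std::set stand in for the real RadixTree and Postings types, and all names are illustrative.

#include <iostream>
#include <map>
#include <set>
#include <string>

struct MiniSchemaIndex {
  // word -> keys containing it (stand-in for RadixTree of Postings)
  std::map<std::string, std::set<std::string>> postings;
  // key -> words it contributed (stand-in for the per-key map)
  std::map<std::string, std::set<std::string>> words_by_key;

  void Index(const std::string& key, const std::string& word) {
    postings[word].insert(key);
    words_by_key[key].insert(word);
  }

  // Cost is proportional to the words in this key, not to the index size.
  void DeleteKeyData(const std::string& key) {
    auto it = words_by_key.find(key);
    if (it == words_by_key.end()) return;
    for (const auto& word : it->second) {
      auto wit = postings.find(word);
      if (wit == postings.end()) continue;
      wit->second.erase(key);
      if (wit->second.empty()) postings.erase(wit);  // prune empty postings
    }
    words_by_key.erase(it);
  }
};

int main() {
  MiniSchemaIndex idx;
  idx.Index("doc:1", "hello");
  idx.Index("doc:2", "hello");
  idx.DeleteKeyData("doc:1");
  std::cout << idx.postings["hello"].size() << "\n";  // prints 1
}
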
// - absl::flat_hash_map by_key_; + absl::flat_hash_map per_key_text_indexes_; + + Lexer lexer_; + + // Prevent concurrent mutations to per_key_text_indexes_ + std::mutex per_key_text_indexes_mutex_; // Punctuation bitmap PunctuationBitmap punct_bitmap_; @@ -94,9 +108,6 @@ class TextIndexSchema { // Language needed for stemmer creation data_model::Language language_ = data_model::LANGUAGE_UNSPECIFIED; - // Stemmer reused across all operations for this index - mutable sb_stemmer* stemmer_ = nullptr; - // Whether to store position offsets for phrase queries bool with_offsets_ = false; diff --git a/testing/lexer_test.cc b/testing/lexer_test.cc index 61274d90..8dcb9179 100644 --- a/testing/lexer_test.cc +++ b/testing/lexer_test.cc @@ -36,7 +36,7 @@ struct LexerTestCase { class LexerTest : public ::testing::Test { protected: void SetUp() override { - lexer_ = std::make_unique(); + lexer_ = std::make_unique("english"); // Create TextIndexSchema to get real bitmap (tests real integration) std::vector stop_words = {"the", "and", "or"}; @@ -44,7 +44,6 @@ class LexerTest : public ::testing::Test { data_model::LANGUAGE_ENGLISH, " \t\n\r!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~", true, stop_words); - language_ = "english"; stemming_enabled_ = true; min_stem_size_ = 3; } @@ -60,7 +59,6 @@ class LexerTest : public ::testing::Test { } std::unique_ptr lexer_; - std::string language_; bool stemming_enabled_; uint32_t min_stem_size_; }; @@ -80,7 +78,7 @@ TEST_P(LexerParameterizedTest, TokenizeTest) { auto result = lexer_->Tokenize(test_case.input, schema->GetPunctuationBitmap(), - schema->GetStemmer(), test_case.stemming_enabled, + test_case.stemming_enabled, test_case.min_stem_size, schema->GetStopWordsSet()); ASSERT_TRUE(result.ok()) << "Test case: " << test_case.description; @@ -171,7 +169,7 @@ TEST_F(LexerTest, InvalidUTF8) { std::string invalid_utf8 = "hello \xFF\xFE world"; auto result = lexer_->Tokenize(invalid_utf8, text_schema_->GetPunctuationBitmap(), - text_schema_->GetStemmer(), stemming_enabled_, + stemming_enabled_, min_stem_size_, text_schema_->GetStopWordsSet()); EXPECT_FALSE(result.ok()); EXPECT_EQ(result.status().code(), absl::StatusCode::kInvalidArgument); @@ -182,7 +180,7 @@ TEST_F(LexerTest, LongWord) { std::string long_word(1000, 'a'); auto result = lexer_->Tokenize(long_word, text_schema_->GetPunctuationBitmap(), - text_schema_->GetStemmer(), stemming_enabled_, + stemming_enabled_, min_stem_size_, text_schema_->GetStopWordsSet()); ASSERT_TRUE(result.ok()); EXPECT_EQ(*result, std::vector({long_word})); @@ -202,7 +200,7 @@ TEST_F(LexerTest, EmptyStopWordsHandling) { auto result = lexer_->Tokenize("Hello, world! 
TESTING 123 with-dashes and/or symbols", no_stop_schema->GetPunctuationBitmap(), - no_stop_schema->GetStemmer(), true, 3, empty_set); + true, 3, empty_set); ASSERT_TRUE(result.ok()); EXPECT_EQ(*result, From 55a8e31305215e7b078b3f168d40b0b99de2951a Mon Sep 17 00:00:00 2001 From: Brennan Cathcart Date: Sat, 4 Oct 2025 21:13:26 +0000 Subject: [PATCH 2/7] Fix up text_index.h/.cc to what I had Signed-off-by: Brennan Cathcart --- src/indexes/text/lexer.h | 2 +- src/indexes/text/text_index.cc | 15 ++++++++------- src/indexes/text/text_index.h | 7 +++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/indexes/text/lexer.h b/src/indexes/text/lexer.h index 9420570d..280f8dcc 100644 --- a/src/indexes/text/lexer.h +++ b/src/indexes/text/lexer.h @@ -60,7 +60,7 @@ struct Lexer { private: mutable sb_stemmer* stemmer_ = nullptr; - // TODO: Create a little connection pool of stemmers protected by this mutex + // TODO: Create a little pool of stemmers protected by this mutex mutable std::mutex stemmer_mutex_; std::string StemWord(const std::string& word, bool stemming_enabled, uint32_t min_stem_size) const; diff --git a/src/indexes/text/text_index.cc b/src/indexes/text/text_index.cc index f60f083b..3f73b8fb 100644 --- a/src/indexes/text/text_index.cc +++ b/src/indexes/text/text_index.cc @@ -126,29 +126,30 @@ absl::StatusOr TextIndexSchema::IndexAttributeData(const InternedStringPtr } void TextIndexSchema::DeleteKeyData(const InternedStringPtr& key) { - std::optional key_index; + TextIndex* key_index = nullptr; { std::lock_guard per_key_guard(per_key_text_indexes_mutex_); auto it = per_key_text_indexes_.find(key); if (it != per_key_text_indexes_.end()) { - key_index.emplace(std::move(it->second)); + key_index = &it->second; } } - if (key_index.has_value()) { + if (key_index) { std::lock_guard main_tree_guard(text_index_->mutex_); - // Cleanup prefix tree + // Cleanup schema-level text index auto iter = key_index->prefix_.GetWordIterator(""); while (!iter.Done()) { std::string_view word = iter.GetWord(); text_index_->prefix_.Mutate(word, [&](auto existing) { return RemoveKeyFromPostings(existing, key); }); - } - if (text_index_->suffix_.has_value()) { - // TODO: Cleanup suffix tree + if (text_index_->suffix_.has_value()) { + // TODO: Cleanup suffix tree + } + iter.Next(); } } } diff --git a/src/indexes/text/text_index.h b/src/indexes/text/text_index.h index 98569ecf..fa5487c4 100644 --- a/src/indexes/text/text_index.h +++ b/src/indexes/text/text_index.h @@ -14,8 +14,8 @@ #include #include -#include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include "absl/container/node_hash_map.h" #include "absl/functional/function_ref.h" #include "absl/strings/string_view.h" #include "src/index_schema.pb.h" @@ -49,7 +49,7 @@ struct TextIndex { RadixTree, false> prefix_; std::optional, true>> suffix_; - // Dumb lock to prevent concurrent tree mutations for now + // TODO: develop a proper TextIndex locking scheme std::mutex mutex_; }; @@ -58,7 +58,6 @@ class TextIndexSchema { TextIndexSchema(data_model::Language language, const std::string& punctuation, bool with_offsets, const std::vector& stop_words); - ~TextIndexSchema(); absl::StatusOr IndexAttributeData(const InternedStringPtr& key, absl::string_view data, size_t text_field_number, bool stem, size_t min_stem_size, bool suffix); @@ -92,7 +91,7 @@ class TextIndexSchema { // This object must also ensure that updates of this object are multi-thread // safe. 
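
One plausible reason for the flat_hash_map to node_hash_map switch just below: DeleteKeyData in this patch takes a raw TextIndex* into the map and keeps using it after dropping per_key_text_indexes_mutex_. absl::flat_hash_map stores values inline and may relocate them on rehash, invalidating such pointers, while absl::node_hash_map heap-allocates each entry and guarantees pointer stability. A small sketch of the difference, assuming Abseil is available:

#include <cstdio>
#include <string>

#include "absl/container/node_hash_map.h"

int main() {
  absl::node_hash_map<std::string, int> m;
  int* p = &m["key"];               // pointer into the map's value storage
  for (int i = 0; i < 1000; ++i) {  // force several rehashes
    m[std::to_string(i)] = i;
  }
  *p = 42;  // still valid: node_hash_map keeps value addresses stable
  std::printf("%d\n", m["key"]);    // prints 42
  // The same pattern with absl::flat_hash_map would be undefined behavior,
  // since rehashing moves the inline slots and 'p' would dangle.
  return 0;
}
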
// - absl::flat_hash_map per_key_text_indexes_; + absl::node_hash_map per_key_text_indexes_; Lexer lexer_; From e0c76d8a86dbbab0e3d96ccc8c8bf5f64f9e1a37 Mon Sep 17 00:00:00 2001 From: Brennan Cathcart Date: Wed, 8 Oct 2025 01:29:00 +0000 Subject: [PATCH 3/7] Added suffix tree mutation Signed-off-by: Brennan Cathcart --- src/indexes/text.cc | 52 ++------------- src/indexes/text/radix_tree.h | 87 +++++++++++++++++++------ src/indexes/text/text_index.cc | 113 ++++++++++++++++++++++----------- testing/radix_test.cc | 18 +++--- 4 files changed, 157 insertions(+), 113 deletions(-) diff --git a/src/indexes/text.cc b/src/indexes/text.cc index a0c61a0b..d2934121 100644 --- a/src/indexes/text.cc +++ b/src/indexes/text.cc @@ -31,60 +31,16 @@ absl::StatusOr Text::AddRecord(const InternedStringPtr& key, // TODO: Key Tracking - // valkey_search::indexes::text::Lexer lexer; - - // auto tokens = - // lexer.Tokenize(data, text_index_schema_->GetPunctuationBitmap(), - // text_index_schema_->GetStemmer(), !no_stem_, - // min_stem_size_, text_index_schema_->GetStopWordsSet()); - - // if (!tokens.ok()) { - // if (tokens.status().code() == absl::StatusCode::kInvalidArgument) { - // return false; // UTF-8 errors → hash_indexing_failures - // } - // return tokens.status(); - // } - - // for (uint32_t position = 0; position < tokens->size(); ++position) { - // const auto& token = (*tokens)[position]; - // text_index_schema_->GetTextIndex()->prefix_.Mutate( - // token, - // [&](std::optional> existing) - // -> std::optional> { - // std::shared_ptr postings; - // if (existing.has_value()) { - // postings = existing.value(); - // } else { - // // Create new Postings object with schema configuration - // bool save_positions = text_index_schema_->GetWithOffsets(); - // uint8_t num_text_fields = text_index_schema_->GetNumTextFields(); - // postings = std::make_shared(save_positions, - // num_text_fields); - // } - - // postings->InsertPosting(key, text_field_number_, position); - // return postings; - // }); - // } - return text_index_schema_->IndexAttributeData(key, data, text_field_number_, !no_stem_, min_stem_size_, with_suffix_trie_); } absl::StatusOr Text::RemoveRecord(const InternedStringPtr& key, DeletionType deletion_type) { - // if (text_field_number_ == 0 && deletion_type == DeletionType::kRecord) { - // // Call API to clean up the whole key - // } else { - // // Call API to clean up the key for this particular text attribute - // // - how do we know what words? 
Could search through whole tree to find the ones with this field - // // Can also think of a trick to remove whole tree and re-ingest it - // } - // throw std::runtime_error("Text::RemoveRecord not implemented"); - - // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey() at - // this point so don't need to touch the index structures + // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey(), + // so there is no need to touch the index structures here // TODO: key tracking + return true; } @@ -93,7 +49,7 @@ absl::StatusOr Text::ModifyRecord(const InternedStringPtr& key, // TODO: key tracking // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey() at - // this point so we simply add the new key data + // this point, so we simply add the new key data return text_index_schema_->IndexAttributeData(key, data, text_field_number_, !no_stem_, min_stem_size_, with_suffix_trie_); } diff --git a/src/indexes/text/radix_tree.h b/src/indexes/text/radix_tree.h index ecca3cc5..44f0a484 100644 --- a/src/indexes/text/radix_tree.h +++ b/src/indexes/text/radix_tree.h @@ -88,23 +88,44 @@ struct RadixTree { RadixTree() = default; // - // This function is the only way to mutate the RadixTree, all other functions - // are read-only. This function is explicitly multi-thread safe and is + // Adds a target to the for the given word. If the target already exists, + // it will be replaced by the new target. An empty target will cause the + // word to be deleted from the tree. + // + // This function is explicitly multi-thread safe and is // designed to allow other mutations to be performed on other words and // targets simultaneously, with minimal collisions. // - // In all cases, the mutate function is invoked once under the locking - // provided by the RadixTree itself, so if the target objects are disjoint - // (which is normal) then no locking is required within the mutate function - // itself. + // It's expected that the caller will know whether or not the word + // exists. Passing in a word that doesn't exist along with a + // nullopt new_target will cause the word to be added and + // then immediately deleted from the tree. + // + void SetTarget( + absl::string_view word, + std::optional new_target); + + // + // Applies the mutation function to the target of the given word to generate + // a new target. If the word doesn't already exist, a path for it will be + // first added to the tree. The new target is also returned to the caller. // // The input parameter to the mutate function will be nullopt if there is no // entry for this word. Otherwise it will contain the value for this word. The // return value of the mutate function is the new value for this word. if the // return value is nullopt then this word is deleted from the RadixTree. // + // This function is explicitly multi-thread safe and is + // designed to allow other mutations to be performed on other words and + // targets simultaneously, with minimal collisions. // - void Mutate( + // In all cases, the mutate function is invoked once under the locking + // provided by the RadixTree itself, so if the target objects are disjoint + // (which is normal) then no locking is required within the mutate function + // itself. 
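
A standalone sketch of how these two entry points are meant to compose (MutateTarget is introduced just below): the prefix tree creates or updates the shared target through the callback, and the suffix tree merely mirrors the returned target under the reversed word, which is why the blind SetTarget overwrite is sufficient there. std::map and a shared counter stand in for RadixTree and Postings; none of this is the patch's code.

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <optional>
#include <string>

using Target = std::shared_ptr<int>;         // stand-in for Postings pointer
using Tree = std::map<std::string, Target>;  // stand-in for RadixTree

// Create-or-update through a callback; nullopt deletes; result is returned.
std::optional<Target> MutateTarget(
    Tree& tree, const std::string& word,
    const std::function<std::optional<Target>(std::optional<Target>)>& fn) {
  std::optional<Target> existing;
  if (auto it = tree.find(word); it != tree.end()) existing = it->second;
  std::optional<Target> next = fn(existing);
  if (next) tree[word] = *next; else tree.erase(word);
  return next;
}

// Blind overwrite (or erase) for callers already holding the new target.
void SetTarget(Tree& tree, const std::string& word,
               std::optional<Target> target) {
  if (target) tree[word] = *target; else tree.erase(word);
}

int main() {
  Tree prefix, suffix;
  std::string word = "search";
  auto target = MutateTarget(prefix, word, [](std::optional<Target> t) {
    if (!t) t = std::make_shared<int>(0);
    ++**t;  // record one more posting
    return t;
  });
  // Mirror the same shared target under the reversed word for suffix search.
  SetTarget(suffix, std::string(word.rbegin(), word.rend()), target);
  std::cout << *prefix["search"] << " " << *suffix["hcraes"] << "\n";  // 1 1
}
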
+ // + // + std::optional MutateTarget( absl::string_view word, absl::FunctionRef(std::optional)> mutate); @@ -190,6 +211,9 @@ struct RadixTree { Node root_; + // Gets the path of nodes for the given word, creating it if it doesn't exist. + std::deque GetOrCreateWordPath(absl::string_view word); + // Restructures tree after a word is deleted from it void PostDeleteTreeCleanup(absl::string_view word, std::deque& node_path); @@ -300,10 +324,44 @@ struct RadixTree { }; template -void RadixTree::Mutate( +void RadixTree::SetTarget( + absl::string_view word, + std::optional new_target) { + CHECK(!word.empty()) << "Can't add the target for an empty word"; + std::deque node_path = GetOrCreateWordPath(word); + Node* n = node_path.back(); + if (new_target) { + n->target = new_target; + } else { + // Delete the word from the tree + n->target = std::nullopt; + PostDeleteTreeCleanup(word, node_path); + } +} + +template +std::optional RadixTree::MutateTarget( absl::string_view word, absl::FunctionRef(std::optional)> mutate) { - CHECK(!word.empty()) << "Can't mutate the target at an empty word"; + CHECK(!word.empty()) << "Can't mutate the target for an empty word"; + std::deque node_path = GetOrCreateWordPath(word); + Node* n = node_path.back(); + + // Apply mutating function + std::optional new_target = mutate(n->target); + + if (new_target) { + n->target = new_target; + } else { + // Delete the word from the tree + n->target = std::nullopt; + PostDeleteTreeCleanup(word, node_path); + } + return new_target; +} + +template +std::deque::Node*> RadixTree::GetOrCreateWordPath(absl::string_view word) { Node* n = &root_; absl::string_view remaining = word; std::deque node_path{n}; @@ -384,16 +442,7 @@ void RadixTree::Mutate( node_path.push_back(n); } } - - std::optional new_target = mutate(n->target); - - if (new_target) { - n->target = new_target; - } else { - // Delete the word from the tree - n->target = std::nullopt; - PostDeleteTreeCleanup(word, node_path); - } + return node_path; } template diff --git a/src/indexes/text/text_index.cc b/src/indexes/text/text_index.cc index 3f73b8fb..cba0ab0b 100644 --- a/src/indexes/text/text_index.cc +++ b/src/indexes/text/text_index.cc @@ -54,7 +54,8 @@ std::optional> AddWordToPostings( if (existing.has_value()) { postings = existing.value(); } else { - postings = std::make_shared(save_positions, num_text_fields); + postings = + std::make_shared(save_positions, num_text_fields); } postings->InsertPosting(key, text_field_number, position); return postings; @@ -94,13 +95,11 @@ const char* TextIndexSchema::GetLanguageString() const { } } -absl::StatusOr TextIndexSchema::IndexAttributeData(const InternedStringPtr& key, - absl::string_view data, size_t text_field_number, bool stem, - size_t min_stem_size, bool suffix) { - std::lock_guard per_key_guard(per_key_text_indexes_mutex_); - - auto tokens = - lexer_.Tokenize(data, GetPunctuationBitmap(), stem, min_stem_size, GetStopWordsSet()); +absl::StatusOr TextIndexSchema::IndexAttributeData( + const InternedStringPtr& key, absl::string_view data, + size_t text_field_number, bool stem, size_t min_stem_size, bool suffix) { + auto tokens = lexer_.Tokenize(data, GetPunctuationBitmap(), stem, + min_stem_size, GetStopWordsSet()); if (!tokens.ok()) { if (tokens.status().code() == absl::StatusCode::kInvalidArgument) { @@ -109,49 +108,89 @@ absl::StatusOr TextIndexSchema::IndexAttributeData(const InternedStringPtr return tokens.status(); } - // Be smart about how we update in main trees since don't want to traverse - // twice to same PO 
+ TextIndex* key_index; // Key-specific index + { + std::lock_guard per_key_guard(per_key_text_indexes_mutex_); + key_index = &per_key_text_indexes_[key]; + } for (uint32_t position = 0; position < tokens->size(); ++position) { const auto& token = (*tokens)[position]; - text_index_->prefix_.Mutate( - token, - [&](auto existing) { - return AddWordToPostings(existing, key, text_field_number, position, - GetWithOffsets(), GetNumTextFields()); - }); + const std::optional reverse_token = + suffix ? std::optional( + std::string(token.rbegin(), token.rend())) + : std::nullopt; + + // Mutate key index + { + std::lock_guard key_guard(key_index->mutex_); + std::optional> new_target = + key_index->prefix_.MutateTarget(token, [&](auto existing) { + return AddWordToPostings(existing, key, text_field_number, position, + GetWithOffsets(), GetNumTextFields()); + }); + + if (suffix) { + if (!key_index->suffix_.has_value()) { + key_index->suffix_.emplace(); + } + key_index->suffix_.value().SetTarget(*reverse_token, new_target); + } + } + + // Mutate schema index + { + std::lock_guard schema_guard(text_index_->mutex_); + std::optional> new_target = + text_index_->prefix_.MutateTarget(token, [&](auto existing) { + return AddWordToPostings(existing, key, text_field_number, position, + GetWithOffsets(), GetNumTextFields()); + }); + + if (suffix) { + if (!text_index_->suffix_.has_value()) { + text_index_->suffix_.emplace(); + } + text_index_->suffix_.value().SetTarget(*reverse_token, new_target); + } + } } return true; } void TextIndexSchema::DeleteKeyData(const InternedStringPtr& key) { - TextIndex* key_index = nullptr; - { - std::lock_guard per_key_guard(per_key_text_indexes_mutex_); - auto it = per_key_text_indexes_.find(key); - if (it != per_key_text_indexes_.end()) { - key_index = &it->second; + TextIndex* key_index = nullptr; + { + std::lock_guard per_key_guard(per_key_text_indexes_mutex_); + auto it = per_key_text_indexes_.find(key); + if (it != per_key_text_indexes_.end()) { + key_index = &it->second; + } } - } - - if (key_index) { - std::lock_guard main_tree_guard(text_index_->mutex_); - // Cleanup schema-level text index - auto iter = key_index->prefix_.GetWordIterator(""); - while (!iter.Done()) { - std::string_view word = iter.GetWord(); - text_index_->prefix_.Mutate(word, [&](auto existing) { - return RemoveKeyFromPostings(existing, key); - }); + if (key_index) { + std::lock_guard main_tree_guard(text_index_->mutex_); - if (text_index_->suffix_.has_value()) { - // TODO: Cleanup suffix tree + // Cleanup schema-level text index + auto iter = key_index->prefix_.GetWordIterator(""); + while (!iter.Done()) { + std::string_view word = iter.GetWord(); + std::optional> new_target = text_index_->prefix_.MutateTarget(word, [&](auto existing) { + return RemoveKeyFromPostings(existing, key); + }); + if (text_index_->suffix_.has_value()) { + std::string reverse_word(word.rbegin(), word.rend()); + text_index_->suffix_.value().SetTarget(reverse_word, new_target); + } + iter.Next(); } - iter.Next(); + } + + { + std::lock_guard per_key_guard(per_key_text_indexes_mutex_); + per_key_text_indexes_.erase(key); } } -} } // namespace valkey_search::indexes::text diff --git a/testing/radix_test.cc b/testing/radix_test.cc index ab3e4a4e..4edeeb87 100644 --- a/testing/radix_test.cc +++ b/testing/radix_test.cc @@ -36,20 +36,20 @@ class RadixTreeTest : public vmsdk::ValkeyTest { void AddWords(const std::vector>& words) { for (const auto& [word, value] : words) { - prefix_tree_->Mutate(word, [value](auto) { return 
TestTarget(value); }); + prefix_tree_->MutateTarget(word, [value](auto) { return TestTarget(value); }); } } void DeleteWords(const std::vector& words) { for (const auto& word : words) { - prefix_tree_->Mutate( + prefix_tree_->MutateTarget( word, [](auto) -> std::optional { return std::nullopt; }); } } void VerifyWords(const std::vector>& expected) { for (const auto& [word, value] : expected) { - prefix_tree_->Mutate(word, [value, word](auto existing) { + prefix_tree_->MutateTarget(word, [value, word](auto existing) { EXPECT_TRUE(existing.has_value()) << "Word '" << word << "' should exist"; EXPECT_EQ(existing->value, value) @@ -61,7 +61,7 @@ class RadixTreeTest : public vmsdk::ValkeyTest { void VerifyWordsDeleted(const std::vector& words) { for (const auto& word : words) { - prefix_tree_->Mutate(word, [&word](auto existing) { + prefix_tree_->MutateTarget(word, [&word](auto existing) { EXPECT_FALSE(existing.has_value()) << "Word '" << word << "' should be deleted"; return existing; @@ -435,7 +435,7 @@ TEST_F(RadixTreeTest, WordIteratorEmpty) { } TEST_F(RadixTreeTest, WordIteratorNoMatch) { - prefix_tree_->Mutate("hello", [](auto) { return TestTarget(1); }); + prefix_tree_->MutateTarget("hello", [](auto) { return TestTarget(1); }); // Test iterator with non-matching prefix auto iter = prefix_tree_->GetWordIterator("world"); @@ -443,7 +443,7 @@ TEST_F(RadixTreeTest, WordIteratorNoMatch) { } TEST_F(RadixTreeTest, WordIteratorSingleWord) { - prefix_tree_->Mutate("test", [](auto) { return TestTarget(42); }); + prefix_tree_->MutateTarget("test", [](auto) { return TestTarget(42); }); auto iter = prefix_tree_->GetWordIterator("test"); EXPECT_FALSE(iter.Done()); @@ -560,7 +560,7 @@ TEST_F(RadixTreeTest, WordIteratorLargeScale) { for (const auto& w : words) { word_counts[w]++; // Add word to tree, incrementing count each time - prefix_tree_->Mutate(w, [](auto existing) { + prefix_tree_->MutateTarget(w, [](auto existing) { if (existing.has_value()) { return TestTarget(existing->value + 1); } else { @@ -583,7 +583,7 @@ TEST_F(RadixTreeTest, WordIteratorLargeScale) { std::default_random_engine{static_cast(std::time(nullptr))}); std::set words_to_delete(words.begin(), words.begin() + 100); for (const auto& w : words_to_delete) { - prefix_tree_->Mutate(w, [](auto) { return std::nullopt; }); + prefix_tree_->MutateTarget(w, [](auto) { return std::nullopt; }); word_counts.erase(w); } word_pairs = std::vector>(word_counts.begin(), @@ -592,7 +592,7 @@ TEST_F(RadixTreeTest, WordIteratorLargeScale) { // Delete all words for (const auto& w : words) { - prefix_tree_->Mutate(w, [](auto) { return std::nullopt; }); + prefix_tree_->MutateTarget(w, [](auto) { return std::nullopt; }); } // clang-format off VerifyTreeStructure({ From c857c3c3f6393c881c3828a3e85125390aca940c Mon Sep 17 00:00:00 2001 From: Brennan Cathcart Date: Fri, 10 Oct 2025 04:23:36 +0000 Subject: [PATCH 4/7] Add integration test Signed-off-by: Brennan Cathcart --- integration/test_fulltext.py | 78 +++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/integration/test_fulltext.py b/integration/test_fulltext.py index 88fb1bdf..e0e8b81e 100644 --- a/integration/test_fulltext.py +++ b/integration/test_fulltext.py @@ -283,14 +283,13 @@ def test_text_search_per_field(self): doc_fields_desc2 = dict(zip(document_desc2[::2], document_desc2[1::2])) assert doc_fields_desc2 == expected_desc2_hash_value - def test_default_ingestion_pipeline(self): + def test_default_tokenization(self): """ - Test comprehensive 
ingestion pipeline: FT.CREATE → HSET → FT.SEARCH with full tokenization + Test FT.CREATE → HSET → FT.SEARCH with full tokenization """ client: Valkey = self.server.get_new_client() client.execute_command("FT.CREATE idx ON HASH SCHEMA content TEXT") client.execute_command("HSET", "doc:1", "content", "The quick-running searches are finding EFFECTIVE results!") - client.execute_command("HSET", "doc:2", "content", "But slow searches aren't working...") # List of queries with pass/fail expectations test_cases = [ @@ -311,36 +310,36 @@ def test_default_ingestion_pipeline(self): else: assert result[0] == 0, f"Failed: {description}" - def test_multi_text_field(self): + @pytest.mark.skip(reason="TODO: ingest original words when stemming enabled") + def test_stemming(self): """ - Test different TEXT field configs in same index + Test text index NOSTEM option """ client: Valkey = self.server.get_new_client() client.execute_command("FT.CREATE idx ON HASH SCHEMA title TEXT content TEXT NOSTEM") client.execute_command("HSET", "doc:1", "title", "running fast", "content", "running quickly") - expected_value = { - b'title': b'running fast', - b'content': b'running quickly' - } + expected = [1, b'doc:1', [b'content', b'running quickly', b'title', b'running fast']] - result = client.execute_command("FT.SEARCH", "idx", '@title:"run"') - actual_fields = dict(zip(result[2][::2], result[2][1::2])) - assert actual_fields == expected_value + # We can find stems on 'title' + assert client.execute_command("FT.SEARCH", "idx", '@title:"run"') == expected - result = client.execute_command("FT.SEARCH", "idx", '@content:"run"') - assert result[0] == 0 # Should not find (NOSTEM) + # We cannot find stems on 'content' with NOSTEM + assert client.execute_command("FT.SEARCH", "idx", '@content:"run"') == [0] + + # We can find original words in both cases + assert client.execute_command("FT.SEARCH", "idx", '@title:"running"') == expected # TODO: fails here + assert client.execute_command("FT.SEARCH", "idx", '@content:"running"') == expected def test_custom_stopwords(self): """ - End-to-end test: FT.CREATE STOPWORDS config actually filters custom stop words in search + Test FT.CREATE STOPWORDS option filters out custom stop words """ client: Valkey = self.server.get_new_client() client.execute_command("FT.CREATE idx ON HASH STOPWORDS 2 the and SCHEMA content TEXT") client.execute_command("HSET", "doc:1", "content", "the cat and dog are good") # Stop words should not be findable - result = client.execute_command("FT.SEARCH", "idx", '@content:"and"') assert result[0] == 0 # Stop word "and" filtered out @@ -350,23 +349,9 @@ def test_custom_stopwords(self): assert result[1] == b'doc:1' assert result[2] == [b'content', b"the cat and dog are good"] - def test_nostem(self): - """ - End-to-end test: FT.CREATE NOSTEM config actually affects stemming in search - """ - client: Valkey = self.server.get_new_client() - client.execute_command("FT.CREATE idx ON HASH NOSTEM SCHEMA content TEXT") - client.execute_command("HSET", "doc:1", "content", "running quickly") - - # With NOSTEM, exact forms should be findable - result = client.execute_command("FT.SEARCH", "idx", '@content:"running"') - assert result[0] == 1 # Exact form "running" found - assert result[1] == b'doc:1' - assert result[2] == [b'content', b"running quickly"] - def test_custom_punctuation(self): """ - Test PUNCTUATION directive configures custom tokenization separators + Test FT.CREATE PUNCTUATION directive configures custom tokenization separators """ client: Valkey = 
self.server.get_new_client() client.execute_command("FT.CREATE idx ON HASH PUNCTUATION . SCHEMA content TEXT") @@ -380,4 +365,33 @@ def test_custom_punctuation(self): # @ NOT configured as separator - should not be able with split words result = client.execute_command("FT.SEARCH", "idx", '@content:"test"') - assert result[0] == 0 \ No newline at end of file + assert result[0] == 0 + + def test_add_update_delete_documents(self): + """ + Tests we accurately reflect added, updated, and deleted documents in text indexes + """ + client: Valkey = self.server.get_new_client() + + client.execute_command("FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "content", "TEXT") + + # Add + for i in range(10): + client.execute_command("HSET", f"doc:{i}", "content", f"What a cool document{i}") + result = client.execute_command("FT.SEARCH", "idx", "@content:document*") + assert result[0] == 10 + + # Update + for i in range(10): + client.execute_command("HSET", f"doc:{i}", "content", f"What a cool doc{i}") + result = client.execute_command("FT.SEARCH", "idx", "@content:document*") + assert result[0] == 0 + result = client.execute_command("FT.SEARCH", "idx", "@content:doc*") + assert result[0] == 10 + + # Delete + for i in range(10): + client.execute_command("DEL", f"doc:{i}") + result = client.execute_command("FT.SEARCH", "idx", "@content:doc*") + assert result[0] == 0 + \ No newline at end of file From c8d3725d3a7a1038debb88c478767aeae563375f Mon Sep 17 00:00:00 2001 From: Brennan Cathcart Date: Sat, 11 Oct 2025 00:09:48 +0000 Subject: [PATCH 5/7] Vibed multi-client test Signed-off-by: Brennan Cathcart --- integration/test_fulltext.py | 190 ++++++++++++++++++++++++++++++++-- src/indexes/text/radix_tree.h | 1 - 2 files changed, 181 insertions(+), 10 deletions(-) diff --git a/integration/test_fulltext.py b/integration/test_fulltext.py index 6e464246..200cf21c 100644 --- a/integration/test_fulltext.py +++ b/integration/test_fulltext.py @@ -5,6 +5,8 @@ from valkeytestframework.conftest import resource_port_tracker from ft_info_parser import FTInfoParser from valkeytestframework.util import waiters +import threading +import time """ This file contains tests for full text search. 
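
The custom-punctuation behavior tested above comes down to a 256-entry separator bitmap consulted per byte during tokenization. A hedged C++ sketch of that mechanism with an invented input string; the lowercase folding here is an assumption about the lexer's normalization, and the real Lexer additionally drops stop words and stems via libstemmer, none of which is shown:

#include <bitset>
#include <cctype>
#include <iostream>
#include <string>
#include <vector>

// Split on bytes marked in the punctuation bitmap (FT.CREATE PUNCTUATION).
std::vector<std::string> Tokenize(const std::string& text,
                                  const std::bitset<256>& punct) {
  std::vector<std::string> tokens;
  std::string cur;
  for (unsigned char c : text) {
    if (punct[c]) {
      if (!cur.empty()) {
        tokens.push_back(cur);
        cur.clear();
      }
    } else {
      cur.push_back(static_cast<char>(std::tolower(c)));  // assumed folding
    }
  }
  if (!cur.empty()) tokens.push_back(cur);
  return tokens;
}

int main() {
  std::bitset<256> punct;
  punct['.'] = true;  // only '.' separates; '@' stays inside tokens
  for (const auto& t : Tokenize("User@Mail.example.COM", punct)) {
    std::cout << t << "\n";  // user@mail, example, com
  }
}
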
@@ -446,31 +448,201 @@ def test_custom_punctuation(self): result = client.execute_command("FT.SEARCH", "idx", '@content:"test"') assert result[0] == 0 - def test_add_update_delete_documents(self): + def test_add_update_delete_documents_single_client(self): """ - Tests we accurately reflect added, updated, and deleted documents in text indexes + Tests we properly ingest added, updated, and deleted documents from a single client """ client: Valkey = self.server.get_new_client() - client.execute_command("FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "content", "TEXT") + num_docs = 5 # Add - for i in range(10): + for i in range(num_docs): client.execute_command("HSET", f"doc:{i}", "content", f"What a cool document{i}") result = client.execute_command("FT.SEARCH", "idx", "@content:document*") - assert result[0] == 10 + assert result[0] == num_docs # Update - for i in range(10): + for i in range(num_docs): client.execute_command("HSET", f"doc:{i}", "content", f"What a cool doc{i}") result = client.execute_command("FT.SEARCH", "idx", "@content:document*") assert result[0] == 0 result = client.execute_command("FT.SEARCH", "idx", "@content:doc*") - assert result[0] == 10 + assert result[0] == num_docs # Delete - for i in range(10): + for i in range(num_docs): client.execute_command("DEL", f"doc:{i}") result = client.execute_command("FT.SEARCH", "idx", "@content:doc*") assert result[0] == 0 - \ No newline at end of file + + def test_add_update_delete_documents_multi_client(self): + """ + Tests we properly ingest added, updated, and deleted documents from multiple clients + + TODO: To ensure concurrent ingestion, add a debug config to pause updates from the + waiting room and then let them index in batches. Otherwise, we're dependent on + this Python test sending them faster than the server is cutting indexing batches. 
+ """ + + def perform_concurrent_searches(clients, num_clients, searches, phase_name): + """ + Helper function to perform concurrent searches across multiple clients and validate consistency + + Args: + clients: List of client connections + num_clients: Number of clients to use + searches: List of (query, description) tuples to execute + phase_name: Name of the phase for error reporting (ADD/UPDATE/DELETE) + """ + search_results = {} + def concurrent_search(client_id): + client = clients[client_id] + client_results = [] + for query, desc in searches: + result = client.execute_command("FT.SEARCH", "idx", query) + client_results.append((desc, result[0])) # Store description and count + search_results[client_id] = client_results + + threads = [] + for client_id in range(num_clients): + thread = threading.Thread(target=concurrent_search, args=(client_id,)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + # Validate concurrent search results are consistent + expected_results = search_results[0] # Use first client as reference + for client_id in range(1, num_clients): + assert search_results[client_id] == expected_results, f"{phase_name}: Search results inconsistent between clients 0 and {client_id}" + + # Setup + num_clients = 50 + docs_per_client = 50 + clients = [self.server.get_new_client() for _ in range(num_clients)] + + # Create the index + clients[0].execute_command("FT.CREATE", "idx", "ON", "HASH", "PREFIX", "1", "doc:", "SCHEMA", "content", "TEXT") + time.sleep(0.1) + + # Phase 1: Concurrent ADD + def add_documents(client_id): + client = clients[client_id] + for i in range(docs_per_client): + # Longer content to increase ingestion processing time + content = f"client{client_id} document doc{i} original content with many additional words to process during indexing. " \ + f"This extended text includes various terms like analysis, processing, indexing, searching, and retrieval. " \ + f"The purpose is to create substantial content that requires more computational effort during text analysis. " \ + f"Additional keywords include: database, storage, performance, optimization, concurrent, threading, synchronization. " \ + f"More descriptive text about document {i} from client {client_id} with original data and content." 
+ client.execute_command("HSET", f"doc:{client_id}_{i}", "content", content) + + threads = [] + for client_id in range(num_clients): + thread = threading.Thread(target=add_documents, args=(client_id,)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + # Validate ADD phase with concurrent searching + client = clients[0] + total_docs = num_clients * docs_per_client + + result = client.execute_command("FT.SEARCH", "idx", "@content:document") + assert result[0] == total_docs, f"ADD: Expected {total_docs} documents with 'document', got {result[0]}" + + result = client.execute_command("FT.SEARCH", "idx", "@content:origin") # "original" stems to "origin" + assert result[0] == total_docs, f"ADD: Expected {total_docs} documents with 'origin', got {result[0]}" + + # Concurrent search phase after ADD + add_searches = [ + ("@content:document", "document"), + ("@content:origin", "origin"), + ("@content:analysis", "analysis"), + ("@content:process*", "process*"), + ("@content:database", "database"), + ("@content:concurrent", "concurrent") + ] + perform_concurrent_searches(clients, num_clients, add_searches, "ADD") + + # Phase 2: Concurrent UPDATE + def update_documents(client_id): + client = clients[client_id] + for i in range(docs_per_client): + # Longer updated content to increase ingestion processing time + content = f"client{client_id} document doc{i} updated content with comprehensive text for thorough indexing analysis. " \ + f"This modified version contains different terminology including: revision, modification, alteration, enhancement. " \ + f"The updated document now features expanded vocabulary for testing concurrent update operations effectively. " \ + f"Technical terms added: algorithm, computation, execution, validation, verification, testing, debugging. " \ + f"Enhanced description of document {i} from client {client_id} with updated information and revised content." 
+ client.execute_command("HSET", f"doc:{client_id}_{i}", "content", content) + + threads = [] + for client_id in range(num_clients): + thread = threading.Thread(target=update_documents, args=(client_id,)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + # Validate UPDATE phase with concurrent searching + result = client.execute_command("FT.SEARCH", "idx", "@content:origin") # "original" stems to "origin" + assert result[0] == 0, f"UPDATE: Expected 0 documents with 'origin', got {result[0]}" + + result = client.execute_command("FT.SEARCH", "idx", "@content:updat") # "updated" stems to "updat" + assert result[0] == total_docs, f"UPDATE: Expected {total_docs} documents with 'updat', got {result[0]}" + + # Concurrent search phase after UPDATE + update_searches = [ + ("@content:document", "document"), + ("@content:updat", "updat"), + ("@content:revision", "revision"), + ("@content:modif*", "modif*"), + ("@content:algorithm", "algorithm"), + ("@content:validation", "validation") + ] + perform_concurrent_searches(clients, num_clients, update_searches, "UPDATE") + + # Phase 3: Concurrent DELETE + def delete_documents(client_id): + client = clients[client_id] + for i in range(docs_per_client // 2): # Delete half the documents + client.execute_command("DEL", f"doc:{client_id}_{i}") + + threads = [] + for client_id in range(num_clients): + thread = threading.Thread(target=delete_documents, args=(client_id,)) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + # Validate DELETE phase with concurrent searching + remaining_docs = total_docs // 2 + + result = client.execute_command("FT.SEARCH", "idx", "@content:updat") # "updated" stems to "updat" + assert result[0] == remaining_docs, f"DELETE: Expected {remaining_docs} documents with 'updat', got {result[0]}" + + result = client.execute_command("FT.SEARCH", "idx", "@content:document") + assert result[0] == remaining_docs, f"DELETE: Expected {remaining_docs} documents with 'document', got {result[0]}" + + # Concurrent search phase after DELETE + delete_searches = [ + ("@content:document", "document"), + ("@content:updat", "updat"), + ("@content:revision", "revision"), + ("@content:algorithm", "algorithm"), + ("@content:validation", "validation"), + ("@content:enhanced", "enhanced") + ] + perform_concurrent_searches(clients, num_clients, delete_searches, "DELETE") + + def test_suffix_search(self): + # TODO + pass diff --git a/src/indexes/text/radix_tree.h b/src/indexes/text/radix_tree.h index 44f0a484..8e98ff72 100644 --- a/src/indexes/text/radix_tree.h +++ b/src/indexes/text/radix_tree.h @@ -124,7 +124,6 @@ struct RadixTree { // (which is normal) then no locking is required within the mutate function // itself. 
// - // std::optional MutateTarget( absl::string_view word, absl::FunctionRef(std::optional)> mutate); From 5f26dd8d78b06da727e7f157e0db589120bc52bc Mon Sep 17 00:00:00 2001 From: Brennan Cathcart Date: Sat, 11 Oct 2025 02:16:36 +0000 Subject: [PATCH 6/7] Clang format Signed-off-by: Brennan Cathcart --- src/indexes/text.cc | 19 +++++++----- src/indexes/text/lexer.cc | 6 ++-- src/indexes/text/lexer.h | 7 +++-- src/indexes/text/radix_tree.h | 12 ++++---- src/indexes/text/text_index.cc | 53 +++++++++++++++++----------------- src/indexes/text/text_index.h | 9 +++--- testing/lexer_test.cc | 25 +++++++--------- testing/radix_test.cc | 3 +- 8 files changed, 69 insertions(+), 65 deletions(-) diff --git a/src/indexes/text.cc b/src/indexes/text.cc index 630992aa..bec22541 100644 --- a/src/indexes/text.cc +++ b/src/indexes/text.cc @@ -27,16 +27,18 @@ Text::Text(const data_model::TextIndex& text_index_proto, absl::StatusOr Text::AddRecord(const InternedStringPtr& key, absl::string_view data) { - // TODO: Key Tracking - return text_index_schema_->IndexAttributeData(key, data, text_field_number_, !no_stem_, min_stem_size_, with_suffix_trie_); + return text_index_schema_->IndexAttributeData(key, data, text_field_number_, + !no_stem_, min_stem_size_, + with_suffix_trie_); } absl::StatusOr Text::RemoveRecord(const InternedStringPtr& key, DeletionType deletion_type) { - // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey(), - // so there is no need to touch the index structures here + // The old key value has already been removed from the index by a call to + // TextIndexSchema::DeleteKey(), so there is no need to touch the index + // structures here // TODO: key tracking @@ -47,9 +49,12 @@ absl::StatusOr Text::ModifyRecord(const InternedStringPtr& key, absl::string_view data) { // TODO: key tracking - // The old key value has already been removed from the index by a call to TextIndexSchema::DeleteKey() at - // this point, so we simply add the new key data - return text_index_schema_->IndexAttributeData(key, data, text_field_number_, !no_stem_, min_stem_size_, with_suffix_trie_); + // The old key value has already been removed from the index by a call to + // TextIndexSchema::DeleteKey() at this point, so we simply add the new key + // data + return text_index_schema_->IndexAttributeData(key, data, text_field_number_, + !no_stem_, min_stem_size_, + with_suffix_trie_); } int Text::RespondWithInfo(ValkeyModuleCtx* ctx) const { diff --git a/src/indexes/text/lexer.cc b/src/indexes/text/lexer.cc index 3701e5ac..ca6d2428 100644 --- a/src/indexes/text/lexer.cc +++ b/src/indexes/text/lexer.cc @@ -59,8 +59,7 @@ absl::StatusOr> Lexer::Tokenize( return tokens; } -std::string Lexer::StemWord(const std::string& word, - bool stemming_enabled, +std::string Lexer::StemWord(const std::string& word, bool stemming_enabled, uint32_t min_stem_size) const { if (word.empty() || !stemming_enabled || word.length() < min_stem_size) { return word; @@ -68,7 +67,8 @@ std::string Lexer::StemWord(const std::string& word, std::lock_guard guard(stemmer_mutex_); const sb_symbol* stemmed = sb_stemmer_stem( - stemmer_, reinterpret_cast(word.c_str()), word.length()); + stemmer_, reinterpret_cast(word.c_str()), + word.length()); DCHECK(stemmed) << "Stemming failed for word: " + word; diff --git a/src/indexes/text/lexer.h b/src/indexes/text/lexer.h index 280f8dcc..018f1741 100644 --- a/src/indexes/text/lexer.h +++ b/src/indexes/text/lexer.h @@ -37,9 +37,9 @@ struct sb_stemmer; namespace 
valkey_search::indexes::text { struct Lexer { - Lexer(const char *); + Lexer(const char*); ~Lexer(); - + absl::StatusOr> Tokenize( absl::string_view text, const std::bitset<256>& punct_bitmap, bool stemming_enabled, uint32_t min_stem_size, @@ -63,7 +63,8 @@ struct Lexer { // TODO: Create a little pool of stemmers protected by this mutex mutable std::mutex stemmer_mutex_; - std::string StemWord(const std::string& word, bool stemming_enabled, uint32_t min_stem_size) const; + std::string StemWord(const std::string& word, bool stemming_enabled, + uint32_t min_stem_size) const; // UTF-8 processing helpers bool IsValidUtf8(absl::string_view text) const; diff --git a/src/indexes/text/radix_tree.h b/src/indexes/text/radix_tree.h index 8e98ff72..71e367a5 100644 --- a/src/indexes/text/radix_tree.h +++ b/src/indexes/text/radix_tree.h @@ -101,9 +101,7 @@ struct RadixTree { // nullopt new_target will cause the word to be added and // then immediately deleted from the tree. // - void SetTarget( - absl::string_view word, - std::optional new_target); + void SetTarget(absl::string_view word, std::optional new_target); // // Applies the mutation function to the target of the given word to generate @@ -323,9 +321,8 @@ struct RadixTree { }; template -void RadixTree::SetTarget( - absl::string_view word, - std::optional new_target) { +void RadixTree::SetTarget(absl::string_view word, + std::optional new_target) { CHECK(!word.empty()) << "Can't add the target for an empty word"; std::deque node_path = GetOrCreateWordPath(word); Node* n = node_path.back(); @@ -360,7 +357,8 @@ std::optional RadixTree::MutateTarget( } template -std::deque::Node*> RadixTree::GetOrCreateWordPath(absl::string_view word) { +std::deque::Node*> +RadixTree::GetOrCreateWordPath(absl::string_view word) { Node* n = &root_; absl::string_view remaining = word; std::deque node_path{n}; diff --git a/src/indexes/text/text_index.cc b/src/indexes/text/text_index.cc index cba0ab0b..9afe52cf 100644 --- a/src/indexes/text/text_index.cc +++ b/src/indexes/text/text_index.cc @@ -160,37 +160,38 @@ absl::StatusOr TextIndexSchema::IndexAttributeData( } void TextIndexSchema::DeleteKeyData(const InternedStringPtr& key) { - TextIndex* key_index = nullptr; - { - std::lock_guard per_key_guard(per_key_text_indexes_mutex_); - auto it = per_key_text_indexes_.find(key); - if (it != per_key_text_indexes_.end()) { - key_index = &it->second; - } + TextIndex* key_index = nullptr; + { + std::lock_guard per_key_guard(per_key_text_indexes_mutex_); + auto it = per_key_text_indexes_.find(key); + if (it != per_key_text_indexes_.end()) { + key_index = &it->second; } + } - if (key_index) { - std::lock_guard main_tree_guard(text_index_->mutex_); - - // Cleanup schema-level text index - auto iter = key_index->prefix_.GetWordIterator(""); - while (!iter.Done()) { - std::string_view word = iter.GetWord(); - std::optional> new_target = text_index_->prefix_.MutateTarget(word, [&](auto existing) { - return RemoveKeyFromPostings(existing, key); - }); - if (text_index_->suffix_.has_value()) { - std::string reverse_word(word.rbegin(), word.rend()); - text_index_->suffix_.value().SetTarget(reverse_word, new_target); - } - iter.Next(); + if (key_index) { + std::lock_guard main_tree_guard(text_index_->mutex_); + + // Cleanup schema-level text index + auto iter = key_index->prefix_.GetWordIterator(""); + while (!iter.Done()) { + std::string_view word = iter.GetWord(); + std::optional> new_target = + text_index_->prefix_.MutateTarget(word, [&](auto existing) { + return 
diff --git a/src/indexes/text/text_index.cc b/src/indexes/text/text_index.cc
index cba0ab0b..9afe52cf 100644
--- a/src/indexes/text/text_index.cc
+++ b/src/indexes/text/text_index.cc
@@ -160,37 +160,38 @@ absl::StatusOr<bool> TextIndexSchema::IndexAttributeData(
 }
 
 void TextIndexSchema::DeleteKeyData(const InternedStringPtr& key) {
-    TextIndex* key_index = nullptr;
-    {
-      std::lock_guard per_key_guard(per_key_text_indexes_mutex_);
-      auto it = per_key_text_indexes_.find(key);
-      if (it != per_key_text_indexes_.end()) {
-        key_index = &it->second;
-      }
+  TextIndex* key_index = nullptr;
+  {
+    std::lock_guard per_key_guard(per_key_text_indexes_mutex_);
+    auto it = per_key_text_indexes_.find(key);
+    if (it != per_key_text_indexes_.end()) {
+      key_index = &it->second;
     }
+  }
 
-    if (key_index) {
-      std::lock_guard main_tree_guard(text_index_->mutex_);
-
-      // Cleanup schema-level text index
-      auto iter = key_index->prefix_.GetWordIterator("");
-      while (!iter.Done()) {
-        std::string_view word = iter.GetWord();
-        std::optional<std::shared_ptr<Postings>> new_target = text_index_->prefix_.MutateTarget(word, [&](auto existing) {
-          return RemoveKeyFromPostings(existing, key);
-        });
-        if (text_index_->suffix_.has_value()) {
-          std::string reverse_word(word.rbegin(), word.rend());
-          text_index_->suffix_.value().SetTarget(reverse_word, new_target);
-        }
-        iter.Next();
+  if (key_index) {
+    std::lock_guard main_tree_guard(text_index_->mutex_);
+
+    // Cleanup schema-level text index
+    auto iter = key_index->prefix_.GetWordIterator("");
+    while (!iter.Done()) {
+      std::string_view word = iter.GetWord();
+      std::optional<std::shared_ptr<Postings>> new_target =
+          text_index_->prefix_.MutateTarget(word, [&](auto existing) {
+            return RemoveKeyFromPostings(existing, key);
+          });
+      if (text_index_->suffix_.has_value()) {
+        std::string reverse_word(word.rbegin(), word.rend());
+        text_index_->suffix_.value().SetTarget(reverse_word, new_target);
       }
+      iter.Next();
     }
+  }
 
-    {
-      std::lock_guard per_key_guard(per_key_text_indexes_mutex_);
-      per_key_text_indexes_.erase(key);
-    }
+  {
+    std::lock_guard per_key_guard(per_key_text_indexes_mutex_);
+    per_key_text_indexes_.erase(key);
+  }
 }
 
 }  // namespace valkey_search::indexes::text
diff --git a/src/indexes/text/text_index.h b/src/indexes/text/text_index.h
index fd857de1..9f9caa41 100644
--- a/src/indexes/text/text_index.h
+++ b/src/indexes/text/text_index.h
@@ -19,10 +19,9 @@
 #include "absl/functional/function_ref.h"
 #include "absl/strings/string_view.h"
 #include "src/index_schema.pb.h"
+#include "src/indexes/text/lexer.h"
 #include "src/indexes/text/posting.h"
 #include "src/indexes/text/radix_tree.h"
-#include "src/indexes/text/lexer.h"
-
 
 struct sb_stemmer;
 
@@ -57,8 +56,10 @@ class TextIndexSchema {
                   bool with_offsets,
                   const std::vector<std::string>& stop_words);
 
-  absl::StatusOr<bool> IndexAttributeData(const InternedStringPtr& key, absl::string_view data, size_t text_field_number,
-                                          bool stem, size_t min_stem_size, bool suffix);
+  absl::StatusOr<bool> IndexAttributeData(const InternedStringPtr& key,
+                                          absl::string_view data,
+                                          size_t text_field_number, bool stem,
+                                          size_t min_stem_size, bool suffix);
   void DeleteKeyData(const InternedStringPtr& key);
 
   uint8_t AllocateTextFieldNumber() { return num_text_fields_++; }
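The two entry points above are designed to pair up during a mutation: DeleteKeyData() first wipes whatever the key contributed, then IndexAttributeData() re-ingests each attribute's new value. A sketch of a hypothetical caller follows; the function name and the literal argument values are illustrative, not part of this patch.

    #include "absl/status/status.h"
    #include "absl/strings/string_view.h"
    #include "src/indexes/text/text_index.h"

    using valkey_search::indexes::text::TextIndexSchema;

    absl::Status ReindexKey(TextIndexSchema& schema, const InternedStringPtr& key,
                            absl::string_view new_value) {
      // Drop every word this key previously contributed, across all of its
      // text attributes, using the per-key index as the word list.
      schema.DeleteKeyData(key);

      // Re-ingest the attribute's new value. The field number would have been
      // handed out by AllocateTextFieldNumber() when the schema was built.
      auto added = schema.IndexAttributeData(key, new_value,
                                             /*text_field_number=*/0,
                                             /*stem=*/true, /*min_stem_size=*/4,
                                             /*suffix=*/false);
      if (!added.ok()) return added.status();
      return absl::OkStatus();
    }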
diff --git a/testing/lexer_test.cc b/testing/lexer_test.cc
index 8dcb9179..79572644 100644
--- a/testing/lexer_test.cc
+++ b/testing/lexer_test.cc
@@ -78,8 +78,8 @@ TEST_P(LexerParameterizedTest, TokenizeTest) {
 
   auto result =
       lexer_->Tokenize(test_case.input, schema->GetPunctuationBitmap(),
-                       test_case.stemming_enabled,
-                       test_case.min_stem_size, schema->GetStopWordsSet());
+                       test_case.stemming_enabled, test_case.min_stem_size,
+                       schema->GetStopWordsSet());
 
   ASSERT_TRUE(result.ok()) << "Test case: " << test_case.description;
   EXPECT_EQ(*result, test_case.expected)
@@ -167,10 +167,9 @@ INSTANTIATE_TEST_SUITE_P(
 // Separate tests for error cases and special scenarios
 TEST_F(LexerTest, InvalidUTF8) {
   std::string invalid_utf8 = "hello \xFF\xFE world";
-  auto result =
-      lexer_->Tokenize(invalid_utf8, text_schema_->GetPunctuationBitmap(),
-                       stemming_enabled_,
-                       min_stem_size_, text_schema_->GetStopWordsSet());
+  auto result = lexer_->Tokenize(
+      invalid_utf8, text_schema_->GetPunctuationBitmap(), stemming_enabled_,
+      min_stem_size_, text_schema_->GetStopWordsSet());
   EXPECT_FALSE(result.ok());
   EXPECT_EQ(result.status().code(), absl::StatusCode::kInvalidArgument);
   EXPECT_EQ(result.status().message(), "Invalid UTF-8");
@@ -178,10 +177,9 @@ TEST_F(LexerTest, InvalidUTF8) {
 
 TEST_F(LexerTest, LongWord) {
   std::string long_word(1000, 'a');
-  auto result =
-      lexer_->Tokenize(long_word, text_schema_->GetPunctuationBitmap(),
-                       stemming_enabled_,
-                       min_stem_size_, text_schema_->GetStopWordsSet());
+  auto result = lexer_->Tokenize(
+      long_word, text_schema_->GetPunctuationBitmap(), stemming_enabled_,
+      min_stem_size_, text_schema_->GetStopWordsSet());
   ASSERT_TRUE(result.ok());
   EXPECT_EQ(*result, std::vector<std::string>({long_word}));
 }
@@ -197,10 +195,9 @@ TEST_F(LexerTest, EmptyStopWordsHandling) {
   const auto& empty_set = no_stop_schema->GetStopWordsSet();
 
   // Test tokenization with empty stop words - all words preserved
-  auto result =
-      lexer_->Tokenize("Hello, world! TESTING 123 with-dashes and/or symbols",
-                       no_stop_schema->GetPunctuationBitmap(),
-                       true, 3, empty_set);
+  auto result = lexer_->Tokenize(
+      "Hello, world! TESTING 123 with-dashes and/or symbols",
+      no_stop_schema->GetPunctuationBitmap(), true, 3, empty_set);
   ASSERT_TRUE(result.ok());
   EXPECT_EQ(*result,
diff --git a/testing/radix_test.cc b/testing/radix_test.cc
index 4edeeb87..1cffa504 100644
--- a/testing/radix_test.cc
+++ b/testing/radix_test.cc
@@ -36,7 +36,8 @@ class RadixTreeTest : public vmsdk::ValkeyTest {
 
   void AddWords(const std::vector<std::pair<std::string, int>>& words) {
     for (const auto& [word, value] : words) {
-      prefix_tree_->MutateTarget(word, [value](auto) { return TestTarget(value); });
+      prefix_tree_->MutateTarget(word,
+                                 [value](auto) { return TestTarget(value); });
     }
   }

From 49378e47c09ecddc3233bb8de2ec4b85cfa82cb8 Mon Sep 17 00:00:00 2001
From: Brennan Cathcart
Date: Tue, 14 Oct 2025 19:54:52 +0000
Subject: [PATCH 7/7] Address PR comments and failing analyzers

Signed-off-by: Brennan Cathcart
---
 .config/typos.toml             |  1 +
 integration/utils.py           |  2 +-
 src/indexes/text/radix_tree.h  | 16 +++++++++-------
 src/indexes/text/text_index.cc |  4 ++++
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/.config/typos.toml b/.config/typos.toml
index 71c52e31..c98ba77c 100644
--- a/.config/typos.toml
+++ b/.config/typos.toml
@@ -21,6 +21,7 @@ CrEaTe = "CrEaTe"
 LiSt = "LiSt"
 DeBuG = "DeBuG"
 DrOpInDeX = "DrOpInDeX"
+updat = "updat" # Used for stem matching
 
 [type.cpp]
 extend-ignore-re = [
diff --git a/integration/utils.py b/integration/utils.py
index 788ec879..6ce3774b 100644
--- a/integration/utils.py
+++ b/integration/utils.py
@@ -73,4 +73,4 @@ def check_all_nodes_complete():
             for client in clients
         )
 
-    wait_for_true(check_all_nodes_complete)
+    waiters.wait_for_true(check_all_nodes_complete)
diff --git a/src/indexes/text/radix_tree.h b/src/indexes/text/radix_tree.h
index 71e367a5..a04db1de 100644
--- a/src/indexes/text/radix_tree.h
+++ b/src/indexes/text/radix_tree.h
@@ -88,11 +88,12 @@ struct RadixTree {
   RadixTree() = default;
 
   //
-  // Adds a target to the for the given word. If the target already exists,
-  // it will be replaced by the new target. An empty target will cause the
-  // word to be deleted from the tree.
+  // Adds the target for the given word, replacing the existing target
+  // if there is one. Providing an empty target will cause the word to be
+  // deleted from the tree. Only use this API when you don't care about any
+  // existing target.
   //
-  // This function is explicitly multi-thread safe and is
+  // (TODO) This function is explicitly multi-thread safe and is
   // designed to allow other mutations to be performed on other words and
   // targets simultaneously, with minimal collisions.
   //
@@ -104,16 +105,17 @@ struct RadixTree {
   void SetTarget(absl::string_view word, std::optional<Target> new_target);
 
   //
-  // Applies the mutation function to the target of the given word to generate
+  // Applies the mutation function to the current target of the word to generate
   // a new target. If the word doesn't already exist, a path for it will be
-  // first added to the tree. The new target is also returned to the caller.
+  // first added to the tree and the target will be std::nullopt. The new target
+  // is returned to the caller.
   //
   // The input parameter to the mutate function will be nullopt if there is no
   // entry for this word. Otherwise it will contain the value for this word. The
   // return value of the mutate function is the new value for this word. If the
   // return value is nullopt then this word is deleted from the RadixTree.
   //
-  // This function is explicitly multi-thread safe and is
+  // (TODO) This function is explicitly multi-thread safe and is
   // designed to allow other mutations to be performed on other words and
   // targets simultaneously, with minimal collisions.
   //
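One usage note on the SetTarget() contract above: DeleteKeyData() leans on it to keep the optional suffix tree in lockstep with the prefix tree, storing each word reversed so that suffix queries become prefix walks. A sketch under those assumptions (the helper name is invented, and Postings is assumed to live in the same namespace):

    #include <memory>
    #include <optional>
    #include <string>

    #include "src/indexes/text/posting.h"
    #include "src/indexes/text/radix_tree.h"

    using valkey_search::indexes::text::Postings;
    using valkey_search::indexes::text::RadixTree;

    void MirrorWord(RadixTree<std::shared_ptr<Postings>>& prefix,
                    RadixTree<std::shared_ptr<Postings>>& suffix,
                    const std::string& word,
                    std::optional<std::shared_ptr<Postings>> target) {
      // Unconditionally bind (or, for nullopt, erase) the word in the prefix
      // tree; any previous target is simply replaced.
      prefix.SetTarget(word, target);

      // Store the same target under the reversed spelling in the suffix tree,
      // e.g. "updating" -> "gnitadpu".
      std::string reversed(word.rbegin(), word.rend());
      suffix.SetTarget(reversed, target);
    }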
diff --git a/src/indexes/text/text_index.cc b/src/indexes/text/text_index.cc
index 9afe52cf..af95ef8a 100644
--- a/src/indexes/text/text_index.cc
+++ b/src/indexes/text/text_index.cc
@@ -114,6 +114,10 @@ absl::StatusOr<bool> TextIndexSchema::IndexAttributeData(
     key_index = &per_key_text_indexes_[key];
   }
 
+  // TODO: Once we optimize the postings object for space efficiency, it won't
+  // be cheap to incrementally update. We likely want to build the position map
+  // structure up front for each word in the key and then merge them into the
+  // trees' posting objects at the end of the key ingestion.
   for (uint32_t position = 0; position < tokens->size(); ++position) {
     const auto& token = (*tokens)[position];
     const std::optional<std::string> reverse_token =
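To make the trade-off in that last TODO concrete: today each token occurrence triggers its own tree mutation, while the plan is to aggregate positions first and mutate once per distinct word. A rough shape of phase one, where every name (WordPositions, CollectPositions) is hypothetical:

    #include <cstdint>
    #include <string>
    #include <vector>

    #include "absl/container/flat_hash_map.h"

    // Phase 1: one pass over the tokens, accumulating positions per word in a
    // cheap local map.
    using WordPositions =
        absl::flat_hash_map<std::string, std::vector<uint32_t>>;

    WordPositions CollectPositions(const std::vector<std::string>& tokens) {
      WordPositions positions;
      for (uint32_t i = 0; i < tokens.size(); ++i) {
        positions[tokens[i]].push_back(i);
      }
      return positions;
    }

    // Phase 2 (not shown): one MutateTarget() per distinct word, merging the
    // whole position vector into that word's Postings object in a single step,
    // so a space-optimized encoding is rebuilt at most once per word per key.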