diff --git a/src/VecSim/algorithms/hnsw/graph_data.h b/src/VecSim/algorithms/hnsw/graph_data.h index 396e21227..3d3bc5753 100644 --- a/src/VecSim/algorithms/hnsw/graph_data.h +++ b/src/VecSim/algorithms/hnsw/graph_data.h @@ -30,6 +30,7 @@ struct ElementLevelData { : incomingUnidirectionalEdges(new (allocator) vecsim_stl::vector(allocator)), totalIncomingLinks(0), numLinks(0) {} + /*************************** Getters ********************************/ linkListSize getNumLinks() const { return this->numLinks; } idType getLinkAtPos(size_t pos) const { assert(pos < numLinks); @@ -38,11 +39,14 @@ struct ElementLevelData { const vecsim_stl::vector &getIncomingEdges() const { return *incomingUnidirectionalEdges; } + size_t inDegree() const { return totalIncomingLinks; } std::vector copyLinks() { std::vector links_copy; links_copy.assign(links, links + numLinks); return links_copy; } + + /************************************ Setters ****************************/ // Sets the outgoing links of the current element. // Assumes that the object has the capacity to hold all the links. 
void setLinks(vecsim_stl::vector &links) { diff --git a/src/VecSim/algorithms/hnsw/hnsw.h b/src/VecSim/algorithms/hnsw/hnsw.h index 6615a06b5..f48727181 100644 --- a/src/VecSim/algorithms/hnsw/hnsw.h +++ b/src/VecSim/algorithms/hnsw/hnsw.h @@ -49,12 +49,19 @@ template using candidatesLabelsMaxHeap = vecsim_stl::abstract_priority_queue; using graphNodeType = pair; // represented as: (element_id, level) +class hashForPair { +public: + size_t operator()(const graphNodeType &x) const { + return std::hash()(x.first) ^ std::hash()(x.second); + } +}; + ////////////////////////////////////// Auxiliary HNSW structs ////////////////////////////////////// // Vectors flags (for marking a specific vector) typedef enum { DELETE_MARK = 0x1, // element is logically deleted, but still exists in the graph - IN_PROCESS = 0x2, // element is being inserted into the graph + IN_PROCESS = 0x2, // element is being inserted/reinserted into the graph } Flags; // The state of the index and the newly inserted vector to be passed into addVector API in case that @@ -112,6 +119,8 @@ class HNSWIndex : public VecSimIndexAbstract, size_t curElementCount; idType entrypointNode; size_t maxLevel; // this is the top level of the entry point's element + vecsim_stl::unordered_set unreachableNodes; + vecsim_stl::unordered_set hardUnreachableNodes; // Index data vecsim_stl::vector vectorBlocks; @@ -122,6 +131,7 @@ class HNSWIndex : public VecSimIndexAbstract, // This is mutable since the object changes upon search operations as well (which are const). 
mutable VisitedNodesHandlerPool visitedNodesHandlerPool; mutable std::shared_mutex indexDataGuard; + std::mutex unreachableNodesGuard; #ifdef BUILD_TESTS #include "VecSim/algorithms/hnsw/hnsw_base_tests_friends.h" @@ -144,7 +154,6 @@ class HNSWIndex : public VecSimIndexAbstract, tag_t *elements_tags, tag_t visited_tag, std::unique_ptr &top_candidates, candidatesMaxHeap &candidate_set, DistType lowerBound, DistType radius) const; - template candidatesMaxHeap searchLayer(idType ep_id, const void *data_point, size_t layer, size_t ef) const; template @@ -156,13 +165,13 @@ class HNSWIndex : public VecSimIndexAbstract, searchRangeBottomLayer_WithTimeout(idType ep_id, const void *data_point, double epsilon, DistType radius, void *timeoutCtx, VecSimQueryReply_Code *rc) const; - idType getNeighborsByHeuristic2(candidatesList &top_candidates, size_t M) const; + idType getNeighborsByHeuristic2(candidatesList &top_candidates, size_t M, float alpha = 1.0f) const; void getNeighborsByHeuristic2(candidatesList &top_candidates, size_t M, vecsim_stl::vector &not_chosen_candidates) const; template void getNeighborsByHeuristic2_internal( candidatesList &top_candidates, size_t M, - vecsim_stl::vector *removed_candidates = nullptr) const; + vecsim_stl::vector *removed_candidates = nullptr, float alpha = 1.0f) const; // Helper function for re-selecting node's neighbors which was selected as a neighbor for // a newly inserted node. Also, responsible for mutually connect the new node and the neighbor // (unidirectional or bidirectional connection).
@@ -170,9 +179,12 @@ class HNSWIndex : public VecSimIndexAbstract, void revisitNeighborConnections(size_t level, idType new_node_id, const std::pair &neighbor_data, ElementLevelData &new_node_level, - ElementLevelData &neighbor_level); + ElementLevelData &neighbor_level, bool &unreachable); idType mutuallyConnectNewElement(idType new_node_id, candidatesMaxHeap &top_candidates, size_t level); + template + size_t mutuallyReconnectElement(idType node_id, candidatesMaxHeap &top_candidates, + size_t level); void mutuallyUpdateForRepairedNode(idType node_id, size_t level, vecsim_stl::vector &neighbors_to_remove, vecsim_stl::vector &nodes_to_update, @@ -213,6 +225,11 @@ class HNSWIndex : public VecSimIndexAbstract, template void removeAndSwap(idType internalId); + template + vecsim_stl::unordered_set fetchAndClearUnreachableNodes(); + template + void setUnreachableNode(const graphNodeType &node); + size_t getVectorRelativeIndex(idType id) const { return id % this->blockSize; } // Flagging API @@ -230,6 +247,8 @@ class HNSWIndex : public VecSimIndexAbstract, } void mutuallyRemoveNeighborAtPos(ElementLevelData &node_level, size_t level, idType node_id, size_t pos); + template + void reinsertElementToGraphAtLevel(idType element_id, size_t level_to_insert); public: HNSWIndex(const HNSWParams *params, const AbstractIndexInitParams &abstractInitParams, @@ -262,6 +281,8 @@ class HNSWIndex : public VecSimIndexAbstract, void lockNodeLinks(ElementGraphData *node_data) const; void unlockNodeLinks(ElementGraphData *node_data) const; VisitedNodesHandler *getVisitedList() const; + template + void connectUnreachableNodes(); void returnVisitedList(VisitedNodesHandler *visited_nodes_handler) const; VecSimIndexInfo info() const override; VecSimIndexBasicInfo basicInfo() const override; @@ -282,7 +303,6 @@ class HNSWIndex : public VecSimIndexAbstract, void markDeletedInternal(idType internalId); bool isMarkedDeleted(idType internalId) const; bool isInProcess(idType internalId) const; - void 
unmarkInProcess(idType internalId); AddVectorCtx storeNewElement(labelType label, const void *vector_data); void removeAndSwapDeletedElement(idType internalId); void repairNodeConnections(idType node_id, size_t level); @@ -412,6 +432,47 @@ ElementLevelData &HNSWIndex::getElementLevelData(idType inte return getGraphDataByInternalId(internal_id)->getElementLevelData(level, this->levelDataSize); } +template +template +vecsim_stl::unordered_set +HNSWIndex::fetchAndClearUnreachableNodes() { + std::unique_lock lock(unreachableNodesGuard); + auto unreachable_copy = force ? hardUnreachableNodes : unreachableNodes; + force ? hardUnreachableNodes.clear() : unreachableNodes.clear(); + return unreachable_copy; +} + +template +template +void HNSWIndex::setUnreachableNode(const graphNodeType &node) { + if (isMarkedDeleted(node.first)) + return; + std::unique_lock lock(unreachableNodesGuard); + if (force) { + this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, "Element %zu still unreachable in " + "level %zu after trying to reinsert - setting as permanent unreachable", + node.first, node.second); + hardUnreachableNodes.insert(node); + } else { + this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, "Element %zu is unreachable in level %zu", + node.first, node.second); + unreachableNodes.insert(node); + } +} + +template +template +void HNSWIndex::connectUnreachableNodes() { + auto nodes_to_connect = fetchAndClearUnreachableNodes(); + if (nodes_to_connect.empty()) + return; + this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, "Try to reinsert %zu nodes to the graph %s", + nodes_to_connect.size(), force ? 
"- hard unreachable nodes" : ""); + for (auto node : nodes_to_connect) { + reinsertElementToGraphAtLevel(node.first, node.second); + } +} + template ElementLevelData &HNSWIndex::getElementLevelData(ElementGraphData *graph_data, size_t level) const { @@ -438,6 +499,8 @@ void HNSWIndex::markDeletedInternal(idType internalId) { if (internalId == entrypointNode) { // Internally, we hold and release the entrypoint neighbors lock. replaceEntryPoint(); + this->log(VecSimCommonStrings::LOG_DEBUG_STRING, + "New entry point due to deletion is %zu", entrypointNode); } // Atomically set the deletion mark flag (note that other parallel threads may set the flags // at the same time (for changing the IN_PROCESS flag). @@ -456,13 +519,6 @@ bool HNSWIndex::isInProcess(idType internalId) const { return isMarkedAs(internalId); } -template -void HNSWIndex::unmarkInProcess(idType internalId) { - // Atomically unset the IN_PROCESS mark flag (note that other parallel threads may set the flags - // at the same time (for marking the element with IN_PROCCESS flag). 
- unmarkAs(internalId); -} - template void HNSWIndex::lockIndexDataGuard() const { indexDataGuard.lock(); @@ -506,7 +562,6 @@ void HNSWIndex::unlockNodeLinks(idType node_id) const { /** * helper functions */ - template void HNSWIndex::emplaceToHeap( vecsim_stl::abstract_priority_queue &heap, DistType dist, idType id) const { @@ -676,7 +731,6 @@ void HNSWIndex::processCandidate_RangeSearch( } template -template candidatesMaxHeap HNSWIndex::searchLayer(idType ep_id, const void *data_point, size_t layer, size_t ef) const { @@ -688,7 +742,7 @@ HNSWIndex::searchLayer(idType ep_id, const void *data_point, candidatesMaxHeap candidate_set(this->allocator); DistType lowerBound; - if (!has_marked_deleted || !isMarkedDeleted(ep_id)) { + if (!isMarkedDeleted(ep_id)) { DistType dist = this->distFunc(data_point, getDataByInternalId(ep_id), this->dim); lowerBound = dist; top_candidates.emplace(dist, ep_id); @@ -708,9 +762,9 @@ HNSWIndex::searchLayer(idType ep_id, const void *data_point, } candidate_set.pop(); - processCandidate(curr_el_pair.second, data_point, layer, ef, - visited_nodes_handler->getElementsTags(), visited_tag, - top_candidates, candidate_set, lowerBound); + processCandidate(curr_el_pair.second, data_point, layer, ef, + visited_nodes_handler->getElementsTags(), visited_tag, + top_candidates, candidate_set, lowerBound); } returnVisitedList(visited_nodes_handler); @@ -720,13 +774,13 @@ HNSWIndex::searchLayer(idType ep_id, const void *data_point, template idType HNSWIndex::getNeighborsByHeuristic2(candidatesList &top_candidates, - const size_t M) const { + const size_t M, float alpha) const { if (top_candidates.size() < M) { return std::min_element(top_candidates.begin(), top_candidates.end(), [](const auto &a, const auto &b) { return a.first < b.first; }) ->second; } - getNeighborsByHeuristic2_internal(top_candidates, M, nullptr); + getNeighborsByHeuristic2_internal(top_candidates, M, nullptr, alpha); return top_candidates.front().second; } @@ -741,7 +795,7 @@ 
template template void HNSWIndex::getNeighborsByHeuristic2_internal( candidatesList &top_candidates, const size_t M, - vecsim_stl::vector *removed_candidates) const { + vecsim_stl::vector *removed_candidates, float alpha) const { if (top_candidates.size() < M) { return; } @@ -770,7 +824,7 @@ void HNSWIndex::getNeighborsByHeuristic2_internal( for (size_t i = 0; i < return_list.size(); i++) { DistType candidate_to_selected_dist = this->distFunc(cached_vectors[i], curr_vector, this->dim); - if (candidate_to_selected_dist < candidate_to_query_dist) { + if (alpha * candidate_to_selected_dist < candidate_to_query_dist) { if constexpr (record_removed) { removed_candidates->push_back(current_pair->second); } @@ -796,7 +850,7 @@ void HNSWIndex::getNeighborsByHeuristic2_internal( template void HNSWIndex::revisitNeighborConnections( size_t level, idType new_node_id, const std::pair &neighbor_data, - ElementLevelData &new_node_level, ElementLevelData &neighbor_level) { + ElementLevelData &new_node_level, ElementLevelData &neighbor_level, bool &unreachable) { // Note - expect that node_lock and neighbor_lock are locked at that point. // Collect the existing neighbors and the new node as the neighbor's neighbors candidates. @@ -856,6 +910,7 @@ void HNSWIndex::revisitNeighborConnections( // as is. neighbor_level.setLinkAtPos(neighbour_neighbours_idx++, neighbor_level.getLinkAtPos(i)); update_cur_node_required = false; + unreachable = false; continue; } // Now we know that we are looking at a node to be removed from the neighbor's neighbors. @@ -871,6 +926,7 @@ void HNSWIndex::revisitNeighborConnections( // connection is mutual - both new node and the selected neighbor in each other's list. neighbor_level.setLinkAtPos(neighbour_neighbours_idx++, new_node_id); new_node_level.increaseTotalIncomingEdgesNum(); + unreachable = false; } else { // unidirectional connection - put the new node in the neighbour's incoming edges. 
neighbor_level.newIncomingUnidirectionalEdge(new_node_id); @@ -883,6 +939,140 @@ void HNSWIndex::revisitNeighborConnections( } } +template +template +size_t HNSWIndex::mutuallyReconnectElement( + idType node_id, candidatesMaxHeap &top_candidates, size_t level) { + + size_t max_M_cur = level ? M : M0; + candidatesList selected_neighbors_cands(this->allocator); + vecsim_stl::vector nodes_to_update(this->allocator); + + auto *node_graph_data = getGraphDataByInternalId(node_id); + lockNodeLinks(node_graph_data); + auto &node_level_data = getElementLevelData(node_graph_data, level); + for (size_t i = 0; i < node_level_data.getNumLinks(); i++) { + nodes_to_update.push_back(node_level_data.getLinkAtPos(i)); + } + unlockNodeLinks(node_graph_data); + + selected_neighbors_cands.insert(selected_neighbors_cands.end(), top_candidates.begin(), + top_candidates.end()); + float alpha = force ? 1.5f : 1.2f; // we relax the heuristics to get more neighbors + getNeighborsByHeuristic2(selected_neighbors_cands, M, alpha); + assert(selected_neighbors_cands.size() <= M && + "Should not be more than M candidates returned by the heuristic"); + + vecsim_stl::vector selected_neighbors(selected_neighbors_cands.size(), this->allocator); + std::transform(selected_neighbors_cands.begin(), selected_neighbors_cands.end(), + selected_neighbors.begin(), [](pair el) { return el.second; }); + + // Sort the selected neighbors set for fast lookup. + std::sort(selected_neighbors.begin(), selected_neighbors.end()); + + // Remove duplicates in advance here - if a selected neighbor already exists in this set.
+ std::vector ids_to_remove; + for (idType node : nodes_to_update) { + if (std::binary_search(selected_neighbors.begin(), selected_neighbors.end(), node)) { + ids_to_remove.push_back(node); + } + } + for (idType id_to_remove : ids_to_remove) { + nodes_to_update.remove(id_to_remove); + } + nodes_to_update.insert(nodes_to_update.end(), selected_neighbors.begin(), + selected_neighbors.end()); + nodes_to_update.push_back(node_id); + + // Acquire the required locks for the updates, after sorting the nodes to update + // (to avoid deadlocks) + std::sort(nodes_to_update.begin(), nodes_to_update.end()); + size_t nodes_to_update_count = nodes_to_update.size(); + for (size_t i = 0; i < nodes_to_update_count; i++) { + lockNodeLinks(nodes_to_update[i]); + } + + // Try to make a mutual connection for the current node's neighbors that were found. + node_level_data = getElementLevelData(node_graph_data, level); + for (size_t i = 0; i < node_level_data.getNumLinks(); i++) { + idType neighbor = node_level_data.getLinkAtPos(i); + if (!std::binary_search(nodes_to_update.begin(), nodes_to_update.end(), neighbor)) { + // This neighbor was added in the meantime while we released the lock - we cannot + // perform a mutual update. + continue; + } + if (std::find(selected_neighbors.begin(), selected_neighbors.end(), neighbor) != + selected_neighbors.end()) { + // Neighbor already exists, no need to connect it later on. + selected_neighbors.remove(neighbor); + } + // Connect the neighbor back if it has the capacity, and...
+ auto &neighbor_data = getElementLevelData(neighbor, level); + if (neighbor_data.getNumLinks() < max_M_cur && !isMarkedDeleted(node_id) && + !isMarkedDeleted(neighbor)) { + // The edge was unidirectional (no existing outgoing edge from the neighbor to the + // node) + if (neighbor_data.removeIncomingUnidirectionalEdgeIfExists(node_id)) { + neighbor_data.appendLink(node_id); + node_level_data.increaseTotalIncomingEdgesNum(); + } + } + } + + // Go over the chosen new neighbors that are not connected yet and perform mutual updates + // if possible. + for (idType chosen_id : selected_neighbors) { + if (isMarkedDeleted(node_id)) { + // don't add neighbors from/to a deleted node + break; + } + // If this specific new neighbor is deleted, we don't add this connection and continue. + // Also, don't add a new node that is being indexed in parallel, as it may choose this node + // as its neighbor and create a double connection (then this node will have a duplicate + // neighbor). + if (isMarkedDeleted(chosen_id) || isInProcess(chosen_id)) { + continue; + } + auto &chosen_node_level_data = getElementLevelData(chosen_id, level); + // Perform mutual or exclusive unidirectional connections according to degree limitations. + if (node_level_data.getNumLinks() < max_M_cur) { + node_level_data.appendLink(chosen_id); + chosen_node_level_data.increaseTotalIncomingEdgesNum(); + if (chosen_node_level_data.getNumLinks() < max_M_cur) { + // Unless the chosen new neighbor was already pointing to the node id, connect them + // now. + if (!node_level_data.removeIncomingUnidirectionalEdgeIfExists(chosen_id)) { + chosen_node_level_data.appendLink(node_id); + node_level_data.increaseTotalIncomingEdgesNum(); + } // else - the neighbor is already pointing to the node and the unidirectional + // edge has been removed.
+ } else if (!node_level_data.removeIncomingUnidirectionalEdgeIfExists(chosen_id)) { + // The neighbor cannot add the node to its neighbors (and it was not there before) - + // so we update the unidirectional incoming edges. + chosen_node_level_data.newIncomingUnidirectionalEdge(node_id); + } + } else if (chosen_node_level_data.getNumLinks() < max_M_cur) { + // We can only connect the chosen neighbor to the node as a unidirectional link, if it + // was not already exist. + auto &incoming_uni_edges = node_level_data.getIncomingEdges(); + if (std::find(incoming_uni_edges.begin(), incoming_uni_edges.end(), chosen_id) == + incoming_uni_edges.end()) { + chosen_node_level_data.appendLink(node_id); + node_level_data.increaseTotalIncomingEdgesNum(); + node_level_data.newIncomingUnidirectionalEdge(chosen_id); + } // nothing to do otherwise - edge already exists. + } + } + size_t ret = node_level_data.inDegree(); + this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, + "Node %d now holds %zu links and %zu incoming links", node_id, + node_level_data.getNumLinks(), ret); + for (size_t i = 0; i < nodes_to_update_count; i++) { + unlockNodeLinks(nodes_to_update[i]); + } + return ret; +} + template idType HNSWIndex::mutuallyConnectNewElement( idType new_node_id, candidatesMaxHeap &top_candidates, size_t level) { @@ -898,23 +1088,25 @@ idType HNSWIndex::mutuallyConnectNewElement( // Use the heuristic to filter the top candidates, and get the next closest entry point. 
idType next_closest_entry_point = getNeighborsByHeuristic2(top_candidates_list, M); assert(top_candidates_list.size() <= M && - "Should be not be more than M candidates returned by the heuristic"); + "Should not be more than M candidates returned by the heuristic"); - auto *new_node_level = getGraphDataByInternalId(new_node_id); - ElementLevelData &new_node_level_data = getElementLevelData(new_node_level, level); + auto *new_node_graph_data = getGraphDataByInternalId(new_node_id); + ElementLevelData &new_node_level_data = getElementLevelData(new_node_graph_data, level); assert(new_node_level_data.getNumLinks() == 0 && "The newly inserted element should have blank link list"); + bool unreachable = true; for (auto &neighbor_data : top_candidates_list) { idType selected_neighbor = neighbor_data.second; // neighbor's id auto *neighbor_graph_data = getGraphDataByInternalId(selected_neighbor); if (new_node_id < selected_neighbor) { - lockNodeLinks(new_node_level); + lockNodeLinks(new_node_graph_data); lockNodeLinks(neighbor_graph_data); } else { lockNodeLinks(neighbor_graph_data); - lockNodeLinks(new_node_level); + lockNodeLinks(new_node_graph_data); } + new_node_level_data = getElementLevelData(new_node_graph_data, level); // validations... assert(new_node_level_data.getNumLinks() <= max_M_cur && "Neighbors number exceeds limit"); @@ -926,14 +1118,14 @@ idType HNSWIndex::mutuallyConnectNewElement( // The new node cannot add more neighbors this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Couldn't add all chosen neighbors upon inserting a new node"); - unlockNodeLinks(new_node_level); + unlockNodeLinks(new_node_graph_data); unlockNodeLinks(neighbor_graph_data); break; } // If one of the two nodes has already deleted - skip the operation. 
if (isMarkedDeleted(new_node_id) || isMarkedDeleted(selected_neighbor)) { - unlockNodeLinks(new_node_level); + unlockNodeLinks(new_node_graph_data); unlockNodeLinks(neighbor_graph_data); continue; } @@ -947,8 +1139,9 @@ idType HNSWIndex::mutuallyConnectNewElement( neighbor_level_data.increaseTotalIncomingEdgesNum(); neighbor_level_data.appendLink(new_node_id); new_node_level_data.increaseTotalIncomingEdgesNum(); - unlockNodeLinks(new_node_level); + unlockNodeLinks(new_node_graph_data); unlockNodeLinks(neighbor_graph_data); + unreachable = false; continue; } @@ -956,7 +1149,10 @@ idType HNSWIndex::mutuallyConnectNewElement( // We collect all the existing neighbors and the new node as candidates, and mutually update // the neighbor's neighbors. We also release the acquired locks inside this call. revisitNeighborConnections(level, new_node_id, neighbor_data, new_node_level_data, - neighbor_level_data); + neighbor_level_data, unreachable); + } + if (unreachable) { + this->setUnreachableNode(std::make_pair(new_node_id, level)); } return next_closest_entry_point; } @@ -1020,6 +1216,9 @@ void HNSWIndex::repairConnectionsForDeletion( } // anyway update the incoming nodes counter. node_level_data.decreaseTotalIncomingEdgesNum(); + if (node_level_data.inDegree() == 0) { + this->setUnreachableNode(std::make_pair(node_id, level)); + } } } } else { @@ -1191,6 +1390,18 @@ void HNSWIndex::SwapLastIdWithDeletedId(idType element_inter this->idToMetaData[element_internal_id] = this->idToMetaData[curElementCount]; + // Update the unreachable nodes container for every level in which an unreachable node exists.
+ for (size_t l = 0; l <= last_element->toplevel; l++) { + if (this->unreachableNodes.contains({curElementCount, l})) { + this->unreachableNodes.erase({curElementCount, l}); + this->unreachableNodes.insert(std::make_pair(element_internal_id, l)); + } + if (this->hardUnreachableNodes.contains({curElementCount, l})) { + this->hardUnreachableNodes.erase({curElementCount, l}); + this->hardUnreachableNodes.insert(std::make_pair(element_internal_id, l)); + } + } + if (curElementCount == this->entrypointNode) { this->entrypointNode = element_internal_id; } @@ -1423,7 +1634,6 @@ void HNSWIndex::mutuallyUpdateForRepairedNode( template void HNSWIndex::repairNodeConnections(idType node_id, size_t level) { - vecsim_stl::vector neighbors_candidate_ids(this->allocator); // Use bitmaps for fast accesses: // node_orig_neighbours_set is used to differentiate between the neighbors that will *not* be @@ -1547,6 +1757,9 @@ void HNSWIndex::mutuallyRemoveNeighborAtPos(ElementLevelData node_level.newIncomingUnidirectionalEdge(removed_node); } removed_node_level.decreaseTotalIncomingEdgesNum(); + if (removed_node_level.inDegree() == 0) { + this->setUnreachableNode(std::make_pair(removed_node, level)); + } } template @@ -1577,11 +1790,69 @@ void HNSWIndex::insertElementToGraph(idType element_id, for (auto level = static_cast(max_common_level); level >= 0; level--) { candidatesMaxHeap top_candidates = - searchLayer(curr_element, vector_data, level, efConstruction); - curr_element = mutuallyConnectNewElement(element_id, top_candidates, level); + searchLayer(curr_element, vector_data, level, efConstruction); + // If the entry point was marked deleted between iterations, we may receive an empty + // candidates set.
+ if (!top_candidates.empty()) { + curr_element = mutuallyConnectNewElement(element_id, top_candidates, level); + } else { + // Node still has no neighbors - it is definitely unreachable + this->setUnreachableNode(std::make_pair(element_id, level)); + } } } +template +template +void HNSWIndex::reinsertElementToGraphAtLevel(idType element_id, + size_t level_to_insert) { + + auto [entry_point, max_level] = this->safeGetEntryPointState(); + if (element_id == entry_point || entry_point == INVALID_ID || isMarkedDeleted(element_id)) { + return; // entry point is always reachable, no need to connect deleted element + } + if (this->isInProcess(element_id)) { + return; // If it is being inserted - no need to reconnect, and if it is being reconnected by + // another thread, there is no need for someone else to do this job as well. + } + this->markAs(element_id); + DistType cur_dist = std::numeric_limits::max(); + const void *vector_data = this->getDataByInternalId(element_id); + idType curr_element = entry_point; + cur_dist = this->distFunc(vector_data, getDataByInternalId(curr_element), this->dim); + for (auto level = max_level; level > level_to_insert; --level) { + // this is done for the levels which are above the level to insert + // to which we are going to insert the new element. We do + // a greedy search in the graph starting from the entry point + // at each level, and move on with the closest element we can find. + // When there is no improvement to do, we take a step down. + greedySearchLevel(vector_data, level, curr_element, cur_dist); + } + + lockNodeLinks(element_id); + auto &node_data = getElementLevelData(element_id, level_to_insert); + bool unreachable = node_data.inDegree() == 0; + unlockNodeLinks(element_id); + if (!unreachable) { + this->unmarkAs(element_id); + return; // node is no longer unreachable, we can skip on reinserting it.
+ } + + candidatesMaxHeap top_candidates = + searchLayer(curr_element, vector_data, level_to_insert, efConstruction); + if (top_candidates.empty()) { + // No candidates found (entry point been deleted in the meantime). + this->setUnreachableNode(std::make_pair(element_id, level_to_insert)); + } else { + size_t ret = mutuallyReconnectElement(element_id, top_candidates, level_to_insert); + if (ret == 0) { + // Node is still unreachable with 0 inDegree. + this->setUnreachableNode(std::make_pair(element_id, level_to_insert)); + } + } + this->unmarkAs(element_id); +} + /** * Ctor / Dtor */ @@ -1602,8 +1873,9 @@ HNSWIndex::HNSWIndex(const HNSWParams *params, size_t random_seed, size_t pool_initial_size) : VecSimIndexAbstract(abstractInitParams), VecSimIndexTombstone(), maxElements(RoundUpInitialCapacity(params->initialCapacity, this->blockSize)), - vectorBlocks(this->allocator), graphDataBlocks(this->allocator), - idToMetaData(maxElements, this->allocator), + unreachableNodes(this->allocator), hardUnreachableNodes(this->allocator), +vectorBlocks(this->allocator), + graphDataBlocks(this->allocator), idToMetaData(maxElements, this->allocator), visitedNodesHandlerPool(pool_initial_size, maxElements, this->allocator) { M = params->M ? params->M : HNSW_DEFAULT_M; @@ -1672,9 +1944,18 @@ void HNSWIndex::removeAndSwap(idType internalId) { // this point (after all the repair jobs are done). neighbour.removeIncomingUnidirectionalEdgeIfExists(internalId); neighbour.decreaseTotalIncomingEdgesNum(); + if (neighbour.inDegree() == 0) { + this->setUnreachableNode(std::make_pair(cur_level.getLinkAtPos(i), level)); + } } } + // Remove it from the unreachable nodes container for every level in which it exists. 
+ for (size_t l = 0; l <= element->toplevel; l++) { + this->unreachableNodes.erase({internalId, l}); + this->hardUnreachableNodes.erase({internalId, l}); + } + // Free the element's resources element->destroy(this->levelDataSize, this->allocator); @@ -1759,6 +2040,7 @@ void HNSWIndex::removeVectorInPlace(const idType element_int // Finally, remove the element from the index and make a swap with the last internal id to // avoid fragmentation and reclaim memory when needed. removeAndSwap(element_internal_id); + this->connectUnreachableNodes(); } // Store the new element in the global data structures and keep the new state. In multithreaded @@ -1857,10 +2139,14 @@ void HNSWIndex::appendVector(const void *vector_data, const insertElementToGraph(new_element_id, element_max_level, prev_entry_point, prev_max_level, vector_data); } - unmarkInProcess(new_element_id); - if (auxiliaryCtx == nullptr && state.currMaxLevel < state.elementMaxLevel) { - // No external auxiliaryCtx, so it's this function responsibility to release the lock. - this->unlockIndexDataGuard(); + unmarkAs(new_element_id); + if (auxiliaryCtx == nullptr) { + // No external auxiliaryCtx, so it's this function responsibility to release the lock if + // needed and connect unreachable nodes that were created due to this operation. 
+ if (state.currMaxLevel < state.elementMaxLevel) { + this->unlockIndexDataGuard(); + } + this->connectUnreachableNodes(); } } diff --git a/src/VecSim/algorithms/hnsw/hnsw_serializer.h b/src/VecSim/algorithms/hnsw/hnsw_serializer.h index 90129ad1f..64eb9eb8b 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_serializer.h +++ b/src/VecSim/algorithms/hnsw/hnsw_serializer.h @@ -6,8 +6,9 @@ HNSWIndex::HNSWIndex(std::ifstream &input, const HNSWParams Serializer::EncodingVersion version) : VecSimIndexAbstract(abstractInitParams), Serializer(version), maxElements(RoundUpInitialCapacity(params->initialCapacity, this->blockSize)), - epsilon(params->epsilon), vectorBlocks(this->allocator), graphDataBlocks(this->allocator), - idToMetaData(maxElements, this->allocator), + epsilon(params->epsilon), unreachableNodes(this->allocator), hardUnreachableNodes(this->allocator), + vectorBlocks(this->allocator), + graphDataBlocks(this->allocator), idToMetaData(maxElements, this->allocator), visitedNodesHandlerPool(1, maxElements, this->allocator) { this->restoreIndexFields(input); diff --git a/src/VecSim/algorithms/hnsw/hnsw_tiered.h b/src/VecSim/algorithms/hnsw/hnsw_tiered.h index ffb567abf..b44788e61 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_tiered.h +++ b/src/VecSim/algorithms/hnsw/hnsw_tiered.h @@ -208,6 +208,10 @@ class TieredHNSWIndex : public VecSimTieredIndex { TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING, "running asynchronous GC for tiered HNSW index"); this->executeReadySwapJobs(this->pendingSwapJobsThreshold); + // Try to reinsert permanent unreachable nodes + this->mainIndexGuard.lock_shared(); + this->getHNSWIndex()->template connectUnreachableNodes(); + this->mainIndexGuard.unlock_shared(); } void acquireSharedLocks() override { this->flatIndexGuard.lock_shared(); @@ -423,9 +427,13 @@ void TieredHNSWIndex::insertVectorToHNSW( hnsw_index->lockIndexDataGuard(); // Check if resizing is needed for HNSW index (requires write lock). 
if (hnsw_index->indexCapacity() == hnsw_index->indexSize()) { + hnsw_index->unlockIndexDataGuard(); + // Try to reinsert permanent unreachable nodes + TIERED_LOG(VecSimCommonStrings::LOG_VERBOSE_STRING, + "Going over permanent unreachable nodes:"); + hnsw_index->template connectUnreachableNodes(); // Release the inner HNSW data lock before we re-acquire the global HNSW lock. this->mainIndexGuard.unlock_shared(); - hnsw_index->unlockIndexDataGuard(); this->mainIndexGuard.lock(); hnsw_index->lockIndexDataGuard(); @@ -472,6 +480,8 @@ void TieredHNSWIndex::insertVectorToHNSW( if (state.elementMaxLevel > state.currMaxLevel) { hnsw_index->unlockIndexDataGuard(); } + // Reinsert nodes that became unreachable due to this operation. + hnsw_index->connectUnreachableNodes(); this->mainIndexGuard.unlock_shared(); } } @@ -589,7 +599,8 @@ void TieredHNSWIndex::executeRepairJob(HNSWRepairJob *job) { this->idToRepairJobsGuard.unlock(); hnsw_index->repairNodeConnections(job->node_id, job->level); - + // Reinsert nodes that became unreachable due to this operation. 
+ hnsw_index->connectUnreachableNodes(); this->mainIndexGuard.unlock_shared(); } diff --git a/src/VecSim/utils/vecsim_stl.h b/src/VecSim/utils/vecsim_stl.h index 0b24c2258..739801504 100644 --- a/src/VecSim/utils/vecsim_stl.h +++ b/src/VecSim/utils/vecsim_stl.h @@ -92,18 +92,16 @@ class set : public VecsimBaseObject, public std::set, VecsimSTLA : VecsimBaseObject(alloc), std::set, VecsimSTLAllocator>(alloc) {} }; -template -class unordered_set - : public VecsimBaseObject, - public std::unordered_set, std::equal_to, VecsimSTLAllocator> { +template > +class unordered_set : public VecsimBaseObject, + public std::unordered_set, VecsimSTLAllocator> { public: explicit unordered_set(const std::shared_ptr &alloc) : VecsimBaseObject(alloc), - std::unordered_set, std::equal_to, VecsimSTLAllocator>(alloc) {} + std::unordered_set, VecsimSTLAllocator>(alloc) {} explicit unordered_set(size_t n_bucket, const std::shared_ptr &alloc) : VecsimBaseObject(alloc), - std::unordered_set, std::equal_to, VecsimSTLAllocator>(n_bucket, - alloc) {} + std::unordered_set, VecsimSTLAllocator>(n_bucket, alloc) {} }; } // namespace vecsim_stl diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h index 66e6230b3..01ffd3b43 100644 --- a/src/VecSim/vec_sim_tiered_index.h +++ b/src/VecSim/vec_sim_tiered_index.h @@ -271,10 +271,10 @@ template VecSimIndexInfo VecSimTieredIndex::info() const { VecSimIndexInfo info; this->flatIndexGuard.lock_shared(); + this->mainIndexGuard.lock(); VecSimIndexInfo frontendInfo = this->frontendIndex->info(); this->flatIndexGuard.unlock_shared(); - this->mainIndexGuard.lock(); VecSimIndexInfo backendInfo = this->backendIndex->info(); this->mainIndexGuard.unlock(); diff --git a/tests/unit/test_allocator.cpp b/tests/unit/test_allocator.cpp index c761e3a87..7874ad490 100644 --- a/tests/unit/test_allocator.cpp +++ b/tests/unit/test_allocator.cpp @@ -18,6 +18,8 @@ const size_t vecsimAllocationOverhead = VecSimAllocator::getAllocationOverheadSi const 
size_t hashTableNodeSize = getLabelsLookupNodeSize(); +const size_t unreachableNodeHashTableNodeSize = getUnreachableNodeSize(); + class AllocatorTest : public ::testing::Test {}; struct SimpleObject : public VecsimBaseObject { public: @@ -323,8 +325,9 @@ TYPED_TEST(IndexAllocatorTest, testIncomingEdgesSet) { /* Compute the expected allocation delta: * 1. empty incoming edges set in every level (+ allocator's header). - * 2. A node in the labels_lookup has table (+ allocator's header). If rehashing occurred, we - * account also for the diff in the buckets size (each bucket has sizeof(size_t) overhead). + * 2. A node in the labels_lookup hash table (+ allocator's header). If rehashing occurred, we + * account also for the diff in the buckets size (each bucket has sizeof(size_t) overhead). Same + * applies for the unreachable nodes unordered set. * 3. Account for allocating link lists for levels higher than 0, if exists. * 4. Finally, expect an allocation of the data buffer in the incoming edges vector of vec1 due * to the insertion, and the fact that vec1 will re-select its neighbours.
@@ -333,7 +336,10 @@ TYPED_TEST(IndexAllocatorTest, testIncomingEdgesSet) { (vec_max_level + 1) * (sizeof(vecsim_stl::vector) + vecsimAllocationOverhead) + hashTableNodeSize; size_t buckets_diff = hnswIndex->labelLookup.bucket_count() - buckets_num_before; - expected_allocation_delta += buckets_diff * sizeof(size_t); + size_t unreachable_nodes_overhead = + hnswIndex->unreachableNodes.bucket_count() * sizeof(graphNodeType) + + vecsimAllocationOverhead; + expected_allocation_delta += buckets_diff * sizeof(size_t) + unreachable_nodes_overhead; if (vec_max_level > 0) { expected_allocation_delta += hnswIndex->levelDataSize * vec_max_level + vecsimAllocationOverhead; @@ -448,6 +454,8 @@ TYPED_TEST(IndexAllocatorTest, test_hnsw_reclaim_memory) { // All data structures' memory returns to as it was, with the exceptional of the labels_lookup // (STL unordered_map with hash table implementation), that leaves some empty buckets. size_t hash_table_memory = hnswIndex->labelLookup.bucket_count() * sizeof(size_t); + // This applies to the unordered set of unreachable nodes as well. + hash_table_memory += hnswIndex->unreachableNodes.bucket_count() * sizeof(size_t); // Data block vectors do not shrink on resize so extra memory is expected. 
size_t block_vectors_memory = sizeof(DataBlock) * (hnswIndex->graphDataBlocks.capacity() + hnswIndex->vectorBlocks.capacity()) + diff --git a/tests/unit/test_common.cpp b/tests/unit/test_common.cpp index 4546da7f4..f791a8ca2 100644 --- a/tests/unit/test_common.cpp +++ b/tests/unit/test_common.cpp @@ -539,15 +539,17 @@ TEST(CommonAPITest, testlogTieredIndex) { GenerateAndAddVector(tiered_index, 4, 1); mock_thread_pool.thread_iteration(); tiered_index->deleteVector(1); - ASSERT_EQ(log.logBuffer.size(), 4); + ASSERT_EQ(log.logBuffer.size(), 5); ASSERT_EQ(log.logBuffer[0], "verbose: " + log.prefix + "Updating HNSW index capacity from 0 to 1024"); - ASSERT_EQ(log.logBuffer[1], + ASSERT_EQ(log.logBuffer[1], "debug: " + log.prefix + "New entry point due to deletion is " + + std::to_string(INVALID_ID)); + ASSERT_EQ(log.logBuffer[2], "verbose: " + log.prefix + "Tiered HNSW index GC: there are 1 ready swap jobs. Start executing 1 swap jobs"); - ASSERT_EQ(log.logBuffer[2], - "verbose: " + log.prefix + "Updating HNSW index capacity from 1024 to 0"); ASSERT_EQ(log.logBuffer[3], + "verbose: " + log.prefix + "Updating HNSW index capacity from 1024 to 0"); + ASSERT_EQ(log.logBuffer[4], "verbose: " + log.prefix + "Tiered HNSW index GC: done executing 1 swap jobs"); } diff --git a/tests/unit/test_hnsw.cpp b/tests/unit/test_hnsw.cpp index 79c9fbdf0..bffd1c2f9 100644 --- a/tests/unit/test_hnsw.cpp +++ b/tests/unit/test_hnsw.cpp @@ -916,7 +916,7 @@ TYPED_TEST(HNSWTest, hnsw_bad_params) { } TYPED_TEST(HNSWTest, hnsw_delete_entry_point) { - size_t n = 10000; + size_t n = 1000; size_t dim = 4; size_t M = 2; @@ -931,11 +931,13 @@ TYPED_TEST(HNSWTest, hnsw_delete_entry_point) { ASSERT_TRUE(index != NULL); - int64_t vec[dim]; - for (size_t i = 0; i < dim; i++) - vec[i] = i; - for (size_t j = 0; j < n; j++) + for (size_t j = 0; j < n; j++) { + TEST_DATA_T vec[dim]; + for (size_t i = 0; i < dim; i++) { + vec[i] = std::rand() / (TEST_DATA_T)RAND_MAX; + } VecSimIndex_AddVector(index, vec, j); + 
} VecSimIndexInfo info = VecSimIndex_Info(index); @@ -2267,3 +2269,33 @@ TYPED_TEST(HNSWTest, FitMemoryTest) { VecSimIndex_Free(index); } + +TYPED_TEST(HNSWTest, NewNodeIsReachable) { + size_t dim = 4; + size_t n = 100; + HNSWParams params = {.dim = dim, .metric = VecSimMetric_L2, .efConstruction = 10}; + VecSimIndex *index = this->CreateNewIndex(params); + auto hnsw_index = this->CastToHNSW_Single(index); + + // Add n vectors + for (size_t i = 0; i < n; i++) { + GenerateAndAddVector(index, dim, i, i); + } + // Mark all vectors as deleted, except for the first. + for (size_t i = 1; i < n; i++) { + hnsw_index->markDelete(i); + } + // Insert a new node. The scan should not go through deleted candidates (otherwise, the new vector + // would have been unreachable). + EXPECT_EQ(hnsw_index->getEntryPointLabel(), 0); + GenerateAndAddVector(index, dim, n, n); + + TEST_DATA_T query[] = {(TEST_DATA_T)n, (TEST_DATA_T)n, (TEST_DATA_T)n, (TEST_DATA_T)n}; + auto verify_n_reachable = [&](size_t id, double score, size_t index) { + ASSERT_EQ(id, n); + ASSERT_EQ(score, 0); + }; + runTopKSearchTest(index, query, 1, verify_n_reachable); + + VecSimIndex_Free(index); +} diff --git a/tests/unit/test_hnsw_multi.cpp b/tests/unit/test_hnsw_multi.cpp index 3f17c0422..69c37801d 100644 --- a/tests/unit/test_hnsw_multi.cpp +++ b/tests/unit/test_hnsw_multi.cpp @@ -1223,7 +1223,7 @@ TYPED_TEST(HNSWMultiTest, test_query_runtime_params_user_build_args) { } TYPED_TEST(HNSWMultiTest, hnsw_delete_entry_point) { - size_t n = 10000; + size_t n = 1000; size_t per_label = 5; size_t dim = 4; size_t M = 2; @@ -1239,11 +1239,13 @@ TYPED_TEST(HNSWMultiTest, hnsw_delete_entry_point) { ASSERT_TRUE(index != NULL); - TEST_DATA_T vec[dim]; - for (size_t i = 0; i < dim; i++) - vec[i] = i; - for (size_t j = 0; j < n; j++) + for (size_t j = 0; j < n; j++) { + TEST_DATA_T vec[dim]; + for (size_t i = 0; i < dim; i++) { + vec[i] = std::rand() / (TEST_DATA_T)RAND_MAX; + } VecSimIndex_AddVector(index, vec, j / per_label); + } 
VecSimIndexInfo info = VecSimIndex_Info(index); diff --git a/tests/unit/test_hnsw_tiered.cpp b/tests/unit/test_hnsw_tiered.cpp index 83c2e4995..8cc42c9c3 100644 --- a/tests/unit/test_hnsw_tiered.cpp +++ b/tests/unit/test_hnsw_tiered.cpp @@ -1005,7 +1005,7 @@ TYPED_TEST(HNSWTieredIndexTestBasic, deleteFromHNSWMultiLevels) { TYPED_TEST(HNSWTieredIndexTest, deleteFromHNSWWithRepairJobExec) { // Create TieredHNSW index instance with a mock queue. - size_t n = 1000; + size_t n = 200; size_t dim = 4; bool isMulti = TypeParam::isMulti(); @@ -2953,10 +2953,6 @@ TYPED_TEST(HNSWTieredIndexTest, switchWriteModes) { // (the label that we just inserted), and the first result should be this vector. auto ver_res = [&](size_t label, double score, size_t index) { if (index == 0) { - if (label != i % n_labels + n_labels && !TypeParam::isMulti()) { - // TODO: remove after we have a mechanism for connecting new elements - return; // this is flaky - ignore for now - } EXPECT_EQ(label, i % n_labels + n_labels); EXPECT_DOUBLE_EQ(score, 0); } diff --git a/tests/unit/test_utils.cpp b/tests/unit/test_utils.cpp index 7b99eba22..073fcfc8d 100644 --- a/tests/unit/test_utils.cpp +++ b/tests/unit/test_utils.cpp @@ -376,3 +376,12 @@ size_t getLabelsLookupNodeSize() { size_t memory_after = allocator->getAllocationSize(); return memory_after - memory_before; } + +size_t getUnreachableNodeSize() { + std::shared_ptr allocator = VecSimAllocator::newVecsimAllocator(); + auto dummy_lookup = vecsim_stl::unordered_set(1, allocator); + size_t memory_before = allocator->getAllocationSize(); + dummy_lookup.insert({1, 1}); // Insert a dummy pair as a single set element. 
+ size_t memory_after = allocator->getAllocationSize(); + return memory_after - memory_before; +} diff --git a/tests/unit/test_utils.h b/tests/unit/test_utils.h index 5b2b7e56d..deb9e1b8e 100644 --- a/tests/unit/test_utils.h +++ b/tests/unit/test_utils.h @@ -145,6 +145,8 @@ void runRangeQueryTest(VecSimIndex *index, const void *query, double radius, size_t getLabelsLookupNodeSize(); +size_t getUnreachableNodeSize(); + inline double GetInfVal(VecSimType type) { if (type == VecSimType_FLOAT64) { return exp(500);