Skip to content

Solve issues with mutexes while running tests on MacOS M1 #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions src/python_bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ class PyVecSimIndex {

class PyHNSWLibIndex : public PyVecSimIndex {
private:
std::shared_mutex indexGuard; // to protect parallel operations on the index.
std::shared_ptr<std::shared_mutex>
indexGuard; // to protect parallel operations on the index. Make sure to release the GIL
// while locking the mutex.
template <typename search_param_t> // size_t/double for KNN/range queries.
using QueryFunc =
std::function<VecSimQueryReply *(const char *, search_param_t, VecSimQueryParams *)>;
Expand All @@ -258,9 +260,10 @@ class PyHNSWLibIndex : public PyVecSimIndex {
if (ind >= n_queries) {
break;
}
indexGuard.lock_shared();
results[ind] = queryFunc((const char *)items.data(ind), param, query_params);
indexGuard.unlock_shared();
{
std::shared_lock<std::shared_mutex> lock(*indexGuard);
results[ind] = queryFunc((const char *)items.data(ind), param, query_params);
}
}
};
std::thread thread_objs[n_threads];
Expand All @@ -281,12 +284,14 @@ class PyHNSWLibIndex : public PyVecSimIndex {
VecSimParams params = {.algo = VecSimAlgo_HNSWLIB,
.algoParams = {.hnswParams = HNSWParams{hnsw_params}}};
this->index = std::shared_ptr<VecSimIndex>(VecSimIndex_New(&params), VecSimIndex_Free);
this->indexGuard = std::make_shared<std::shared_mutex>();
}

// @params is required only in V1.
explicit PyHNSWLibIndex(const std::string &location) {
this->index =
std::shared_ptr<VecSimIndex>(HNSWFactory::NewIndex(location), VecSimIndex_Free);
this->indexGuard = std::make_shared<std::shared_mutex>();
}

void setDefaultEf(size_t ef) {
Expand Down Expand Up @@ -403,15 +408,16 @@ class PyHNSWLibIndex : public PyVecSimIndex {
break;
}
if (ind % block_size != 0) {
indexGuard.lock_shared();
// Read lock for normal operations
indexGuard->lock_shared();
exclusive = false;
} else {
// Lock exclusively if we are performing resizing due to a new block.
indexGuard.lock();
// Exclusive lock for block resizing operations
indexGuard->lock();
}
barrier.unlock();
this->addVectorInternal((const char *)data.data(ind), labels.at(ind));
exclusive ? indexGuard.unlock() : indexGuard.unlock_shared();
exclusive ? indexGuard->unlock() : indexGuard->unlock_shared();
}
};
std::thread thread_objs[n_threads];
Expand Down Expand Up @@ -459,12 +465,15 @@ class PyHNSWLibIndex : public PyVecSimIndex {
}
PyBatchIterator createBatchIterator(const py::object &input,
VecSimQueryParams *query_params) override {

py::array query(input);
auto del = [&](VecSimBatchIterator *pyBatchIter) {
py::gil_scoped_release py_gil;
// Passing indexGuardPtr by value, so that the refCount of the mutex is incremented and
// the mutex stays alive until this deleter has released the shared lock.
auto del = [indexGuardPtr = this->indexGuard](VecSimBatchIterator *pyBatchIter) {
VecSimBatchIterator_Free(pyBatchIter);
this->indexGuard.unlock_shared();
indexGuardPtr->unlock_shared();
};
indexGuard.lock_shared();
indexGuard->lock_shared();
auto py_batch_ptr = std::shared_ptr<VecSimBatchIterator>(
VecSimBatchIterator_New(index.get(), (const char *)query.data(0), query_params), del);
return PyBatchIterator(index, py_batch_ptr);
Expand Down
Loading