-
Notifications
You must be signed in to change notification settings - Fork 20
[MOD-9557] Fix incorrect vector blob size calculation #665
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
25f3c3a
ffe9e81
64b3d3f
94e5789
3524565
66a7237
80d5cec
3fe0515
b097a20
b10dd24
6b2a89e
c71876b
07f9ec1
a618046
a038e1d
3662895
284646e
5a672ea
2d4e7ca
4fa002a
0c3d47d
2495ae6
1cf9456
b4c0f3f
1747911
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -172,7 +172,7 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> { | |
inline void filter_irrelevant_results(VecSimQueryResultContainer &); | ||
|
||
public: | ||
TieredHNSW_BatchIterator(void *query_vector, | ||
TieredHNSW_BatchIterator(const void *query_vector, | ||
const TieredHNSWIndex<DataType, DistType> *index, | ||
VecSimQueryParams *queryParams, | ||
std::shared_ptr<VecSimAllocator> allocator); | ||
|
@@ -206,11 +206,9 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> { | |
VecSimDebugInfoIterator *debugInfoIterator() const override; | ||
VecSimBatchIterator *newBatchIterator(const void *queryBlob, | ||
VecSimQueryParams *queryParams) const override { | ||
size_t blobSize = this->frontendIndex->getDim() * sizeof(DataType); | ||
void *queryBlobCopy = this->allocator->allocate(blobSize); | ||
memcpy(queryBlobCopy, queryBlob, blobSize); | ||
// The query blob will be processed and copied by the internal indexes' batch iterators. | ||
return new (this->allocator) | ||
TieredHNSW_BatchIterator(queryBlobCopy, this, queryParams, this->allocator); | ||
TieredHNSW_BatchIterator(queryBlob, this, queryParams, this->allocator); | ||
} | ||
inline void setLastSearchMode(VecSearchMode mode) override { | ||
return this->backendIndex->setLastSearchMode(mode); | ||
|
@@ -545,10 +543,11 @@ void TieredHNSWIndex<DataType, DistType>::executeInsertJob(HNSWInsertJob *job) { | |
HNSWIndex<DataType, DistType> *hnsw_index = this->getHNSWIndex(); | ||
// Copy the vector blob from the flat buffer, so we can release the flat lock while we are | ||
// indexing the vector into HNSW index. | ||
auto blob_copy = this->getAllocator()->allocate_unique(this->frontendIndex->getDataSize()); | ||
|
||
memcpy(blob_copy.get(), this->frontendIndex->getDataByInternalId(job->id), | ||
this->frontendIndex->getDim() * sizeof(DataType)); | ||
size_t data_size = this->frontendIndex->getDataSize(); | ||
auto blob_copy = this->getAllocator()->allocate_unique(data_size); | ||
// Assuming the size of the blob stored in the frontend index matches the size of the blob | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [nitpick] Consider adding an assertion or runtime check to verify that the blob size from the frontend index (getDataSize()) matches the expected size in the HNSW index. This extra check would help catch unexpected mismatches early. Copilot uses AI. Check for mistakes. Positive Feedback / Negative Feedback |
||
// stored in the HNSW index. | ||
memcpy(blob_copy.get(), this->frontendIndex->getDataByInternalId(job->id), data_size); | ||
|
||
this->insertVectorToHNSW<true>(hnsw_index, job->label, blob_copy.get()); | ||
|
||
|
@@ -719,7 +718,7 @@ int TieredHNSWIndex<DataType, DistType>::addVector(const void *blob, labelType l | |
int ret = 1; | ||
auto hnsw_index = this->getHNSWIndex(); | ||
// writeMode is not protected since it is assumed to be called only from the "main thread" | ||
// (that is the thread that is exculusively calling add/delete vector). | ||
// (that is the thread that is exclusively calling add/delete vector). | ||
if (this->getWriteMode() == VecSim_WriteInPlace) { | ||
// First, check if we need to overwrite the vector in-place for single (from both indexes). | ||
if (!this->backendIndex->isMultiValue()) { | ||
|
@@ -849,7 +848,7 @@ int TieredHNSWIndex<DataType, DistType>::deleteVector(labelType label) { | |
// Note that we may remove the same vector that has been removed from the flat index, if it was | ||
// being ingested at that time. | ||
// writeMode is not protected since it is assumed to be called only from the "main thread" | ||
// (that is the thread that is exculusively calling add/delete vector). | ||
// (that is the thread that is exclusively calling add/delete vector). | ||
if (this->getWriteMode() == VecSim_WriteAsync) { | ||
num_deleted_vectors += this->deleteLabelFromHNSW(label); | ||
// Apply ready swap jobs if number of deleted vectors reached the threshold | ||
|
@@ -924,9 +923,10 @@ double TieredHNSWIndex<DataType, DistType>::getDistanceFrom_Unsafe(labelType lab | |
|
||
template <typename DataType, typename DistType> | ||
TieredHNSWIndex<DataType, DistType>::TieredHNSW_BatchIterator::TieredHNSW_BatchIterator( | ||
void *query_vector, const TieredHNSWIndex<DataType, DistType> *index, | ||
const void *query_vector, const TieredHNSWIndex<DataType, DistType> *index, | ||
VecSimQueryParams *queryParams, std::shared_ptr<VecSimAllocator> allocator) | ||
: VecSimBatchIterator(query_vector, queryParams ? queryParams->timeoutCtx : nullptr, | ||
// Tiered batch iterator doesn't hold its own copy of the query vector. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is that? Please extend the documentation here since it is not trivial. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. done |
||
: VecSimBatchIterator(nullptr, queryParams ? queryParams->timeoutCtx : nullptr, | ||
std::move(allocator)), | ||
index(index), flat_results(this->allocator), hnsw_results(this->allocator), | ||
flat_iterator(this->index->frontendIndex->newBatchIterator(query_vector, queryParams)), | ||
|
@@ -1192,4 +1192,5 @@ void TieredHNSWIndex<DataType, DistType>::getDataByLabel( | |
labelType label, std::vector<std::vector<DataType>> &vectors_output) const { | ||
this->getHNSWIndex()->getDataByLabel(label, vectors_output); | ||
} | ||
|
||
#endif |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -155,11 +155,12 @@ | |||||||||
return MemoryUtils::unique_blob{const_cast<void *>(original_data), [](void *) {}}; | ||||||||||
} | ||||||||||
|
||||||||||
const auto data_size = this->dim * sizeof(DataType) * n; | ||||||||||
const auto data_size = this->getDataSize() * n; | ||||||||||
|
||||||||||
auto processed_blob = | ||||||||||
MemoryUtils::unique_blob{this->allocator->allocate(data_size), | ||||||||||
[this](void *ptr) { this->allocator->free_allocation(ptr); }}; | ||||||||||
// Assuming the original data size equals the processed data size | ||||||||||
memcpy(processed_blob.get(), original_data, data_size); | ||||||||||
// Preprocess each vector in place | ||||||||||
for (size_t i = 0; i < n; i++) { | ||||||||||
|
@@ -435,17 +436,18 @@ | |||||||||
|
||||||||||
VecSimBatchIterator *newBatchIterator(const void *queryBlob, | ||||||||||
VecSimQueryParams *queryParams) const override { | ||||||||||
auto *queryBlobCopy = | ||||||||||
this->allocator->allocate_aligned(this->dataSize, this->preprocessors->getAlignment()); | ||||||||||
memcpy(queryBlobCopy, queryBlob, this->dim * sizeof(DataType)); | ||||||||||
this->preprocessQueryInPlace(queryBlobCopy); | ||||||||||
// force_copy == true. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [nitpick] Clarify in the documentation the rationale for forcing a copy (force_copy == true) of the query blob in newBatchIterator, to make the memory ownership semantics explicit.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||
auto queryBlobCopy = this->preprocessQuery(queryBlob, true); | ||||||||||
|
||||||||||
// take ownership of the blob copy and pass it to the batch iterator. | ||||||||||
auto *queryBlobCopyPtr = queryBlobCopy.release(); | ||||||||||
// Ownership of queryBlobCopy moves to VecSimBatchIterator that will free it at the end. | ||||||||||
if (indexSize() == 0) { | ||||||||||
return new (this->getAllocator()) | ||||||||||
NullSVS_BatchIterator(queryBlobCopy, queryParams, this->getAllocator()); | ||||||||||
NullSVS_BatchIterator(queryBlobCopyPtr, queryParams, this->getAllocator()); | ||||||||||
} else { | ||||||||||
return new (this->getAllocator()) SVS_BatchIterator<impl_type, data_type>( | ||||||||||
queryBlobCopy, impl_.get(), queryParams, this->getAllocator()); | ||||||||||
queryBlobCopyPtr, impl_.get(), queryParams, this->getAllocator()); | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -479,6 +481,15 @@ | |||||||||
} | ||||||||||
|
||||||||||
#ifdef BUILD_TESTS | ||||||||||
virtual void fitMemory() {}; | ||||||||||
void fitMemory() override {} | ||||||||||
std::vector<std::vector<char>> getStoredVectorDataByLabel(labelType label) const override { | ||||||||||
assert(nullptr && "Not implemented"); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [nitpick] If getStoredVectorDataByLabel in SVSIndex is likely to be invoked during tests or in production, consider providing a safe default implementation instead of an assert, or clearly document that it should not be used.
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||||||
return {}; | ||||||||||
} | ||||||||||
void getDataByLabel( | ||||||||||
labelType label, | ||||||||||
std::vector<std::vector<svs_details::vecsim_dt<DataType>>> &vectors_output) const override { | ||||||||||
assert(nullptr && "Not implemented"); | ||||||||||
} | ||||||||||
#endif | ||||||||||
}; |
Uh oh!
There was an error while loading. Please reload this page.