Skip to content

Basic Multithreads #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions app/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ void Create(std::unordered_map<std::string, std::string> args) {
std::string L_small, R_small, R_stiched;
bool save = false;
bool leaveEmpty = false;
int distanceThreads = 1; // Default value

// Every flag accepted by the create command. The error message below is generated from this
// list so the two cannot drift apart (the hand-written message used to omit -random-edges).
std::vector<std::string> validArguments = {"-index-type", "-base-file", "-L", "-L-small", "-R", "-R-small", "-R-stiched", "-alpha", "-save", "-random-edges", "-connection-mode", "-distance-threads"};
for (const auto& arg : args) {
  if (std::find(validArguments.begin(), validArguments.end(), arg.first) == validArguments.end()) {
    // Join the accepted flags as "a, b, c" for the diagnostic.
    std::string validList;
    for (const auto& name : validArguments) {
      if (!validList.empty()) {
        validList += ", ";
      }
      validList += name;
    }
    throw std::invalid_argument("Error: Invalid argument: " + arg.first + ". Valid arguments are: " + validList);
  }
}

Expand Down Expand Up @@ -195,6 +196,10 @@ void Create(std::unordered_map<std::string, std::string> args) {
}
}

if (args.find("-distance-threads") != args.end()) {
distanceThreads = std::stoi(args["-distance-threads"]);
}

if (indexType == "simple") {
BaseVectors base_vectors = ReadVectorFile(baseFile);
if (base_vectors.empty()) {
Expand All @@ -203,7 +208,7 @@ void Create(std::unordered_map<std::string, std::string> args) {
}

VamanaIndex<DataVector<float>> vamanaIndex = VamanaIndex<DataVector<float>>();
vamanaIndex.createGraph(base_vectors, std::stof(alpha), std::stoi(L), std::stoi(R));
vamanaIndex.createGraph(base_vectors, std::stof(alpha), std::stoi(L), std::stoi(R), distanceThreads, true);

if (save) {
if (!vamanaIndex.saveGraph(outputFile)) {
Expand All @@ -223,15 +228,15 @@ void Create(std::unordered_map<std::string, std::string> args) {

if (indexType == "filtered") {
FilteredVamanaIndex<BaseDataVector<float>> index(filters);
// alpha is a floating-point parameter (createGraph takes `const float& alpha`), so it is parsed
// with std::stof like the other index types; std::stoi would truncate e.g. "1.2" to 1.
index.createGraph(base_vectors, std::stof(alpha), std::stoi(L), std::stoi(R), distanceThreads, true, leaveEmpty);

// Persist the graph only when the caller asked for it.
if (save) {
  index.saveGraph(outputFile);
  std::cout << std::endl << green << "Vamana Index was saved successfully to " << brightYellow << "`" << outputFile << "`" << reset << std::endl;
}
} else if (indexType == "stiched") {
StichedVamanaIndex<BaseDataVector<float>> index(filters);
index.createGraph(base_vectors, std::stof(alpha), std::stoi(L_small), std::stoi(R_small), std::stoi(R_stiched), true, leaveEmpty);
index.createGraph(base_vectors, std::stof(alpha), std::stoi(L_small), std::stoi(R_small), std::stoi(R_stiched), distanceThreads, true, leaveEmpty);

if (save) {
index.saveGraph(outputFile);
Expand Down
2 changes: 1 addition & 1 deletion include/FilteredVamanaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ template <typename vamana_t> class FilteredVamanaIndex : public VamanaIndex<vama
* @param L An unsigned int parameter.
* @param R An unsigned int parameter.
*/
void createGraph(const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int R, bool visualized = true, bool empty = true);
void createGraph(const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int R, unsigned int distance_threads = 1, bool visualized = true, bool empty = true);

/**
* @brief Load a graph from a file. Specifically this method is used to receive the contents of a Vamana Index Graph
Expand Down
2 changes: 1 addition & 1 deletion include/StichedVamanaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ template <typename vamana_t> class StichedVamanaIndex : public FilteredVamanaInd
* @param R An unsigned int parameter.
*/
// distance_threads is forwarded to computeDistances. It defaults to 1, matching the
// VamanaIndex and FilteredVamanaIndex declarations, so callers that pass only the
// first five arguments keep compiling.
void createGraph(const std::vector<vamana_t>& P, const float& alpha, const unsigned int L_small,
                 const unsigned int R_small, const unsigned int R_stiched, unsigned int distance_threads = 1,
                 bool visualized = true, bool empty = true);

};

Expand Down
5 changes: 3 additions & 2 deletions include/VamanaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ template <typename vamana_t> class VamanaIndex {
* @brief Computes the distances between every node in the dataset and stores them in the distance matrix.
*
* @param visualize a boolean flag to visualize the progress of the computation
* @param numThreads the number of threads to use for computation
*/
void computeDistances(const bool visualize = true);
void computeDistances(const bool visualize = true, const unsigned int numThreads = 1);

public:

Expand Down Expand Up @@ -102,7 +103,7 @@ template <typename vamana_t> class VamanaIndex {
* @param R the parameter R
*
*/
void createGraph(const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int& R, bool visualize = true, double** distanceMatrix = nullptr);
void createGraph(const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int& R, unsigned int distance_threads = 1, bool visualize = true, double** distanceMatrix = nullptr);

/**
* @brief Saves a specific graph into a file. Specifically this method is used to save the contents of a Vamana
Expand Down
1 change: 1 addition & 0 deletions include/graphics.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const std::string brightWhite = "\033[1;37m";

// U+25AC (BLACK RECTANGLE); presumably the glyph for the completed part of the bar — confirm in displayProgressBar.
const std::string ProgressSymbol = "\u25AC";
// Same code point as ProgressSymbol; presumably distinguished only by colour when drawn — confirm in displayProgressBar.
const std::string RemainingSymbol = "\u25AC";
// U+2713 (CHECK MARK); printed after "Done " by displayProgressBar when current == total.
const std::string tickSymbol = "\u2713";


/**
Expand Down
5 changes: 3 additions & 2 deletions src/Graphics/ProgressBar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void displayProgressBar(
}
else if (current == total) {
std::cout << " " << verticalLineSymbol << " " << yellow;
std::cout << brightGreen << "Done" << std::setw(14) << std::setfill(' ') << reset;
std::cout << brightGreen << "Done " << tickSymbol << std::setw(12) << std::setfill(' ') << reset;
}

// Display elapsed time
Expand Down Expand Up @@ -137,8 +137,9 @@ void withProgress(
auto startTime = std::chrono::steady_clock::now();

for (unsigned int i = start; i < end; i++) {
displayProgressBar(i - start + 1, total, message, startTime, barWidth);
displayProgressBar(i - start, total, message, startTime, barWidth);
func(i);
displayProgressBar(i - start + 1, total, message, startTime, barWidth);
}

std::cout << std::endl;
Expand Down
4 changes: 2 additions & 2 deletions src/VIA/Algorithms/FilteredVamanaIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ FilteredVamanaIndex<vamana_t>::getNodesWithCategoricalValueFilter(const Categori
*/
template <typename vamana_t>
void FilteredVamanaIndex<vamana_t>::createGraph(
const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int R, bool visualized, bool empty) {
const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int R, unsigned int distance_threads, bool visualized, bool empty) {

using Filter = CategoricalAttributeFilter;
using GreedyResult = std::pair<std::set<vamana_t>, std::set<vamana_t>>;
Expand All @@ -91,7 +91,7 @@ void FilteredVamanaIndex<vamana_t>::createGraph(
for (unsigned int i = 0; i < n; i++) {
this->distanceMatrix[i] = new double[n];
}
this->computeDistances();
this->computeDistances(true, distance_threads);
this->G.setNodesCount(n);

// Initialize G to an empty graph and get the medoid node
Expand Down
6 changes: 3 additions & 3 deletions src/VIA/Algorithms/StichedVamanaIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
template <typename vamana_t>
void StichedVamanaIndex<vamana_t>::createGraph(const std::vector<vamana_t>& P, const float& alpha, const unsigned int L_small,
const unsigned int R_small, const unsigned int R_stiched, bool visualized, bool empty) {
const unsigned int R_small, const unsigned int R_stiched, unsigned int distance_threads, bool visualized, bool empty) {

using Filter = CategoricalAttributeFilter;

Expand All @@ -28,7 +28,7 @@ void StichedVamanaIndex<vamana_t>::createGraph(const std::vector<vamana_t>& P, c
for (unsigned int i = 0; i < n; i++) {
this->distanceMatrix[i] = new double[n];
}
this->computeDistances();
this->computeDistances(true, distance_threads);

// Initialize G = (V, E) to an empty graph
this->G.setNodesCount(n);
Expand Down Expand Up @@ -73,7 +73,7 @@ void StichedVamanaIndex<vamana_t>::createGraph(const std::vector<vamana_t>& P, c

// Initialize the sub-index for the current filter and create its graph
VamanaIndex<vamana_t> subIndex;
subIndex.createGraph(Pf[filter], alpha, R_small, L_small, false, this->distanceMatrix);
subIndex.createGraph(Pf[filter], alpha, R_small, L_small, 1, false, this->distanceMatrix);

for (unsigned int i = 0; i < subIndex.getGraph().getNodesCount(); i++) {

Expand Down
89 changes: 59 additions & 30 deletions src/VIA/Algorithms/VamanaIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@

#include <thread>
#include <chrono>
#include <atomic>
#include <mutex>
#include <random>
#include <algorithm>
#include <numeric>
#include <fstream>
#include <iostream>

std::mutex distanceMutex;

/**
* @brief Generates a random permutation of integers in a specified range. This function creates a vector
* containing all integers from `start` to `end` and then shuffles them randomly to produce a random permutation.
Expand Down Expand Up @@ -91,26 +95,53 @@ template <typename vamana_t> void VamanaIndex<vamana_t>::createRandomEdges(const
/**
 * @brief Computes the distances between every node in the dataset and stores them in the distance matrix.
 *
 * Row i evaluates the pairs (i, j) for j >= i and mirrors each value into [j][i]. Two different
 * rows therefore never write the same cell, which is what allows rows to be processed by
 * different threads without locking the matrix.
 *
 * @param visualize a boolean flag to visualize the progress of the computation
 * @param numThreads the number of threads to use for computation. Values of 0 or 1 (or a dataset
 * with at most one point) run on the calling thread; larger values are capped at the number of rows.
 */
template <typename vamana_t>
void VamanaIndex<vamana_t>::computeDistances(const bool visualize, const unsigned int numThreads) {

  const unsigned int n = static_cast<unsigned int>(this->P.size());

  // Fill row i of the upper triangle and mirror it into the lower triangle.
  auto computeRow = [&](unsigned int i) {
    for (unsigned int j = i; j < n; ++j) {
      const double dist = euclideanDistance(this->P.at(i), this->P.at(j));
      this->distanceMatrix[i][j] = dist;
      this->distanceMatrix[j][i] = dist;
    }
  };

  // Never start more workers than there are rows to process.
  const unsigned int workerCount = std::min(numThreads, n);

  // Single-threaded path: withProgress already redraws the bar for every row, so the rows do not
  // report progress themselves.
  if (workerCount <= 1) {
    if (visualize) {
      withProgress(0, this->P.size(), "Computing Distances", [&](int i) { computeRow(static_cast<unsigned int>(i)); });
    } else {
      for (unsigned int i = 0; i < n; ++i) {
        computeRow(i);
      }
    }
    return;
  }

  // Multi-threaded path. Row i costs (n - i) distance evaluations, so contiguous chunks would give
  // the first thread most of the work. Instead each worker claims the next unprocessed row from a
  // shared counter, which keeps all threads busy until the rows run out.
  std::atomic<unsigned int> nextRow(0);
  std::atomic<int> progress(0);
  int lastShown = 0; // guarded by distanceMutex
  const auto startTime = std::chrono::steady_clock::now();

  auto worker = [&]() {
    for (unsigned int i = nextRow.fetch_add(1); i < n; i = nextRow.fetch_add(1)) {
      computeRow(i);

      // Take the counter value from the increment itself; re-reading the atomic afterwards could
      // observe another thread's increment and skip or repeat a milestone.
      const int done = ++progress;
      if (visualize && done % 100 == 0) {
        // Serialize console output and never let a late thread move the bar backwards.
        std::lock_guard<std::mutex> lock(distanceMutex);
        if (done > lastShown) {
          lastShown = done;
          displayProgressBar(done, this->P.size(), "Computing Distances", startTime, 30);
        }
      }
    }
  };

  std::vector<std::thread> threads;
  threads.reserve(workerCount);
  for (unsigned int t = 0; t < workerCount; ++t) {
    threads.emplace_back(worker);
  }

  for (auto& thread : threads) {
    thread.join();
  }

  // Draw the completed bar once every worker has finished.
  if (visualize) {
    displayProgressBar(this->P.size(), this->P.size(), "Computing Distances", startTime, 30);
    std::cout << std::endl;
  }
}

/**
Expand All @@ -131,7 +162,7 @@ template <typename vamana_t> void VamanaIndex<vamana_t>::computeDistances(const
*/
template <typename vamana_t>
void VamanaIndex<vamana_t>::createGraph(
const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int& R, bool visualize, double** distanceMatrix) {
const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const unsigned int& R, unsigned int distance_threads, bool visualize, double** distanceMatrix) {

using GreedyResult = std::pair<std::set<vamana_t>, std::set<vamana_t>>;
GreedyResult greedyResult;
Expand All @@ -151,8 +182,11 @@ const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const
for (unsigned int i = 0; i < n; i++) {
this->distanceMatrix[i] = new double[n];
}
this->computeDistances(visualize);
this->computeDistances(visualize, distance_threads);
}

// this->computeDistances(false);
this->G.setNodesCount(n);

// Set the number of nodes in the graph, fill the nodes with the dataset points, and create random edges for the nodes
this->G.setNodesCount(n);
Expand All @@ -163,43 +197,38 @@ const std::vector<vamana_t>& P, const float& alpha, const unsigned int L, const
GraphNode<vamana_t> s = findMedoid(this->G, visualize, 1000);
std::vector<int> sigma = generateRandomPermutation(0, n-1);

// Define a lambda function for the main loop process of the Vamana algorithm
auto loopProcess = [&](int i) {

// Define a lambda function to process each node in the sigma permutation
auto processNode = [&](int i) {
GraphNode<vamana_t>* sigma_i_node = this->G.getNode(sigma.at(i));
vamana_t sigma_i = sigma_i_node->getData();

// Run Greedy Search and Robust Prune for the current sigma[i] node and its neighbors
greedyResult = GreedySearch(*this, s, this->P.at(sigma.at(i)), 1, L);
RobustPrune(*this, *sigma_i_node, greedyResult.second, alpha, R);

// Get the neighbors of sigma[i] node and iterate over them to run Robust Prune for each one of them as well
for (auto j : *sigma_i_node->getNeighborsVector()) {
std::vector<vamana_t>* sigma_i_neighbors = sigma_i_node->getNeighborsVector();
for (auto j : *sigma_i_neighbors) {
std::set<vamana_t> outgoing;
GraphNode<vamana_t>* j_node = this->G.getNode(j.getIndex());

// The outgoing set has to consist of the neighbors of j and the sigma[i] node itself
for (auto neighbor : *j_node->getNeighborsVector()) {
outgoing.insert(neighbor);
}
outgoing.insert(sigma_i);

// Check if the |N_out(j) union {sigma[i]}| > R and run Robust Prune accordingly
if (outgoing.size() > (unsigned int)R) {
if (outgoing.size() > (long unsigned int)R) {
RobustPrune(*this, *j_node, outgoing, alpha, R);
} else {
j_node->addNeighbor(sigma_i);
}
}

};

// Process each node with or without visualization
// Run the lambda process function if visualization is enabled, otherwise run it without progress visualization
if (visualize) {
withProgress(0, n, "Creating Vamana", loopProcess);
withProgress(0, n, "Creating Vamana", processNode);
} else {
for (unsigned int i = 0; i < n; i++) {
loopProcess(i);
processNode(i);
}
}

Expand Down
Loading