75 changes: 74 additions & 1 deletion app/test/benchmark/text_similarity_benchmark.py
@@ -109,6 +109,7 @@ def test_alegre_text_similarity_old_api_contract(self):

# TODO: UPDATE A VECTOR

@unittest.skip("disabled to avoid confounding parallel tests")
def test_many_vector_stores_non_parallel_old(self, num_saves=100):
"""
write many vectors as quickly as possible from a single thread (old endpoint)
@@ -194,6 +195,7 @@ def test_many_vector_stores_non_parallel_old(self, num_saves=100):
# sent 100 delete requests in 2.6708664894104004 seconds. rate= 0.026708664894104003
# assert delete_rate < 0.02

@unittest.skip("disabled to avoid confounding parallel tests")
def test_many_vector_stores_non_parallel_sync(self, num_saves=100):
"""
write many vectors as quickly as possible from a single thread (new sync endpoint)
@@ -224,6 +226,7 @@ def test_many_vector_stores_non_parallel_sync(self, num_saves=100):
f"submitted {num_saves} vectorization requests to /similarity/sync/text in {duration} seconds. rate= {submit_rate}"
)

@unittest.skip("disabled to avoid confounding parallel tests")
def test_many_vector_stores_non_parallel_async(self, num_saves=100):
"""
write many vectors as quickly as possible from a single thread (new _async_ endpoint)
@@ -323,6 +326,24 @@ def _query_doc_from_alegre_sync(self, doc):
# confirm correct doc returned
assert json.loads(response.text)["result"][0]["id"] == doc["doc_id"], f'response was {response.text}'

def _query_doc_from_alegre_async(self, doc):
response = requests.post(
self.ALEGRE_BASE_URL + "/similarity/async/text",
json={
"text": doc["text"],
"context": doc["context"],
"model": self.MODEL_KEY, # model must be included
"threshold": 0.0, # return all results
},
headers={
"Content-Type": "application/json",
"User-Agent": "Alegre Load Test", # TODO: cfg should know version
},
)
assert response.ok is True, f"response was {response}"
# confirm correct doc returned
assert json.loads(response.text)["result"][0]["id"] == doc["doc_id"], f'response was {response.text}'

def _delete_doc_from_alegre(self, doc):
# DELETE A VECTOR (corresponding to the text of a content item)
response = requests.delete(
@@ -335,6 +356,7 @@ def _delete_doc_from_alegre(self, doc):
)
assert response.ok is True, f" delete response was {response} : {response.text}"

@unittest.skip("not testing deprecated endpoint to avoid confounding tests")
def test_many_vector_stores_semi_parallel_old(
self, num_saves=10, thread_pool_size=10
):
@@ -393,7 +415,7 @@ def test_many_vector_stores_semi_parallel_old(
# ... so 10X slower at ~ 1/sec

def test_many_vector_stores_semi_parallel_sync(
-        self, num_saves=10, thread_pool_size=10
+        self, num_saves=1000, thread_pool_size=100
):
"""
write a whole bunch of vectors as quickly as possible
@@ -443,6 +465,57 @@ def test_many_vector_stores_semi_parallel_sync(
f"submitted {num_saves} batch parallelized delete requests in {duration} seconds. rate= {delete_rate}"
)

def test_many_vector_stores_semi_parallel_async(
self, num_saves=1000, thread_pool_size=100
):
"""
write a whole bunch of vectors as quickly as possible
using a thread pool against the new async route
"""

documents = []
# make the documents
for n in range(num_saves):
doc = {
"text": self.fake_content_source.text(),
"doc_id": f"test_doc_{n}",
"context": {"type": "alegre_parallel_async_test_text"},
"model": self.MODEL_KEY, # model must be included
}
documents.append(doc)

start_time = time.time()
pool = ThreadPoolExecutor(max_workers=thread_pool_size)
# store vector
pool.map(self._store_doc_to_alegre_async, documents)
# NOTE: I'm not very confident that pool.map is actually blocking
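# (ThreadPoolExecutor.map submits all the work up front but returns a lazy iterator,
# so this timing likely measures submission rather than completion; wrapping the call
# in list(...) would block until every request has finished.)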
end_time = time.time()
duration = end_time - start_time
submit_rate = duration / num_saves
print(
f"New async endpoint submitted {num_saves} batch parallelized vectorization requests in {duration} seconds. rate= {submit_rate}"
)

# query vector
start_time = time.time()
pool.map(self._query_doc_from_alegre_async, documents)
end_time = time.time()
duration = end_time - start_time
query_rate = duration / num_saves
print(
f"submitted {num_saves} batch parallelized async query requests in {duration} seconds. rate= {query_rate}"
)

# delete vector
start_time = time.time()
pool.map(self._delete_doc_from_alegre, documents)
end_time = time.time()
duration = end_time - start_time
delete_rate = duration / num_saves
print(
f"submitted {num_saves} batch parallelized delete requests in {duration} seconds. rate= {delete_rate}"
)


if __name__ == "__main__":
environment = os.getenv("DEPLOY_ENV", "local")
2 changes: 1 addition & 1 deletion production/bin/start.sh
@@ -66,4 +66,4 @@ set +o allexport
python manage.py init
python manage.py init_perl_functions
python manage.py db upgrade
-gunicorn --preload -w 8 --threads 8 -b 0.0.0.0:${ALEGRE_PORT} --access-logfile - --error-logfile - wsgi:app
+gunicorn --preload -w 8 --threads 16 -b 0.0.0.0:${ALEGRE_PORT} --access-logfile - --error-logfile - wsgi:app