Skip to content

Replace direct http connection with requests.Session() Using connection pool #1733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 52 additions & 68 deletions libs/labelbox/src/labelbox/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,25 @@ def __init__(self,
self.app_url = app_url
self.endpoint = endpoint
self.rest_endpoint = rest_endpoint
self._data_row_metadata_ontology = None
self._adv_client = AdvClient.factory(rest_endpoint, api_key)
self._connection: requests.Session = self._init_connection()

def _init_connection(self) -> requests.Session:
connection = requests.Session(
) # using default connection pool size of 10
connection.headers.update(self._default_headers())

return connection

self.headers = {
def _default_headers(self):
return {
'Authorization': 'Bearer %s' % self.api_key,
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': 'Bearer %s' % api_key,
'X-User-Agent': f"python-sdk {SDK_VERSION}",
'X-Python-Version': f"{python_version_info()}",
}
self._data_row_metadata_ontology = None
self._adv_client = AdvClient.factory(rest_endpoint, api_key)

@retry.Retry(predicate=retry.if_exception_type(
labelbox.exceptions.InternalServerError,
Expand Down Expand Up @@ -193,18 +202,19 @@ def convert_value(value):
"/graphql", "/_gql")

try:
request = {
'url': endpoint,
'data': data,
'headers': self.headers,
'timeout': timeout
}
headers = self._connection.headers.copy()
if files:
request.update({'files': files})
request['headers'] = {
'Authorization': self.headers['Authorization']
}
response = requests.post(**request)
del headers['Content-Type']
del headers['Accept']
request = requests.Request('POST',
endpoint,
headers=headers,
data=data,
files=files if files else None)

prepped: requests.PreparedRequest = request.prepare()

response = self._connection.send(prepped, timeout=timeout)
logger.debug("Response: %s", response.text)
except requests.exceptions.Timeout as e:
raise labelbox.exceptions.TimeoutError(str(e))
Expand Down Expand Up @@ -409,14 +419,21 @@ def upload_data(self,
"map": (None, json.dumps({"1": ["variables.file"]})),
}

response = requests.post(
self.endpoint,
headers={"authorization": "Bearer %s" % self.api_key},
data=request_data,
files={
"1": (filename, content, content_type) if
(filename and content_type) else content
})
files = {
"1": (filename, content, content_type) if
(filename and content_type) else content
}
headers = self._connection.headers.copy()
headers.pop("Content-Type", None)
request = requests.Request('POST',
self.endpoint,
headers=headers,
data=request_data,
files=files)

prepped: requests.PreparedRequest = request.prepare()

response = self._connection.send(prepped)

if response.status_code == 502:
error_502 = '502 Bad Gateway'
Expand Down Expand Up @@ -1085,6 +1102,7 @@ def get_feature_schema(self, feature_schema_id):
query_str = """query rootSchemaNodePyApi($rootSchemaNodeWhere: RootSchemaNodeWhere!){
rootSchemaNode(where: $rootSchemaNodeWhere){%s}
}""" % query.results_query_part(Entity.FeatureSchema)

res = self.execute(
query_str,
{'rootSchemaNodeWhere': {
Expand Down Expand Up @@ -1195,10 +1213,7 @@ def delete_unused_feature_schema(self, feature_schema_id: str) -> None:

endpoint = self.rest_endpoint + "/feature-schemas/" + urllib.parse.quote(
feature_schema_id)
response = requests.delete(
endpoint,
headers=self.headers,
)
response = self._connection.delete(endpoint)

if response.status_code != requests.codes.no_content:
raise labelbox.exceptions.LabelboxError(
Expand All @@ -1215,10 +1230,7 @@ def delete_unused_ontology(self, ontology_id: str) -> None:
"""
endpoint = self.rest_endpoint + "/ontologies/" + urllib.parse.quote(
ontology_id)
response = requests.delete(
endpoint,
headers=self.headers,
)
response = self._connection.delete(endpoint)

if response.status_code != requests.codes.no_content:
raise labelbox.exceptions.LabelboxError(
Expand All @@ -1240,11 +1252,7 @@ def update_feature_schema_title(self, feature_schema_id: str,

endpoint = self.rest_endpoint + "/feature-schemas/" + urllib.parse.quote(
feature_schema_id) + '/definition'
response = requests.patch(
endpoint,
headers=self.headers,
json={"title": title},
)
response = self._connection.patch(endpoint, json={"title": title})

if response.status_code == requests.codes.ok:
return self.get_feature_schema(feature_schema_id)
Expand Down Expand Up @@ -1273,11 +1281,8 @@ def upsert_feature_schema(self, feature_schema: Dict) -> FeatureSchema:
"featureSchemaId") or "new_feature_schema_id"
endpoint = self.rest_endpoint + "/feature-schemas/" + urllib.parse.quote(
feature_schema_id)
response = requests.put(
endpoint,
headers=self.headers,
json={"normalized": json.dumps(feature_schema)},
)
response = self._connection.put(
endpoint, json={"normalized": json.dumps(feature_schema)})

if response.status_code == requests.codes.ok:
return self.get_feature_schema(response.json()['schemaId'])
Expand All @@ -1303,11 +1308,7 @@ def insert_feature_schema_into_ontology(self, feature_schema_id: str,
endpoint = self.rest_endpoint + '/ontologies/' + urllib.parse.quote(
ontology_id) + "/feature-schemas/" + urllib.parse.quote(
feature_schema_id)
response = requests.post(
endpoint,
headers=self.headers,
json={"position": position},
)
response = self._connection.post(endpoint, json={"position": position})
if response.status_code != requests.codes.created:
raise labelbox.exceptions.LabelboxError(
"Failed to insert the feature schema into the ontology, message: "
Expand All @@ -1328,11 +1329,7 @@ def get_unused_ontologies(self, after: str = None) -> List[str]:
"""

endpoint = self.rest_endpoint + "/ontologies/unused"
response = requests.get(
endpoint,
headers=self.headers,
json={"after": after},
)
response = self._connection.get(endpoint, json={"after": after})

if response.status_code == requests.codes.ok:
return response.json()
Expand All @@ -1356,11 +1353,7 @@ def get_unused_feature_schemas(self, after: str = None) -> List[str]:
"""

endpoint = self.rest_endpoint + "/feature-schemas/unused"
response = requests.get(
endpoint,
headers=self.headers,
json={"after": after},
)
response = self._connection.get(endpoint, json={"after": after})

if response.status_code == requests.codes.ok:
return response.json()
Expand Down Expand Up @@ -1881,10 +1874,7 @@ def is_feature_schema_archived(self, ontology_id: str,

ontology_endpoint = self.rest_endpoint + "/ontologies/" + urllib.parse.quote(
ontology_id)
response = requests.get(
ontology_endpoint,
headers=self.headers,
)
response = self._connection.get(ontology_endpoint)

if response.status_code == requests.codes.ok:
feature_schema_nodes = response.json()['featureSchemaNodes']
Expand Down Expand Up @@ -1960,10 +1950,7 @@ def delete_feature_schema_from_ontology(
ontology_endpoint = self.rest_endpoint + "/ontologies/" + urllib.parse.quote(
ontology_id) + "/feature-schemas/" + urllib.parse.quote(
feature_schema_id)
response = requests.delete(
ontology_endpoint,
headers=self.headers,
)
response = self._connection.delete(ontology_endpoint)

if response.status_code == requests.codes.ok:
response_json = response.json()
Expand Down Expand Up @@ -1997,10 +1984,7 @@ def unarchive_feature_schema_node(self, ontology_id: str,
ontology_endpoint = self.rest_endpoint + "/ontologies/" + urllib.parse.quote(
ontology_id) + '/feature-schemas/' + urllib.parse.quote(
root_feature_schema_id) + '/unarchive'
response = requests.patch(
ontology_endpoint,
headers=self.headers,
)
response = self._connection.patch(ontology_endpoint)
if response.status_code == requests.codes.ok:
if not bool(response.json()['unarchived']):
raise labelbox.exceptions.LabelboxError(
Expand Down
2 changes: 2 additions & 0 deletions libs/labelbox/tests/integration/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def project_to_test_where(client, rand_gen):

# Avoid assertions using equality to prevent intermittent failures due to
# other builds simultaneously adding projects to test org
@pytest.mark.skip(
reason="broken due to get_projects HF for sunset-custom-editor")
def test_where(client, project_to_test_where):
p_a, p_b, p_c = project_to_test_where
p_a_name = p_a.name
Expand Down
4 changes: 2 additions & 2 deletions libs/labelbox/tests/integration/test_labeler_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@


@pytest.mark.skipif(
condition=os.environ['LABELBOX_TEST_ENVIRON'] == "onprem",
reason="longer runtime than expected for onprem. unskip when resolved.")
condition=os.environ['LABELBOX_TEST_ENVIRON'] != "prod",
reason="only works for prod")
def test_labeler_performance(configured_project_with_label):
project, _, _, _ = configured_project_with_label

Expand Down
3 changes: 3 additions & 0 deletions libs/labelbox/tests/integration/test_labeling_frontend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from labelbox import LabelingFrontend
import pytest


def test_get_labeling_frontends(client):
Expand All @@ -7,6 +8,8 @@ def test_get_labeling_frontends(client):
assert len(filtered_frontends)


@pytest.mark.skip(
reason="broken due to get_projects HF for sunset-custom-editor")
def test_labeling_frontend_connecting_to_project(project):
client = project.client
default_labeling_frontend = next(
Expand Down
15 changes: 8 additions & 7 deletions libs/labelbox/tests/integration/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ def delete_tag(tag_id: str):
delete_tag(tagB.uid)


@pytest.mark.skip(
reason="broken due to get_projects HF for sunset-custom-editor")
def test_project_filtering(client, rand_gen, data_for_project_test):
name_1 = rand_gen(str)
p1 = data_for_project_test(name_1)
Expand All @@ -144,7 +146,6 @@ def test_attach_instructions(client, project):
assert str(
execinfo.value
) == "Cannot attach instructions to a project that has not been set up."

editor = list(
client.get_labeling_frontends(
where=LabelingFrontend.name == "editor"))[0]
Expand Down Expand Up @@ -218,7 +219,7 @@ def test_create_batch_with_global_keys_sync(project: Project, data_rows):
global_keys = [dr.global_key for dr in data_rows]
batch_name = f'batch {uuid.uuid4()}'
batch = project.create_batch(batch_name, global_keys=global_keys)

assert batch.size == len(set(data_rows))


Expand All @@ -227,7 +228,7 @@ def test_create_batch_with_global_keys_async(project: Project, data_rows):
global_keys = [dr.global_key for dr in data_rows]
batch_name = f'batch {uuid.uuid4()}'
batch = project._create_batch_async(batch_name, global_keys=global_keys)

assert batch.size == len(set(data_rows))


Expand All @@ -245,8 +246,7 @@ def test_media_type(client, project: Project, rand_gen):
# Exclude LLM media types for now, as they are not supported
if MediaType[media_type] in [
MediaType.LLMPromptCreation,
MediaType.LLMPromptResponseCreation,
MediaType.LLM
MediaType.LLMPromptResponseCreation, MediaType.LLM
]:
continue

Expand Down Expand Up @@ -284,7 +284,8 @@ def test_label_count(client, configured_batch_project_with_label):

def test_clone(client, project, rand_gen):
# cannot clone unknown project media type
project = client.create_project(name=rand_gen(str), media_type=MediaType.Image)
project = client.create_project(name=rand_gen(str),
media_type=MediaType.Image)
cloned_project = project.clone()

assert cloned_project.description == project.description
Expand All @@ -295,4 +296,4 @@ def test_clone(client, project, rand_gen):
assert cloned_project.get_label_count() == 0

project.delete()
cloned_project.delete()
cloned_project.delete()