Skip to content

Commit f649c94

Browse files
author
Diego Ardila
committed
Kill grequests
1 parent ce9d988 commit f649c94

File tree

3 files changed

+16
-128
lines changed

3 files changed

+16
-128
lines changed

conftest.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,3 @@
1-
# grequests must be imported before any module not designed for reentrancy,
2-
# because it relies on aggressive monkey patching that breaks if done after many
3-
# other common module imports, e.g. ssl.
4-
#
5-
# So we import it before everything else. For details see:
6-
# https://github.com/gevent/gevent/issues/1016#issuecomment-328530533
7-
# https://github.com/spyoungtech/grequests/issues/8
8-
import grequests
9-
10-
################
11-
121
import logging
132
import os
143

nucleus/__init__.py

Lines changed: 16 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,20 @@
5353
import json
5454
import logging
5555
import os
56-
import warnings
57-
from typing import Any, Callable, Dict, List, Optional, Union
56+
from typing import Any, Dict, List, Optional, Union
5857

59-
import grequests
6058
import pkg_resources
6159
import requests
6260
import tqdm
6361
import tqdm.notebook as tqdm_notebook
64-
from requests.adapters import HTTPAdapter
6562

6663
# pylint: disable=E1101
6764
# TODO: refactor to reduce this file to under 1000 lines.
6865
# pylint: disable=C0302
69-
from requests.packages.urllib3.util.retry import Retry
7066

7167
from .annotation import (
7268
BoxAnnotation,
7369
PolygonAnnotation,
74-
Segment,
7570
SegmentationAnnotation,
7671
)
7772
from .constants import (
@@ -145,6 +140,7 @@ def __init__(
145140
api_key: str,
146141
use_notebook: bool = False,
147142
endpoint: str = None,
143+
verify: bool = True,
148144
):
149145
self.api_key = api_key
150146
self.tqdm_bar = tqdm.tqdm
@@ -324,13 +320,13 @@ def populate_dataset(
324320
dataset_id: str,
325321
dataset_items: List[DatasetItem],
326322
batch_size: int = 100,
327-
force: bool = False,
323+
update: bool = False,
328324
):
329325
"""
330326
Appends images to a dataset with given dataset_id.
331-
Overwrites images on collision if forced.
327+
Overwrites images on collision if `update` is True.
332328
:param dataset_id: id of a dataset
333-
:param payload: { "items": List[DatasetItem], "force": bool }
329+
:param payload: { "items": List[DatasetItem], "update": bool }
334330
:param local: flag if images are stored locally
335331
:param batch_size: size of the batch for long payload
336332
:return:
@@ -373,16 +369,19 @@ def populate_dataset(
373369
async_responses: List[Any] = []
374370

375371
for batch in tqdm_local_batches:
376-
payload = construct_append_payload(batch, force)
372+
payload = construct_append_payload(batch, update)
377373
responses = self._process_append_requests_local(
378-
dataset_id, payload, force
374+
dataset_id, payload, update
379375
)
380376
async_responses.extend(responses)
381377

382378
for batch in tqdm_remote_batches:
383-
payload = construct_append_payload(batch, force)
379+
payload = construct_append_payload(batch, update)
384380
responses = self._process_append_requests(
385-
dataset_id, payload, force, batch_size, batch_size
381+
dataset_id=dataset_id,
382+
payload=payload,
383+
update=update,
384+
batch_size=batch_size,
386385
)
387386
async_responses.extend(responses)
388387

@@ -397,20 +396,7 @@ def _process_append_requests_local(
397396
payload: dict,
398397
update: bool,
399398
local_batch_size: int = 10,
400-
size: int = 10,
401399
):
402-
def error(batch_items: dict) -> UploadResponse:
403-
return UploadResponse(
404-
{
405-
DATASET_ID_KEY: dataset_id,
406-
ERROR_ITEMS: len(batch_items),
407-
ERROR_PAYLOAD: batch_items,
408-
}
409-
)
410-
411-
def exception_handler(request, exception):
412-
logger.error(exception)
413-
414400
def preprocess_payload(batch):
415401
request_payload = [
416402
(ITEMS_KEY, (None, json.dumps(batch), "application/json"))
@@ -438,21 +424,14 @@ def preprocess_payload(batch):
438424
request_payloads.append(batch_payload)
439425
payload_items.append(batch)
440426

441-
async_requests = [
442-
self._make_grequest(
427+
responses = [
428+
self.make_request(
443429
payload,
444430
f"dataset/{dataset_id}/append",
445-
local=True,
446431
)
447432
for payload in request_payloads
448433
]
449434

450-
async_responses = grequests.map(
451-
async_requests,
452-
exception_handler=exception_handler,
453-
size=size,
454-
)
455-
456435
def close_files(request_items):
457436
for item in request_items:
458437
# file buffer in location [1][1]
@@ -463,15 +442,6 @@ def close_files(request_items):
463442
for p in request_payloads:
464443
close_files(p)
465444

466-
# response object will be None if an error occurred
467-
async_responses = [
468-
response
469-
if (response and response.status_code == 200)
470-
else error(request_items)
471-
for response, request_items in zip(async_responses, payload_items)
472-
]
473-
responses.extend(async_responses)
474-
475445
return responses
476446

477447
def _process_append_requests(
@@ -480,49 +450,22 @@ def _process_append_requests(
480450
payload: dict,
481451
update: bool,
482452
batch_size: int = 20,
483-
size: int = 10,
484453
):
485-
def default_error(payload: dict) -> UploadResponse:
486-
return UploadResponse(
487-
{
488-
DATASET_ID_KEY: dataset_id,
489-
ERROR_ITEMS: len(payload[ITEMS_KEY]),
490-
ERROR_PAYLOAD: payload[ITEMS_KEY],
491-
}
492-
)
493-
494-
def exception_handler(request, exception):
495-
logger.error(exception)
496-
497454
items = payload[ITEMS_KEY]
498455
payloads = [
499456
# batch_size images per request
500457
{ITEMS_KEY: items[i : i + batch_size], UPDATE_KEY: update}
501458
for i in range(0, len(items), batch_size)
502459
]
503460

504-
async_requests = [
505-
self._make_grequest(
461+
return [
462+
self.make_request(
506463
payload,
507464
f"dataset/{dataset_id}/append",
508-
local=False,
509465
)
510466
for payload in payloads
511467
]
512468

513-
async_responses = grequests.map(
514-
async_requests, exception_handler=exception_handler, size=size
515-
)
516-
517-
async_responses = [
518-
response
519-
if (response and response.status_code == 200)
520-
else default_error(payload)
521-
for response, payload in zip(async_responses, payloads)
522-
]
523-
524-
return async_responses
525-
526469
def annotate_dataset(
527470
self,
528471
dataset_id: str,
@@ -1070,49 +1013,6 @@ def delete_custom_index(self, dataset_id: str):
10701013
requests_command=requests.delete,
10711014
)
10721015

1073-
def _make_grequest(
1074-
self,
1075-
payload: dict,
1076-
route: str,
1077-
session=None,
1078-
requests_command: Callable = grequests.post,
1079-
local=True,
1080-
):
1081-
"""
1082-
makes a grequest to Nucleus endpoint
1083-
:param payload: file dict for multipart-formdata
1084-
:param route: route for the request
1085-
:param session: requests.session
1086-
:param requests_command: grequests.post, grequests.get, grequests.delete
1087-
:return: An async grequest object
1088-
"""
1089-
adapter = HTTPAdapter(max_retries=Retry(total=3))
1090-
sess = requests.Session()
1091-
sess.mount("https://", adapter)
1092-
sess.mount("http://", adapter)
1093-
1094-
endpoint = f"{self.endpoint}/{route}"
1095-
logger.info("Posting to %s", endpoint)
1096-
1097-
if local:
1098-
post = requests_command(
1099-
endpoint,
1100-
session=sess,
1101-
files=payload,
1102-
auth=(self.api_key, ""),
1103-
timeout=DEFAULT_NETWORK_TIMEOUT_SEC,
1104-
)
1105-
else:
1106-
post = requests_command(
1107-
endpoint,
1108-
session=sess,
1109-
json=payload,
1110-
headers={"Content-Type": "application/json"},
1111-
auth=(self.api_key, ""),
1112-
timeout=DEFAULT_NETWORK_TIMEOUT_SEC,
1113-
)
1114-
return post
1115-
11161016
def _make_request_raw(
11171017
self, payload: dict, endpoint: str, requests_command=requests.post
11181018
):

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ packages = [{include="nucleus"}]
3333

3434
[tool.poetry.dependencies]
3535
python = "^3.6.2"
36-
grequests = "^0.6.0"
3736
requests = "^2.25.1"
3837
tqdm = "^4.41.0"
3938
dataclasses = { version = "^0.7", python = "^3.6.1, <3.7" }

0 commit comments

Comments (0)