Skip to content

Commit 6108121

Browse files
authored
Fix sync ingest for large number of requests (#363)
1 parent 100ec6c commit 6108121

File tree

7 files changed

+55
-94
lines changed

7 files changed

+55
-94
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to the [Nucleus Python Client](https://github.com/scaleapi/n
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.14.20](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.14.20) - 2022-09-23
9+
10+
### Fixed
11+
- Local uploads are correctly batched, preventing flooding the network with requests
12+
13+
814
## [0.14.19](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.14.19) - 2022-08-26
915

1016
### Added

nucleus/annotation_uploader.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ def upload(
5757
update: bool = False,
5858
remote_files_per_upload_request: int = 20,
5959
local_files_per_upload_request: int = 10,
60-
local_file_upload_concurrency: int = 30,
6160
):
6261
"""For more details on parameters and functionality, see dataset.annotate."""
6362
if local_files_per_upload_request > 10:
@@ -84,7 +83,6 @@ def upload(
8483
segmentations=segmentations_with_local_files,
8584
update=update,
8685
local_files_per_upload_request=local_files_per_upload_request,
87-
local_file_upload_concurrency=local_file_upload_concurrency,
8886
)
8987
)
9088
if segmentations_with_remote_files:
@@ -138,7 +136,6 @@ def make_batched_file_form_data_requests(
138136
segmentations: Sequence[SegmentationAnnotation],
139137
update,
140138
local_files_per_upload_request: int,
141-
local_file_upload_concurrency: int,
142139
):
143140
requests = []
144141
for i in range(0, len(segmentations), local_files_per_upload_request):
@@ -158,7 +155,6 @@ def make_batched_file_form_data_requests(
158155
requests=requests,
159156
route=self._route,
160157
progressbar=progressbar,
161-
concurrency=local_file_upload_concurrency,
162158
)
163159

164160
def get_form_data_and_file_pointers_fn(

nucleus/async_utils.py

Lines changed: 44 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,7 @@ class FileFormField:
2727

2828
FileFormData = Sequence[FileFormField]
2929

30-
31-
async def gather_with_concurrency(n, *tasks):
32-
"""Helper method to limit the concurrency when gathering the results from multiple tasks."""
33-
semaphore = asyncio.Semaphore(n)
34-
35-
async def sem_task(task):
36-
async with semaphore:
37-
return await task
38-
39-
return await asyncio.gather(*(sem_task(task) for task in tasks))
30+
UPLOAD_SEMAPHORE = asyncio.Semaphore(10)
4031

4132

4233
class FormDataContextHandler:
@@ -98,7 +89,6 @@ def make_many_form_data_requests_concurrently(
9889
requests: Sequence[FormDataContextHandler],
9990
route: str,
10091
progressbar: tqdm,
101-
concurrency: int = 30,
10292
):
10393
"""
10494
Makes an async post request with form data to a Nucleus endpoint.
@@ -109,14 +99,10 @@ def make_many_form_data_requests_concurrently(
10999
handle generating form data, and opening/closing files for each request.
110100
route: route for the request.
111101
progressbar: A tqdm progress bar to use for showing progress to the user.
112-
concurrency: How many concurrent requests to run at once. Should be exposed
113-
to the user.
114102
"""
115103
loop = get_event_loop()
116104
return loop.run_until_complete(
117-
form_data_request_helper(
118-
client, requests, route, progressbar, concurrency
119-
)
105+
form_data_request_helper(client, requests, route, progressbar)
120106
)
121107

122108

@@ -125,14 +111,13 @@ async def form_data_request_helper(
125111
requests: Sequence[FormDataContextHandler],
126112
route: str,
127113
progressbar: tqdm,
128-
concurrency: int = 30,
129114
):
130115
"""
131116
Makes an async post request with files to a Nucleus endpoint.
132117
133118
Args:
134119
client: The client to use for the request.
135-
requests: Each requst should be a FormDataContextHandler object which will
120+
requests: Each request should be a FormDataContextHandler object which will
136121
handle generating form data, and opening/closing files for each request.
137122
route: route for the request.
138123
"""
@@ -149,7 +134,7 @@ async def form_data_request_helper(
149134
)
150135
for request in requests
151136
]
152-
return await gather_with_concurrency(concurrency, *tasks)
137+
return await asyncio.gather(*tasks)
153138

154139

155140
async def _post_form_data(
@@ -169,47 +154,47 @@ async def _post_form_data(
169154
session: The session to use for the request.
170155
"""
171156
endpoint = f"{client.endpoint}/{route}"
172-
173157
logger.info("Posting to %s", endpoint)
174158

175-
for sleep_time in RetryStrategy.sleep_times() + [-1]:
176-
with request as form:
177-
async with session.post(
178-
endpoint,
179-
data=form,
180-
auth=aiohttp.BasicAuth(client.api_key, ""),
181-
timeout=DEFAULT_NETWORK_TIMEOUT_SEC,
182-
) as response:
183-
logger.info(
184-
"API request has response code %s", response.status
185-
)
186-
187-
try:
188-
data = await response.json()
189-
except aiohttp.client_exceptions.ContentTypeError:
190-
# In case of 404, the server returns text
191-
data = await response.text()
192-
if (
193-
response.status in RetryStrategy.statuses
194-
and sleep_time != -1
195-
):
196-
time.sleep(sleep_time)
197-
continue
198-
199-
if response.status == 503:
200-
raise TimeoutError(
201-
"The request to upload your max is timing out, please lower local_files_per_upload_request in your api call."
159+
async with UPLOAD_SEMAPHORE:
160+
for sleep_time in RetryStrategy.sleep_times() + [-1]:
161+
with request as form:
162+
async with session.post(
163+
endpoint,
164+
data=form,
165+
auth=aiohttp.BasicAuth(client.api_key, ""),
166+
timeout=DEFAULT_NETWORK_TIMEOUT_SEC,
167+
) as response:
168+
logger.info(
169+
"API request has response code %s", response.status
202170
)
203171

204-
if not response.ok:
205-
raise NucleusAPIError(
206-
endpoint,
207-
session.post,
208-
aiohttp_response=(
209-
response.status,
210-
response.reason,
211-
data,
212-
),
213-
)
214-
progressbar.update(1)
215-
return data
172+
try:
173+
data = await response.json()
174+
except aiohttp.client_exceptions.ContentTypeError:
175+
# In case of 404, the server returns text
176+
data = await response.text()
177+
if (
178+
response.status in RetryStrategy.statuses
179+
and sleep_time != -1
180+
):
181+
time.sleep(sleep_time)
182+
continue
183+
184+
if response.status == 503:
185+
raise TimeoutError(
186+
"The request to upload your mask is timing out, please lower local_files_per_upload_request in your api call."
187+
)
188+
189+
if not response.ok:
190+
raise NucleusAPIError(
191+
endpoint,
192+
session.post,
193+
aiohttp_response=(
194+
response.status,
195+
response.reason,
196+
data,
197+
),
198+
)
199+
progressbar.update(1)
200+
return data

nucleus/dataset.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
# pylint: disable=C0302
7373

7474

75-
WARN_FOR_LARGE_UPLOAD = 50000
75+
WARN_FOR_LARGE_UPLOAD = 10000
7676
WARN_FOR_LARGE_SCENES_UPLOAD = 5
7777

7878

@@ -336,7 +336,6 @@ def annotate(
336336
asynchronous: bool = False,
337337
remote_files_per_upload_request: int = 20,
338338
local_files_per_upload_request: int = 10,
339-
local_file_upload_concurrency: int = 30,
340339
) -> Union[Dict[str, Any], AsyncJob]:
341340
"""Uploads ground truth annotations to the dataset.
342341
@@ -381,8 +380,6 @@ def annotate(
381380
request. Segmentations have either local or remote files, if you are
382381
getting timeouts while uploading segmentations with local files, you
383382
should lower this value from its default of 10. The maximum is 10.
384-
local_file_upload_concurrency: Number of concurrent local file uploads.
385-
386383
387384
Returns:
388385
If synchronous, payload describing the upload result::
@@ -414,7 +411,6 @@ def annotate(
414411
batch_size=batch_size,
415412
remote_files_per_upload_request=remote_files_per_upload_request,
416413
local_files_per_upload_request=local_files_per_upload_request,
417-
local_file_upload_concurrency=local_file_upload_concurrency,
418414
)
419415

420416
def ingest_tasks(self, task_ids: List[str]) -> dict:
@@ -452,7 +448,6 @@ def append(
452448
batch_size: int = 20,
453449
asynchronous: bool = False,
454450
local_files_per_upload_request: int = 10,
455-
local_file_upload_concurrency: int = 30,
456451
) -> Union[Dict[Any, Any], AsyncJob, UploadResponse]:
457452
"""Appends items or scenes to a dataset.
458453
@@ -535,8 +530,6 @@ def append(
535530
this if you encounter timeouts.
536531
local_files_per_upload_request: Optional; default is 10. We recommend
537532
lowering this if you encounter timeouts.
538-
local_file_upload_concurrency: Optional; default is 30. We recommend
539-
lowering this if you encounter gateway timeouts or Cloudflare errors.
540533
Returns:
541534
For scenes
542535
If synchronous, returns a payload describing the upload result::
@@ -608,7 +601,6 @@ def append(
608601
update=update,
609602
batch_size=batch_size,
610603
local_files_per_upload_request=local_files_per_upload_request,
611-
local_file_upload_concurrency=local_file_upload_concurrency,
612604
)
613605

614606
@deprecated("Prefer using Dataset.append instead.")
@@ -1483,7 +1475,6 @@ def upload_predictions(
14831475
batch_size: int = 5000,
14841476
remote_files_per_upload_request: int = 20,
14851477
local_files_per_upload_request: int = 10,
1486-
local_file_upload_concurrency: int = 30,
14871478
):
14881479
"""Uploads predictions and associates them with an existing :class:`Model`.
14891480
@@ -1541,7 +1532,6 @@ def upload_predictions(
15411532
getting timeouts while uploading segmentations with local files, you
15421533
should lower this value from its default of 10. The maximum is 10.
15431534
This is only relevant for asynchronous=False
1544-
local_file_upload_concurrency: Number of concurrent local file uploads.
15451535
15461536
Returns:
15471537
Payload describing the synchronous upload::
@@ -1579,7 +1569,6 @@ def upload_predictions(
15791569
update=update,
15801570
remote_files_per_upload_request=remote_files_per_upload_request,
15811571
local_files_per_upload_request=local_files_per_upload_request,
1582-
local_file_upload_concurrency=local_file_upload_concurrency,
15831572
)
15841573

15851574
def predictions_iloc(self, model, index):
@@ -1686,7 +1675,6 @@ def _upload_items(
16861675
batch_size: int = 20,
16871676
update: bool = False,
16881677
local_files_per_upload_request: int = 10,
1689-
local_file_upload_concurrency: int = 30,
16901678
) -> UploadResponse:
16911679
"""
16921680
Appends images to a dataset with given dataset_id.
@@ -1700,9 +1688,6 @@ def _upload_items(
17001688
local_files_per_upload_request: How large to make each upload request when your
17011689
files are local. If you get timeouts, you may need to lower this from
17021690
its default of 10. The maximum is 10.
1703-
local_file_upload_concurrency: How many local file requests to send
1704-
concurrently. If you start to see gateway timeouts or cloudflare related
1705-
errors, you may need to lower this from its default of 30.
17061691
17071692
Returns:
17081693
UploadResponse
@@ -1720,7 +1705,6 @@ def _upload_items(
17201705
batch_size=batch_size,
17211706
update=update,
17221707
local_files_per_upload_request=local_files_per_upload_request,
1723-
local_file_upload_concurrency=local_file_upload_concurrency,
17241708
)
17251709

17261710
def update_scene_metadata(

nucleus/dataset_item_uploader.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,15 @@ def upload(
3838
batch_size: int = 5000,
3939
update: bool = False,
4040
local_files_per_upload_request: int = 10,
41-
local_file_upload_concurrency: int = 30,
4241
) -> UploadResponse:
4342
"""
4443
4544
Args:
4645
dataset_items: Items to Upload
4746
batch_size: How many items to pool together for a single request for items
4847
without files to upload
49-
files_per_upload_request: How many items to pool together for a single
48+
local_files_per_upload_request: How many items to pool together for a single
5049
request for items with files to upload
51-
5250
update: Update records instead of overwriting
5351
5452
Returns:
@@ -78,9 +76,7 @@ def upload(
7876
self.dataset_id,
7977
items=local_items,
8078
update=update,
81-
batch_size=batch_size,
8279
local_files_per_upload_request=local_files_per_upload_request,
83-
local_file_upload_concurrency=local_file_upload_concurrency,
8480
)
8581
)
8682

@@ -112,9 +108,7 @@ def _process_append_requests_local(
112108
dataset_id: str,
113109
items: Sequence[DatasetItem],
114110
update: bool,
115-
batch_size: int,
116111
local_files_per_upload_request: int,
117-
local_file_upload_concurrency: int,
118112
):
119113
# Batch into requests
120114
requests = []
@@ -127,15 +121,15 @@ def _process_append_requests_local(
127121
requests.append(request)
128122

129123
progressbar = self._client.tqdm_bar(
130-
total=len(requests), desc="Local file batches"
124+
total=len(requests),
125+
desc=f"Uploading {len(items)} items in {len(requests)} batches",
131126
)
132127

133128
return make_many_form_data_requests_concurrently(
134129
self._client,
135130
requests,
136131
f"dataset/{dataset_id}/append",
137132
progressbar=progressbar,
138-
concurrency=local_file_upload_concurrency,
139133
)
140134

141135
def _process_append_requests(

nucleus/model_run.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ def predict(
120120
batch_size: int = 5000,
121121
remote_files_per_upload_request: int = 20,
122122
local_files_per_upload_request: int = 10,
123-
local_file_upload_concurrency: int = 30,
124123
) -> Union[dict, AsyncJob]:
125124
"""
126125
Uploads model outputs as predictions for a model_run. Returns info about the upload.
@@ -145,8 +144,6 @@ def predict(
145144
getting timeouts while uploading segmentations with local files, you
146145
should lower this value from its default of 10. The maximum is 10.
147146
This is only relevant for asynchronous=False
148-
local_file_upload_concurrency: Number of concurrent local file uploads.
149-
This is only relevant for asynchronous=False
150147
:return:
151148
{
152149
"model_run_id": str,
@@ -177,7 +174,6 @@ def predict(
177174
batch_size=batch_size,
178175
remote_files_per_upload_request=remote_files_per_upload_request,
179176
local_files_per_upload_request=local_files_per_upload_request,
180-
local_file_upload_concurrency=local_file_upload_concurrency,
181177
)
182178

183179
def iloc(self, i: int):

tests/test_prediction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ def test_non_existent_taxonomy_category_pred_upload_async(model_run: ModelRun):
782782

783783
status = job.status()
784784
assert status["job_id"] == job.job_id
785-
assert status["status"] == "Errored"
785+
assert status["status"] == "Errored_Server"
786786
assert status["job_progress"] == "0.00"
787787

788788

0 commit comments

Comments
 (0)