Skip to content

Commit 92d2c7c

Browse files
authored
Merge pull request #93 from scaleapi/fix_local_upload_jankiness
Fix local upload jankiness
2 parents ac8574f + 492c3a0 commit 92d2c7c

File tree

3 files changed: 93 additions and 27 deletions.

nucleus/__init__.py

Lines changed: 42 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -55,9 +55,11 @@
5555
import logging
5656
import os
5757
import urllib.request
58+
from asyncio.tasks import Task
5859
from typing import Any, Dict, List, Optional, Union
5960

6061
import aiohttp
62+
import nest_asyncio
6163
import pkg_resources
6264
import requests
6365
import tqdm
@@ -67,11 +69,11 @@
6769

6870
from .annotation import (
6971
BoxAnnotation,
72+
CuboidAnnotation,
73+
Point,
7074
PolygonAnnotation,
7175
Segment,
7276
SegmentationAnnotation,
73-
Point,
74-
CuboidAnnotation,
7577
)
7678
from .constants import (
7779
ANNOTATION_METADATA_SCHEMA_KEY,
@@ -81,8 +83,8 @@
8183
DATASET_ID_KEY,
8284
DATASET_ITEM_IDS_KEY,
8385
DEFAULT_NETWORK_TIMEOUT_SEC,
84-
EMBEDDINGS_URL_KEY,
8586
EMBEDDING_DIMENSION_KEY,
87+
EMBEDDINGS_URL_KEY,
8688
ERROR_ITEMS,
8789
ERROR_PAYLOAD,
8890
ERRORS_KEY,
@@ -413,28 +415,33 @@ def populate_dataset(
413415

414416
agg_response = UploadResponse(json={DATASET_ID_KEY: dataset_id})
415417

416-
tqdm_local_batches = self.tqdm_bar(local_batches)
417-
418-
tqdm_remote_batches = self.tqdm_bar(remote_batches)
419-
420418
async_responses: List[Any] = []
421419

422-
for batch in tqdm_local_batches:
423-
payload = construct_append_payload(batch, update)
424-
responses = self._process_append_requests_local(
425-
dataset_id, payload, update
420+
if local_batches:
421+
tqdm_local_batches = self.tqdm_bar(
422+
local_batches, desc="Local file batches"
426423
)
427-
async_responses.extend(responses)
428-
429-
for batch in tqdm_remote_batches:
430-
payload = construct_append_payload(batch, update)
431-
responses = self._process_append_requests(
432-
dataset_id=dataset_id,
433-
payload=payload,
434-
update=update,
435-
batch_size=batch_size,
424+
425+
for batch in tqdm_local_batches:
426+
payload = construct_append_payload(batch, update)
427+
responses = self._process_append_requests_local(
428+
dataset_id, payload, update
429+
)
430+
async_responses.extend(responses)
431+
432+
if remote_batches:
433+
tqdm_remote_batches = self.tqdm_bar(
434+
remote_batches, desc="Remote file batches"
436435
)
437-
async_responses.extend(responses)
436+
for batch in tqdm_remote_batches:
437+
payload = construct_append_payload(batch, update)
438+
responses = self._process_append_requests(
439+
dataset_id=dataset_id,
440+
payload=payload,
441+
update=update,
442+
batch_size=batch_size,
443+
)
444+
async_responses.extend(responses)
438445

439446
for response in async_responses:
440447
agg_response.update_response(response)
@@ -449,6 +456,8 @@ def _process_append_requests_local(
449456
local_batch_size: int = 10,
450457
):
451458
def get_files(batch):
459+
for item in batch:
460+
item[UPDATE_KEY] = update
452461
request_payload = [
453462
(
454463
ITEMS_KEY,
@@ -481,14 +490,20 @@ def get_files(batch):
481490
files_per_request.append(get_files(batch))
482491
payload_items.append(batch)
483492

484-
loop = asyncio.get_event_loop()
485-
responses = loop.run_until_complete(
486-
self.make_many_files_requests_asynchronously(
487-
files_per_request,
488-
f"dataset/{dataset_id}/append",
489-
)
493+
future = self.make_many_files_requests_asynchronously(
494+
files_per_request,
495+
f"dataset/{dataset_id}/append",
490496
)
491497

498+
try:
499+
loop = asyncio.get_event_loop()
500+
except RuntimeError: # no event loop running:
501+
loop = asyncio.new_event_loop()
502+
responses = loop.run_until_complete(future)
503+
else:
504+
nest_asyncio.apply(loop)
505+
return loop.run_until_complete(future)
506+
492507
def close_files(request_items):
493508
for item in request_items:
494509
# file buffer in location [1][1]

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ requests = "^2.23.0"
3737
tqdm = "^4.41.0"
3838
dataclasses = { version = "^0.7", python = "^3.6.1, <3.7" }
3939
aiohttp = "^3.7.4"
40+
nest-asyncio = "^1.5.1"
4041

4142
[tool.poetry.dev-dependencies]
4243
poetry = "^1.1.5"

tests/test_dataset.py

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,56 @@ def test_dataset_create_and_delete(CLIENT):
113113
assert response == {"message": "Beginning dataset deletion..."}
114114

115115

116+
def test_dataset_update_metadata_local(dataset):
117+
dataset.append(
118+
[
119+
DatasetItem(
120+
image_location=LOCAL_FILENAME,
121+
metadata={"snake_field": 0},
122+
reference_id="test_image",
123+
)
124+
]
125+
)
126+
dataset.append(
127+
[
128+
DatasetItem(
129+
image_location=LOCAL_FILENAME,
130+
metadata={"snake_field": 1},
131+
reference_id="test_image",
132+
)
133+
],
134+
update=True,
135+
)
136+
resulting_item = dataset.iloc(0)["item"]
137+
print(resulting_item)
138+
assert resulting_item.metadata["snake_field"] == 1
139+
140+
141+
def test_dataset_update_metadata(dataset):
142+
dataset.append(
143+
[
144+
DatasetItem(
145+
image_location=TEST_IMG_URLS[0],
146+
metadata={"snake_field": 0},
147+
reference_id="test_image",
148+
)
149+
]
150+
)
151+
dataset.append(
152+
[
153+
DatasetItem(
154+
image_location=TEST_IMG_URLS[0],
155+
metadata={"snake_field": 1},
156+
reference_id="test_image",
157+
)
158+
],
159+
update=True,
160+
)
161+
resulting_item = dataset.iloc(0)["item"]
162+
print(resulting_item)
163+
assert resulting_item.metadata["snake_field"] == 1
164+
165+
116166
def test_dataset_append(dataset):
117167
def check_is_expected_response(response):
118168
assert isinstance(response, UploadResponse)

0 commit comments

Comments
 (0)