50
50
geometry | dict | Representation of the bounding box in the Box2DGeometry format.\n
51
51
metadata | dict | An arbitrary metadata blob for the annotation.\n
52
52
"""
53
+ import asyncio
53
54
import json
54
55
import logging
55
56
import os
56
57
from typing import Any , Dict , List , Optional , Union
57
58
59
+ import aiohttp
58
60
import pkg_resources
59
61
import requests
60
62
import tqdm
61
63
import tqdm .notebook as tqdm_notebook
62
64
63
- # pylint: disable=E1101
64
- # TODO: refactor to reduce this file to under 1000 lines.
65
- # pylint: disable=C0302
66
-
67
65
from .annotation import (
68
66
BoxAnnotation ,
69
67
PolygonAnnotation ,
70
- SegmentationAnnotation ,
71
68
Segment ,
69
+ SegmentationAnnotation ,
72
70
)
73
71
from .constants import (
74
72
ANNOTATION_METADATA_SCHEMA_KEY ,
122
120
from .slice import Slice
123
121
from .upload_response import UploadResponse
124
122
123
+ # pylint: disable=E1101
124
+ # TODO: refactor to reduce this file to under 1000 lines.
125
+ # pylint: disable=C0302
126
+
127
+
125
128
__version__ = pkg_resources .get_distribution ("scale-nucleus" ).version
126
129
127
130
logger = logging .getLogger (__name__ )
@@ -141,7 +144,6 @@ def __init__(
141
144
api_key : str ,
142
145
use_notebook : bool = False ,
143
146
endpoint : str = None ,
144
- verify : bool = True ,
145
147
):
146
148
self .api_key = api_key
147
149
self .tqdm_bar = tqdm .tqdm
@@ -424,13 +426,12 @@ def get_files(batch):
424
426
files_per_request .append (get_files (batch ))
425
427
payload_items .append (batch )
426
428
427
- responses = [
428
- self ._make_files_request (
429
- files = files ,
430
- route = f"dataset/{ dataset_id } /append" ,
429
+ responses = asyncio . run (
430
+ self .make_many_files_requests_asynchronously (
431
+ files_per_request ,
432
+ f"dataset/{ dataset_id } /append" ,
431
433
)
432
- for files in files_per_request
433
- ]
434
+ )
434
435
435
436
def close_files (request_items ):
436
437
for item in request_items :
@@ -444,6 +445,70 @@ def close_files(request_items):
444
445
445
446
return responses
446
447
448
+ async def make_many_files_requests_asynchronously (
449
+ self , files_per_request , route
450
+ ):
451
+ """
452
+ Makes an async post request with files to a Nucleus endpoint.
453
+
454
+ :param files_per_request: A list of lists of tuples (name, (filename, file_pointer, content_type))
455
+ name will become the name by which the multer can build an array.
456
+ :param route: route for the request
457
+ :return: awaitable list(response)
458
+ """
459
+ async with aiohttp .ClientSession () as session :
460
+ tasks = [
461
+ asyncio .ensure_future (
462
+ self ._make_files_request (
463
+ files = files , route = route , session = session
464
+ )
465
+ )
466
+ for files in files_per_request
467
+ ]
468
+ return await asyncio .gather (* tasks )
469
+
470
+ async def _make_files_request (
471
+ self ,
472
+ files ,
473
+ route : str ,
474
+ session : aiohttp .ClientSession ,
475
+ ):
476
+ """
477
+ Makes an async post request with files to a Nucleus endpoint.
478
+
479
+ :param files: A list of tuples (filename, file_pointer, file_type)
480
+ :param route: route for the request
481
+ :param session: Session to use for post.
482
+ :return: response
483
+ """
484
+ endpoint = f"{ self .endpoint } /{ route } "
485
+
486
+ logger .info ("Posting to %s" , endpoint )
487
+
488
+ form = aiohttp .FormData ()
489
+
490
+ for file in files :
491
+ form .add_field (
492
+ name = file [0 ],
493
+ filename = file [1 ][0 ],
494
+ value = file [1 ][1 ],
495
+ content_type = file [1 ][2 ],
496
+ )
497
+
498
+ async with session .post (
499
+ endpoint ,
500
+ data = form ,
501
+ auth = aiohttp .BasicAuth (self .api_key , "" ),
502
+ timeout = DEFAULT_NETWORK_TIMEOUT_SEC ,
503
+ ) as response :
504
+ logger .info ("API request has response code %s" , response .status )
505
+ if not response .ok :
506
+ self .handle_bad_response (
507
+ endpoint , session .post , aiohttp_response = response
508
+ )
509
+
510
+ return await response .json ()
511
+
447
512
def _process_append_requests (
448
513
self ,
449
514
dataset_id : str ,
@@ -1013,35 +1078,6 @@ def delete_custom_index(self, dataset_id: str):
1013
1078
requests_command = requests .delete ,
1014
1079
)
1015
1080
1016
- def _make_files_request (
1017
- self , files , route : str , requests_command = requests .post
1018
- ):
1019
- """
1020
- Makes a request to Nucleus endpoint. This method returns the raw
1021
- requests.Response object which is useful for unit testing.
1022
-
1023
- :param payload: given payload
1024
- :param endpoint: endpoint + route for the request
1025
- :param requests_command: requests.post, requests.get, requests.delete
1026
- :return: response
1027
- """
1028
- endpoint = f"{ self .endpoint } /{ route } "
1029
-
1030
- logger .info ("Posting to %s" , endpoint )
1031
-
1032
- response = requests_command (
1033
- endpoint ,
1034
- files = files ,
1035
- auth = (self .api_key , "" ),
1036
- timeout = DEFAULT_NETWORK_TIMEOUT_SEC ,
1037
- )
1038
- logger .info ("API request has response code %s" , response .status_code )
1039
-
1040
- if not response .ok :
1041
- self .handle_bad_response (endpoint , requests_command , response )
1042
-
1043
- return response .json ()
1044
-
1045
1081
def make_request (
1046
1082
self , payload : dict , route : str , requests_command = requests .post
1047
1083
) -> dict :
@@ -1072,5 +1108,13 @@ def make_request(
1072
1108
1073
1109
return response .json ()
1074
1110
1075
- def handle_bad_response (self , endpoint , requests_command , response ):
1076
- raise NucleusAPIError (endpoint , requests_command , response )
1111
+ def handle_bad_response (
1112
+ self ,
1113
+ endpoint ,
1114
+ requests_command ,
1115
+ requests_response = None ,
1116
+ aiohttp_response = None ,
1117
+ ):
1118
+ raise NucleusAPIError (
1119
+ endpoint , requests_command , requests_response , aiohttp_response
1120
+ )
0 commit comments