
Commit 148e04a

Author: Val Brodsky (committed)
Support for uploading from files
1 parent 3d45f00 commit 148e04a

File tree

3 files changed: 116 additions, 46 deletions


libs/labelbox/src/labelbox/schema/dataset.py

Lines changed: 26 additions & 1 deletion
@@ -280,11 +280,35 @@ def create_data_rows(
             InvalidAttributeError: If there are fields in `items` not valid for
                 a DataRow.
             ValueError: When the upload parameters are invalid
-        """

+        NOTE dicts and strings items can not be mixed in the same call. It is a responsibility of the caller to ensure that all items are of the same type.
+        """
+        if isinstance(items[0], str):
+            items = self._build_from_local_paths(items)  # Assume list of file paths
         specs = DataRowCreateItem.build(self.uid, items)
         return self._exec_upsert_data_rows(specs, file_upload_thread_count)

+    def _build_from_local_paths(
+            self,
+            items: List[str],
+            file_upload_thread_count=FILE_UPLOAD_THREAD_COUNT) -> List[dict]:
+        uploaded_items = []
+
+        def upload_file(item):
+            item_url = self.client.upload_file(item)
+            return {'row_data': item_url, 'external_id': item}
+
+        with ThreadPoolExecutor(file_upload_thread_count) as executor:
+            futures = [
+                executor.submit(upload_file, item)
+                for item in items
+                if isinstance(item, str) and os.path.exists(item)
+            ]
+            more_items = [future.result() for future in as_completed(futures)]
+            uploaded_items.extend(more_items)
+
+        return uploaded_items
+
     def data_rows_for_external_id(self,
                                   external_id,
                                   limit=10) -> List["DataRow"]:
@@ -589,6 +613,7 @@ def _exec_upsert_data_rows(
             self,
             specs: List[DataRowItemBase],
             file_upload_thread_count: int = FILE_UPLOAD_THREAD_COUNT) -> "Task":
+
         manifest = DataRowUploader.upload_in_chunks(
             client=self.client,
             specs=specs,
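
Taken together, the dataset.py change means Dataset.create_data_rows now accepts a plain list of local file paths in addition to the existing list-of-dicts form: each path that exists on disk is uploaded via client.upload_file on a thread pool and converted into a {'row_data': <uploaded URL>, 'external_id': <path>} spec before the usual upsert path runs. A minimal usage sketch, assuming a configured client; the API key, dataset name, and file paths below are placeholders and not part of this commit:

import labelbox as lb

client = lb.Client(api_key="<YOUR_API_KEY>")                # assumes a valid API key
dataset = client.create_dataset(name="local-upload-demo")   # placeholder dataset name

# New in this commit: pass local file paths directly (all items must be strings;
# dicts and strings cannot be mixed in the same call).
task = dataset.create_data_rows(["/tmp/image_1.jpg", "/tmp/image_2.jpg"])
task.wait_till_done()
assert task.status == "COMPLETE"

# The existing dict form is unchanged.
task = dataset.create_data_rows([{"row_data": "https://example.com/image.jpg"}])
task.wait_till_done()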

libs/labelbox/src/labelbox/schema/internal/data_row_uploader.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 import os
 from concurrent.futures import ThreadPoolExecutor, as_completed

-from typing import Iterable, List
+from typing import Iterable, List, Union

 from labelbox.exceptions import InvalidQueryError
 from labelbox.exceptions import InvalidAttributeError

libs/labelbox/tests/integration/test_data_rows.py

Lines changed: 89 additions & 44 deletions
@@ -3,6 +3,7 @@
 from datetime import datetime
 import json
 import requests
+import os

 from unittest.mock import patch
 import pytest
@@ -171,66 +172,110 @@ def test_lookup_data_rows(client, dataset):

 def test_data_row_bulk_creation(dataset, rand_gen, image_url):
     client = dataset.client
+    data_rows = []
     assert len(list(dataset.data_rows())) == 0

-    with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE',
-               new=1):  # Force chunking
-        # Test creation using URL
-        task = dataset.create_data_rows([
-            {
-                DataRow.row_data: image_url
-            },
-            {
-                "row_data": image_url
-            },
-        ])
-        task.wait_till_done()
-        assert task.has_errors() is False
-        assert task.status == "COMPLETE"
+    try:
+        with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE',
+                   new=1):  # Force chunking
+            # Test creation using URL
+            task = dataset.create_data_rows([
+                {
+                    DataRow.row_data: image_url
+                },
+                {
+                    "row_data": image_url
+                },
+            ])
+            task.wait_till_done()
+            assert task.has_errors() is False
+            assert task.status == "COMPLETE"

-        results = task.result
-        assert len(results) == 2
-        row_data = [result["row_data"] for result in results]
-        assert row_data == [image_url, image_url]
-        results_all = task.result_all
-        row_data = [result["row_data"] for result in results_all]
-        assert row_data == [image_url, image_url]
+            results = task.result
+            assert len(results) == 2
+            row_data = [result["row_data"] for result in results]
+            assert row_data == [image_url, image_url]
+            results_all = task.result_all
+            row_data = [result["row_data"] for result in results_all]
+            assert row_data == [image_url, image_url]

-    data_rows = list(dataset.data_rows())
-    assert len(data_rows) == 2
-    assert {data_row.row_data for data_row in data_rows} == {image_url}
-    assert {data_row.global_key for data_row in data_rows} == {None}
+        data_rows = list(dataset.data_rows())
+        assert len(data_rows) == 2
+        assert {data_row.row_data for data_row in data_rows} == {image_url}
+        assert {data_row.global_key for data_row in data_rows} == {None}

-    data_rows = list(dataset.data_rows(from_cursor=data_rows[0].uid))
-    assert len(data_rows) == 1
+        data_rows = list(dataset.data_rows(from_cursor=data_rows[0].uid))
+        assert len(data_rows) == 1

-    # Test creation using file name
-    with NamedTemporaryFile() as fp:
-        data = rand_gen(str).encode()
-        fp.write(data)
-        fp.flush()
-        task = dataset.create_data_rows([fp.name])
+    finally:
+        for dr in data_rows:
+            dr.delete()
+
+
+@pytest.fixture
+def local_image_file(image_url) -> NamedTemporaryFile:
+    response = requests.get(image_url, stream=True)
+    response.raise_for_status()
+
+    with NamedTemporaryFile(delete=False) as f:
+        for chunk in response.iter_content(chunk_size=8192):
+            if chunk:
+                f.write(chunk)
+
+    yield f  # Return the path to the temp file
+
+    os.remove(f.name)
+
+
+def test_data_row_bulk_creation_from_file(dataset, local_image_file, image_url):
+    with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE',
+               new=1):  # Force chunking
+        task = dataset.create_data_rows(
+            [local_image_file.name, local_image_file.name])
         task.wait_till_done()
         assert task.status == "COMPLETE"
+        assert len(task.result) == 2
+        assert task.has_errors() is False
+        results = [r for r in task.result_all]
+        row_data = [result["row_data"] for result in results]
+        assert row_data == [image_url, image_url]
+

+def test_data_row_bulk_creation_from_row_data_file_external_id(
+        dataset, local_image_file, image_url):
+    with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE',
+               new=1):  # Force chunking
         task = dataset.create_data_rows([{
-            "row_data": fp.name,
+            "row_data": local_image_file.name,
             'external_id': 'some_name'
+        }, {
+            "row_data": image_url,
+            'external_id': 'some_name2'
         }])
-        task.wait_till_done()
         assert task.status == "COMPLETE"
+        assert len(task.result) == 2
+        assert task.has_errors() is False
+        results = [r for r in task.result_all]
+        row_data = [result["row_data"] for result in results]
+        assert row_data == [image_url, image_url]
+

-        task = dataset.create_data_rows([{"row_data": fp.name}])
+def test_data_row_bulk_creation_from_row_data_file(dataset, rand_gen,
+                                                   local_image_file, image_url):
+    with patch('labelbox.schema.dataset.UPSERT_CHUNK_SIZE',
+               new=1):  # Force chunking
+        task = dataset.create_data_rows([{
+            "row_data": local_image_file.name
+        }, {
+            "row_data": local_image_file.name
+        }])
         task.wait_till_done()
         assert task.status == "COMPLETE"
-
-    data_rows = list(dataset.data_rows())
-    assert len(data_rows) == 5
-    url = ({data_row.row_data for data_row in data_rows} - {image_url}).pop()
-    assert requests.get(url).content == data
-
-    for dr in data_rows:
-        dr.delete()
+        assert len(task.result) == 2
+        assert task.has_errors() is False
+        results = [r for r in task.result_all]
+        row_data = [result["row_data"] for result in results]
+        assert row_data == [image_url, image_url]


 @pytest.mark.slow
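
The new tests also exercise dict items whose row_data is a local file path, and mixing a local-path item with a URL item in the same call (see test_data_row_bulk_creation_from_row_data_file_external_id above). A rough sketch of that calling pattern outside the test suite; the file path and external IDs are illustrative placeholders, and `dataset` is a Labelbox Dataset obtained as in the earlier sketch:

# Sketch only: mirrors test_data_row_bulk_creation_from_row_data_file_external_id.
task = dataset.create_data_rows([
    {"row_data": "/tmp/local_image.jpg", "external_id": "local-copy"},
    {"row_data": "https://example.com/image.jpg", "external_id": "hosted-copy"},
])
task.wait_till_done()
assert task.has_errors() is False
uploaded_urls = [r["row_data"] for r in task.result_all]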
