
Commit e58438b

Merge branch 'develop' into PNO/fix_get_overview
2 parents: 700982c + 80c730c

File tree

8 files changed: +389 additions, -72 deletions


libs/labelbox/src/labelbox/schema/data_row.py

Lines changed: 0 additions & 1 deletion
@@ -356,7 +356,6 @@ def _export(
         res = client.execute(create_task_query_str,
                              query_params,
                              error_log_key="errors")
-        print(res)
         res = res[mutation_name]
         task_id = res["taskId"]
         is_streamable = res["isStreamable"]

libs/labelbox/src/labelbox/schema/export_task.py

Lines changed: 190 additions & 63 deletions
@@ -114,9 +114,11 @@ class JsonConverter(Converter[JsonConverterOutput]):  # pylint: disable=too-few-public-methods
 
     Deprecated: This converter is deprecated and will be removed in a future release.
     """
-
+
     def __init__(self) -> None:
-        warnings.warn("JSON converter is deprecated and will be removed in a future release")
+        warnings.warn(
+            "JSON converter is deprecated and will be removed in a future release"
+        )
         super().__init__()
 
     def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]:
@@ -397,7 +399,9 @@ class _MultiGCSFileReader(_Reader):  # pylint: disable=too-few-public-methods
     """
 
     def __init__(self):
-        warnings.warn("_MultiGCSFileReader is deprecated and will be removed in a future release")
+        warnings.warn(
+            "_MultiGCSFileReader is deprecated and will be removed in a future release"
+        )
         super().__init__()
         self._retrieval_strategy = None
 
@@ -415,54 +419,6 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
             result = self._retrieval_strategy.get_next_chunk()
 
 
-@dataclass
-class BufferedJsonConverterOutput:
-    """Output with the JSON object"""
-    json: Any
-
-
-class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
-    """Converts JSON data in a buffered manner
-    """
-    def convert(
-        self, input_args: Converter.ConverterInputArgs
-    ) -> Iterator[BufferedJsonConverterOutput]:
-        yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))
-
-
-class _BufferedGCSFileReader(_Reader):
-    """Reads data from multiple GCS files and buffer them to disk"""
-
-    def __init__(self):
-        super().__init__()
-        self._retrieval_strategy = None
-
-    def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
-        """Sets the retrieval strategy."""
-        self._retrieval_strategy = strategy
-
-    def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
-        if not self._retrieval_strategy:
-            raise ValueError("retrieval strategy not set")
-        # create a buffer
-        with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
-            result = self._retrieval_strategy.get_next_chunk()
-            while result:
-                file_info, raw_data = result
-                temp_file.seek(file_info.offsets.start)
-                temp_file.write(raw_data)
-                result = self._retrieval_strategy.get_next_chunk()
-        # read buffer
-        with open(temp_file.name, 'r') as temp_file_reopened:
-            for idx, line in enumerate(temp_file_reopened):
-                yield _MetadataFileInfo(
-                    offsets=Range(start=0, end=len(line) - 1),
-                    lines=Range(start=idx, end=idx + 1),
-                    file=temp_file.name), line
-        # manually delete buffer
-        os.unlink(temp_file.name)
-
-
 class Stream(Generic[OutputT]):
     """Streams data from a Reader."""
 
@@ -520,6 +476,144 @@ def start(
             stream_handler(output)
 
 
+class _BufferedFileRetrieverByOffset(FileRetrieverStrategy):  # pylint: disable=too-few-public-methods
+    """Retrieves files by offset."""
+
+    def __init__(
+        self,
+        ctx: _TaskContext,
+        offset: int,
+    ) -> None:
+        super().__init__(ctx)
+        self._current_offset = offset
+        self._current_line = 0
+        if self._current_offset >= self._ctx.metadata_header.total_size:
+            raise ValueError(
+                f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}"
+            )
+
+    def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]:
+        if self._current_offset >= self._ctx.metadata_header.total_size:
+            return None
+        query = (
+            f"query GetExportFileFromOffsetPyApi"
+            f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)"
+            f"{{task(where: $where)"
+            f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)"
+            f"{{offsets {{start end}} lines {{start end}} file}}"
+            f"}}}}")
+        variables = {
+            "where": {
+                "id": self._ctx.task_id
+            },
+            "streamType": self._ctx.stream_type.value,
+            "offset": str(self._current_offset),
+        }
+        file_info, file_content = self._get_file_content(
+            query, variables, "exportFileFromOffset")
+        file_info.offsets.start = self._current_offset
+        file_info.lines.start = self._current_line
+        self._current_offset = file_info.offsets.end + 1
+        self._current_line = file_info.lines.end + 1
+        return file_info, file_content
+
+
+class BufferedStream(Generic[OutputT]):
+    """Streams data from a Reader."""
+
+    def __init__(
+        self,
+        ctx: _TaskContext,
+    ):
+        self._ctx = ctx
+        self._reader = _BufferedGCSFileReader()
+        self._converter = _BufferedJsonConverter()
+        self._reader.set_retrieval_strategy(
+            _BufferedFileRetrieverByOffset(self._ctx, 0))
+
+    def __iter__(self):
+        yield from self._fetch()
+
+    def _fetch(self,) -> Iterator[OutputT]:
+        """Fetches the result data.
+        Returns an iterator that yields the offset and the data.
+        """
+        if self._ctx.metadata_header.total_size is None:
+            return
+
+        stream = self._reader.read()
+        with self._converter as converter:
+            for file_info, raw_data in stream:
+                for output in converter.convert(
+                        Converter.ConverterInputArgs(self._ctx, file_info,
+                                                     raw_data)):
+                    yield output
+
+    def start(
+            self,
+            stream_handler: Optional[Callable[[OutputT], None]] = None) -> None:
+        """Starts streaming the result data.
+        Calls the stream_handler for each result.
+        """
+        # this calls the __iter__ method, which in turn calls the _fetch method
+        for output in self:
+            if stream_handler:
+                stream_handler(output)
+
+
+@dataclass
+class BufferedJsonConverterOutput:
+    """Output with the JSON object"""
+    json: Any
+
+
+class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
+    """Converts JSON data in a buffered manner
+    """
+
+    def convert(
+        self, input_args: Converter.ConverterInputArgs
+    ) -> Iterator[BufferedJsonConverterOutput]:
+        yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))
+
+
+class _BufferedGCSFileReader(_Reader):
+    """Reads data from multiple GCS files and buffer them to disk"""
+
+    def __init__(self):
+        super().__init__()
+        self._retrieval_strategy = None
+
+    def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
+        """Sets the retrieval strategy."""
+        self._retrieval_strategy = strategy
+
+    def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
+        if not self._retrieval_strategy:
+            raise ValueError("retrieval strategy not set")
+        # create a buffer
+        with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
+            result = self._retrieval_strategy.get_next_chunk()
+            while result:
+                _, raw_data = result
+                # there is something wrong with the way the offsets are being
+                # calculated, so just write all of the chunks as-is to the file,
+                # with the pointer initially pointed to the start of the file
+                # (like what is in GCS) and do not rely on offsets for file
+                # location
+                # temp_file.seek(file_info.offsets.start)
+                temp_file.write(raw_data)
+                result = self._retrieval_strategy.get_next_chunk()
+        # read buffer
+        with open(temp_file.name, 'r') as temp_file_reopened:
+            for idx, line in enumerate(temp_file_reopened):
+                yield _MetadataFileInfo(offsets=Range(start=0,
+                                                      end=len(line) - 1),
+                                        lines=Range(start=idx, end=idx + 1),
+                                        file=temp_file.name), line
+        # manually delete buffer
+        os.unlink(temp_file.name)
+
+
 class ExportTask:
     """
     An adapter class for working with task objects, providing extended functionality
@@ -645,12 +739,11 @@ def errors(self):
             self._task.client, self._task.uid, StreamType.ERRORS)
         if metadata_header is None:
             return None
-        Stream(
-            _TaskContext(self._task.client, self._task.uid, StreamType.ERRORS,
-                         metadata_header),
-            _BufferedGCSFileReader(),
-            _BufferedJsonConverter(),
-        ).start(stream_handler=lambda output: data.append(output.json))
+        BufferedStream(
+            _TaskContext(
+                self._task.client, self._task.uid, StreamType.ERRORS,
+                metadata_header),).start(
+                    stream_handler=lambda output: data.append(output.json))
         return data
 
     @property
@@ -667,12 +760,11 @@ def result(self):
                 self._task.client, self._task.uid, StreamType.RESULT)
             if metadata_header is None:
                 return []
-            Stream(
-                _TaskContext(self._task.client, self._task.uid,
-                             StreamType.RESULT, metadata_header),
-                _BufferedGCSFileReader(),
-                _BufferedJsonConverter(),
-            ).start(stream_handler=lambda output: data.append(output.json))
+            BufferedStream(
+                _TaskContext(
+                    self._task.client, self._task.uid, StreamType.RESULT,
+                    metadata_header),).start(
+                        stream_handler=lambda output: data.append(output.json))
             return data
         return self._task.result_url
 
@@ -747,6 +839,38 @@ def has_errors(self) -> bool:
         total_size = self.get_total_file_size(StreamType.ERRORS)
         return total_size is not None and total_size > 0
 
+    def get_buffered_stream(
+        self,
+        stream_type: StreamType = StreamType.RESULT,
+    ) -> BufferedStream:
+        """
+        Returns a buffered stream of the task's output.
+
+        Args:
+            stream_type (StreamType, optional): The type of stream to retrieve. Defaults to StreamType.RESULT.
+
+        Returns:
+            BufferedStream: The buffered stream object.
+
+        Raises:
+            ExportTask.ExportTaskException: If the task has failed or is not ready yet.
+            ValueError: If the task does not have the specified stream type.
+        """
+        if self._task.status == "FAILED":
+            raise ExportTask.ExportTaskException("Task failed")
+        if self._task.status != "COMPLETE":
+            raise ExportTask.ExportTaskException("Task is not ready yet")
+
+        metadata_header = self._get_metadata_header(self._task.client,
+                                                    self._task.uid, stream_type)
+        if metadata_header is None:
+            raise ValueError(
+                f"Task {self._task.uid} does not have a {stream_type.value} stream"
+            )
+        return BufferedStream(
+            _TaskContext(self._task.client, self._task.uid, stream_type,
+                         metadata_header),)
+
     @overload
     def get_stream(
         self,
@@ -768,6 +892,9 @@ def get_stream(
         converter: Optional[Converter] = None,
         stream_type: StreamType = StreamType.RESULT,
     ) -> Stream:
+        warnings.warn(
+            "get_stream is deprecated and will be removed in a future release, use get_buffered_stream"
+        )
         if converter is None:
             converter = JsonConverter()
         """Returns the result of the task."""

libs/labelbox/src/labelbox/schema/project.py

Lines changed: 0 additions & 2 deletions
@@ -1022,8 +1022,6 @@ def create_batches_from_dataset(
             consensus_settings = ConsensusSettings(**consensus_settings).dict(
                 by_alias=True)
 
-        print("Creating batches from dataset %s", dataset_id)
-
         method = 'createBatchesFromDataset'
         mutation_str = """mutation %sPyApi($projectId: ID!, $input: CreateBatchesFromDatasetInput!) {
             project(where: {id: $projectId}) {
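Beyond being debug noise, the removed line passed a logging-style "%s" placeholder to print, which prints the format string and the argument side by side rather than interpolating. Had the trace still been wanted, a logger call would interpolate correctly — a sketch, assuming the module's usual logger = logging.getLogger(__name__) setup:

    # lazily interpolates dataset_id into the message at log time
    logger.debug("Creating batches from dataset %s", dataset_id)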

libs/labelbox/tests/conftest.py

Lines changed: 6 additions & 1 deletion
@@ -430,10 +430,14 @@ def consensus_project(client, rand_gen):
 
 @pytest.fixture
 def model_config(client, rand_gen, valid_model_id):
-    model_config = client.create_model_config(name=rand_gen(str), model_id=valid_model_id, inference_params = {"param": "value"})
+    model_config = client.create_model_config(
+        name=rand_gen(str),
+        model_id=valid_model_id,
+        inference_params={"param": "value"})
     yield model_config
     client.delete_model_config(model_config.uid)
 
+
 @pytest.fixture
 def consensus_project_with_batch(consensus_project, initial_dataset, rand_gen,
                                  image_url):
@@ -1050,6 +1054,7 @@ def embedding(client: Client):
     yield embedding
     embedding.delete()
 
+
 @pytest.fixture
 def valid_model_id():
     return "2c903542-d1da-48fd-9db1-8c62571bd3d2"

libs/labelbox/tests/data/export/streamable/test_export_data_rows_streamable.py

Lines changed: 19 additions & 0 deletions
@@ -27,6 +27,25 @@ def test_with_data_row_object(self, client, data_row,
         assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1
         assert (json.loads(list(export_task.get_stream())[0].json_str)
                 ["data_row"]["id"] == data_row.uid)
+
+    def test_with_data_row_object_buffered(self, client, data_row,
+                                           wait_for_data_row_processing):
+        data_row = wait_for_data_row_processing(client, data_row)
+        time.sleep(7)  # temp fix for ES indexing delay
+        export_task = DataRow.export(
+            client=client,
+            data_rows=[data_row],
+            task_name="TestExportDataRow:test_with_data_row_object_buffered",
+        )
+        export_task.wait_till_done()
+        assert export_task.status == "COMPLETE"
+        assert isinstance(export_task, ExportTask)
+        assert export_task.has_result()
+        assert export_task.has_errors() is False
+        assert export_task.get_total_file_size(
+            stream_type=StreamType.RESULT) > 0
+        assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1
+        assert list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid
 
     def test_with_id(self, client, data_row, wait_for_data_row_processing):
         data_row = wait_for_data_row_processing(client, data_row)
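The final assertions of the two tests capture the key API difference: get_stream yields outputs whose json_str the caller must parse, while get_buffered_stream yields BufferedJsonConverterOutput objects whose json field is already decoded. Side by side — a sketch reusing a completed export_task like the one in the tests above:

    import json

    # Deprecated path: raw string, caller parses it.
    first = list(export_task.get_stream())[0]
    row_id = json.loads(first.json_str)["data_row"]["id"]

    # Buffered path: already-parsed JSON object.
    first_buffered = list(export_task.get_buffered_stream())[0]
    assert first_buffered.json["data_row"]["id"] == row_id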

libs/labelbox/tests/integration/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -453,4 +453,4 @@ def print_perf_summary():
     num_of_entries = 10 if len(sorted_dict) >= 10 else len(sorted_dict)
     slowest_fixtures = [(aaa, sorted_dict[aaa])
                         for aaa in islice(sorted_dict, num_of_entries)]
-    print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr)
+    print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr)
