Skip to content

Commit 87d26b8

Browse files
authored
[PLT-817] Move to buffered reader (#1584)
1 parent d645a29 commit 87d26b8

File tree

2 files changed

+102
-15
lines changed

2 files changed

+102
-15
lines changed

libs/labelbox/src/labelbox/schema/export_task.py

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
Union,
1717
TYPE_CHECKING,
1818
overload,
19+
Any,
1920
)
2021

2122
import requests
23+
import warnings
24+
import tempfile
25+
import os
2226
from labelbox import pydantic_compat
2327

2428
from labelbox.schema.task import Task
@@ -106,7 +110,14 @@ class JsonConverterOutput:
106110

107111

108112
class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-public-methods
109-
"""Converts JSON data."""
113+
"""Converts JSON data.
114+
115+
Deprecated: This converter is deprecated and will be removed in a future release.
116+
"""
117+
118+
def __init__(self) -> None:
119+
warnings.warn("JSON converter is deprecated and will be removed in a future release")
120+
super().__init__()
110121

111122
def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]:
112123
object_offsets: List[Tuple[int, int]] = []
@@ -166,7 +177,8 @@ class FileConverterOutput:
166177

167178

168179
class FileConverter(Converter[FileConverterOutput]):
169-
"""Converts data to a file."""
180+
"""Converts data to a file.
181+
"""
170182

171183
def __init__(self, file_path: str) -> None:
172184
super().__init__()
@@ -248,6 +260,7 @@ def __init__(
248260

249261
def _find_line_at_offset(self, file_content: str,
250262
target_offset: int) -> int:
263+
# TODO: Remove this, incorrect parsing of JSON to find braces
251264
stack = []
252265
line_number = 0
253266

@@ -313,6 +326,7 @@ def __init__(
313326
)
314327

315328
def _find_offset_of_line(self, file_content: str, target_line: int):
329+
# TODO: Remove this, incorrect parsing of JSON to find braces
316330
start_offset = None
317331
stack = []
318332
line_number = 0
@@ -377,9 +391,13 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
377391

378392

379393
class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods
380-
"""Reads data from multiple GCS files in a seamless way."""
394+
"""Reads data from multiple GCS files in a seamless way.
395+
396+
Deprecated: This reader is deprecated and will be removed in a future release.
397+
"""
381398

382399
def __init__(self):
400+
warnings.warn("_MultiGCSFileReader is deprecated and will be removed in a future release")
383401
super().__init__()
384402
self._retrieval_strategy = None
385403

@@ -397,6 +415,54 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
397415
result = self._retrieval_strategy.get_next_chunk()
398416

399417

418+
@dataclass
419+
class BufferedJsonConverterOutput:
420+
"""Output with the JSON object"""
421+
json: Any
422+
423+
424+
class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
425+
"""Converts JSON data in a buffered manner
426+
"""
427+
def convert(
428+
self, input_args: Converter.ConverterInputArgs
429+
) -> Iterator[BufferedJsonConverterOutput]:
430+
yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))
431+
432+
433+
class _BufferedGCSFileReader(_Reader):
434+
"""Reads data from multiple GCS files and buffer them to disk"""
435+
436+
def __init__(self):
437+
super().__init__()
438+
self._retrieval_strategy = None
439+
440+
def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
441+
"""Sets the retrieval strategy."""
442+
self._retrieval_strategy = strategy
443+
444+
def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
445+
if not self._retrieval_strategy:
446+
raise ValueError("retrieval strategy not set")
447+
# create a buffer
448+
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
449+
result = self._retrieval_strategy.get_next_chunk()
450+
while result:
451+
file_info, raw_data = result
452+
temp_file.seek(file_info.offsets.start)
453+
temp_file.write(raw_data)
454+
result = self._retrieval_strategy.get_next_chunk()
455+
# read buffer
456+
with open(temp_file.name, 'r') as temp_file_reopened:
457+
for idx, line in enumerate(temp_file_reopened):
458+
yield _MetadataFileInfo(
459+
offsets=Range(start=0, end=len(line) - 1),
460+
lines=Range(start=idx, end=idx + 1),
461+
file=temp_file.name), line
462+
# manually delete buffer
463+
os.unlink(temp_file.name)
464+
465+
400466
class Stream(Generic[OutputT]):
401467
"""Streams data from a Reader."""
402468

@@ -582,12 +648,9 @@ def errors(self):
582648
Stream(
583649
_TaskContext(self._task.client, self._task.uid, StreamType.ERRORS,
584650
metadata_header),
585-
_MultiGCSFileReader(),
586-
JsonConverter(),
587-
).start(stream_handler=lambda output: [
588-
data.append(json.loads(row)) for row in output.json_str.split(
589-
'\n') if row
590-
])
651+
_BufferedGCSFileReader(),
652+
_BufferedJsonConverter(),
653+
).start(stream_handler=lambda output: data.append(output.json))
591654
return data
592655

593656
@property
@@ -607,12 +670,9 @@ def result(self):
607670
Stream(
608671
_TaskContext(self._task.client, self._task.uid,
609672
StreamType.RESULT, metadata_header),
610-
_MultiGCSFileReader(),
611-
JsonConverter(),
612-
).start(stream_handler=lambda output: [
613-
data.append(json.loads(row)) for row in output.json_str.split(
614-
'\n') if row
615-
])
673+
_BufferedGCSFileReader(),
674+
_BufferedJsonConverter(),
675+
).start(stream_handler=lambda output: data.append(output.json))
616676
return data
617677
return self._task.result_url
618678

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import json
2+
3+
from unittest.mock import MagicMock, patch
4+
from labelbox.schema.export_task import ExportTask
5+
6+
class TestExportTask:
7+
8+
def test_export_task(self):
9+
with patch('requests.get') as mock_requests_get:
10+
mock_task = MagicMock()
11+
mock_task.client.execute.side_effect = [
12+
{"task": {"exportMetadataHeader": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } },
13+
{"task": {"exportFileFromOffset": { "total_size": 1, "total_lines": 1, "lines": { "start": 0, "end": 1 }, "offsets": { "start": 0, "end": 0 }, "file": "file" } } },
14+
]
15+
mock_task.status = "COMPLETE"
16+
data = {
17+
"data_row": {
18+
"raw_data": """
19+
{"raw_text":"}{"}
20+
{"raw_text":"\\nbad"}
21+
"""
22+
}
23+
}
24+
mock_requests_get.return_value.text = json.dumps(data)
25+
mock_requests_get.return_value.content = "b"
26+
export_task = ExportTask(mock_task, is_export_v2=True)
27+
assert export_task.result[0] == data

0 commit comments

Comments (0)