
[PLT-1008] Export content parses incorrectly #1628


Merged · 6 commits · May 27, 2024
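This change replaces the converter-based `Stream` parsing path with a `BufferedStream` that buffers export chunks to a local temporary file before parsing, and exposes it through `ExportTask.get_buffered_stream()`. A minimal usage sketch, based on the methods and tests in this diff; the client setup, API key, and data row ID are illustrative assumptions:

```python
# Hedged usage sketch for the buffered export API added in this PR.
# The client setup and IDs are assumptions for illustration; DataRow.export,
# wait_till_done, has_result, get_buffered_stream, and output.json mirror
# the code and tests in this diff.
from labelbox import Client, DataRow

client = Client(api_key="<YOUR_API_KEY>")        # assumed setup
data_row = client.get_data_row("<DATA_ROW_ID>")  # assumed data row ID

export_task = DataRow.export(
    client=client,
    data_rows=[data_row],
    task_name="buffered-export-example",
)
export_task.wait_till_done()

if export_task.has_result():
    # get_buffered_stream() defaults to StreamType.RESULT and yields outputs
    # whose .json attribute is the already-parsed JSON object for one line.
    for output in export_task.get_buffered_stream():
        print(output.json["data_row"]["id"])
```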
1 change: 0 additions & 1 deletion libs/labelbox/src/labelbox/schema/data_row.py
@@ -356,7 +356,6 @@ def _export(
res = client.execute(create_task_query_str,
query_params,
error_log_key="errors")
print(res)
res = res[mutation_name]
task_id = res["taskId"]
is_streamable = res["isStreamable"]
253 changes: 190 additions & 63 deletions libs/labelbox/src/labelbox/schema/export_task.py
@@ -114,9 +114,11 @@ class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-public-methods

Deprecated: This converter is deprecated and will be removed in a future release.
"""

def __init__(self) -> None:
warnings.warn("JSON converter is deprecated and will be removed in a future release")
warnings.warn(
"JSON converter is deprecated and will be removed in a future release"
)
super().__init__()

def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]:
@@ -397,7 +399,9 @@ class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods
"""

def __init__(self):
warnings.warn("_MultiGCSFileReader is deprecated and will be removed in a future release")
warnings.warn(
"_MultiGCSFileReader is deprecated and will be removed in a future release"
)
super().__init__()
self._retrieval_strategy = None

@@ -415,54 +419,6 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
result = self._retrieval_strategy.get_next_chunk()


@dataclass
class BufferedJsonConverterOutput:
"""Output with the JSON object"""
json: Any


class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
"""Converts JSON data in a buffered manner
"""
def convert(
self, input_args: Converter.ConverterInputArgs
) -> Iterator[BufferedJsonConverterOutput]:
yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))


class _BufferedGCSFileReader(_Reader):
"""Reads data from multiple GCS files and buffer them to disk"""

def __init__(self):
super().__init__()
self._retrieval_strategy = None

def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
"""Sets the retrieval strategy."""
self._retrieval_strategy = strategy

def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
if not self._retrieval_strategy:
raise ValueError("retrieval strategy not set")
# create a buffer
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
result = self._retrieval_strategy.get_next_chunk()
while result:
file_info, raw_data = result
temp_file.seek(file_info.offsets.start)
temp_file.write(raw_data)
result = self._retrieval_strategy.get_next_chunk()
# read buffer
with open(temp_file.name, 'r') as temp_file_reopened:
for idx, line in enumerate(temp_file_reopened):
yield _MetadataFileInfo(
offsets=Range(start=0, end=len(line) - 1),
lines=Range(start=idx, end=idx + 1),
file=temp_file.name), line
# manually delete buffer
os.unlink(temp_file.name)


class Stream(Generic[OutputT]):
"""Streams data from a Reader."""

@@ -520,6 +476,144 @@ def start(
stream_handler(output)


class _BufferedFileRetrieverByOffset(FileRetrieverStrategy): # pylint: disable=too-few-public-methods
"""Retrieves files by offset."""

def __init__(
self,
ctx: _TaskContext,
offset: int,
) -> None:
super().__init__(ctx)
self._current_offset = offset
self._current_line = 0
if self._current_offset >= self._ctx.metadata_header.total_size:
raise ValueError(
f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}"
)

def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]:
if self._current_offset >= self._ctx.metadata_header.total_size:
return None
query = (
f"query GetExportFileFromOffsetPyApi"
f"($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)"
f"{{task(where: $where)"
f"{{{'exportFileFromOffset'}(streamType: $streamType, offset: $offset)"
f"{{offsets {{start end}} lines {{start end}} file}}"
f"}}}}")
variables = {
"where": {
"id": self._ctx.task_id
},
"streamType": self._ctx.stream_type.value,
"offset": str(self._current_offset),
}
file_info, file_content = self._get_file_content(
query, variables, "exportFileFromOffset")
file_info.offsets.start = self._current_offset
file_info.lines.start = self._current_line
self._current_offset = file_info.offsets.end + 1
self._current_line = file_info.lines.end + 1
return file_info, file_content


class BufferedStream(Generic[OutputT]):
"""Streams data from a Reader."""

def __init__(
self,
ctx: _TaskContext,
):
self._ctx = ctx
self._reader = _BufferedGCSFileReader()
self._converter = _BufferedJsonConverter()
self._reader.set_retrieval_strategy(
_BufferedFileRetrieverByOffset(self._ctx, 0))

def __iter__(self):
yield from self._fetch()

def _fetch(self,) -> Iterator[OutputT]:
"""Fetches the result data.
Returns an iterator that yields the offset and the data.
"""
if self._ctx.metadata_header.total_size is None:
return

stream = self._reader.read()
with self._converter as converter:
for file_info, raw_data in stream:
for output in converter.convert(
Converter.ConverterInputArgs(self._ctx, file_info,
raw_data)):
yield output

def start(
self,
stream_handler: Optional[Callable[[OutputT], None]] = None) -> None:
"""Starts streaming the result data.
Calls the stream_handler for each result.
"""
# this calls the __iter__ method, which in turn calls the _fetch method
for output in self:
if stream_handler:
stream_handler(output)


@dataclass
class BufferedJsonConverterOutput:
"""Output with the JSON object"""
json: Any


class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
"""Converts JSON data in a buffered manner
"""

def convert(
self, input_args: Converter.ConverterInputArgs
) -> Iterator[BufferedJsonConverterOutput]:
yield BufferedJsonConverterOutput(json=json.loads(input_args.raw_data))


class _BufferedGCSFileReader(_Reader):
"""Reads data from multiple GCS files and buffer them to disk"""

def __init__(self):
super().__init__()
self._retrieval_strategy = None

def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
"""Sets the retrieval strategy."""
self._retrieval_strategy = strategy

def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
if not self._retrieval_strategy:
raise ValueError("retrieval strategy not set")
# create a buffer
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
result = self._retrieval_strategy.get_next_chunk()
while result:
_, raw_data = result
# there is something wrong with the way the offsets are being calculated,
# so just write all of the chunks as-is to the file, with the pointer initially
# pointed to the start of the file (like what is in GCS), and do not
# rely on offsets for file location
# temp_file.seek(file_info.offsets.start)
temp_file.write(raw_data)
result = self._retrieval_strategy.get_next_chunk()
# read buffer
with open(temp_file.name, 'r') as temp_file_reopened:
for idx, line in enumerate(temp_file_reopened):
yield _MetadataFileInfo(offsets=Range(start=0,
end=len(line) - 1),
lines=Range(start=idx, end=idx + 1),
file=temp_file.name), line
# manually delete buffer
os.unlink(temp_file.name)


class ExportTask:
"""
An adapter class for working with task objects, providing extended functionality
@@ -645,12 +739,11 @@ def errors(self):
self._task.client, self._task.uid, StreamType.ERRORS)
if metadata_header is None:
return None
Stream(
_TaskContext(self._task.client, self._task.uid, StreamType.ERRORS,
metadata_header),
_BufferedGCSFileReader(),
_BufferedJsonConverter(),
).start(stream_handler=lambda output: data.append(output.json))
BufferedStream(
_TaskContext(
self._task.client, self._task.uid, StreamType.ERRORS,
metadata_header),).start(
stream_handler=lambda output: data.append(output.json))
return data

@property
@@ -667,12 +760,11 @@ def result(self):
self._task.client, self._task.uid, StreamType.RESULT)
if metadata_header is None:
return []
Stream(
_TaskContext(self._task.client, self._task.uid,
StreamType.RESULT, metadata_header),
_BufferedGCSFileReader(),
_BufferedJsonConverter(),
).start(stream_handler=lambda output: data.append(output.json))
BufferedStream(
_TaskContext(
self._task.client, self._task.uid, StreamType.RESULT,
metadata_header),).start(
stream_handler=lambda output: data.append(output.json))
return data
return self._task.result_url

@@ -747,6 +839,38 @@ def has_errors(self) -> bool:
total_size = self.get_total_file_size(StreamType.ERRORS)
return total_size is not None and total_size > 0

def get_buffered_stream(
self,
stream_type: StreamType = StreamType.RESULT,
) -> BufferedStream:
"""
Returns a buffered stream of the task's export data.

Args:
stream_type (StreamType, optional): The type of stream to retrieve. Defaults to StreamType.RESULT.

Returns:
BufferedStream: The buffered stream object.

Raises:
ExportTask.ExportTaskException: If the task has failed or is not ready yet.
ValueError: If the task does not have the specified stream type.
"""
if self._task.status == "FAILED":
raise ExportTask.ExportTaskException("Task failed")
if self._task.status != "COMPLETE":
raise ExportTask.ExportTaskException("Task is not ready yet")

metadata_header = self._get_metadata_header(self._task.client,
self._task.uid, stream_type)
if metadata_header is None:
raise ValueError(
f"Task {self._task.uid} does not have a {stream_type.value} stream"
)
return BufferedStream(
_TaskContext(self._task.client, self._task.uid, stream_type,
metadata_header),)

@overload
def get_stream(
self,
@@ -768,6 +892,9 @@ def get_stream(
converter: Optional[Converter] = None,
stream_type: StreamType = StreamType.RESULT,
) -> Stream:
warnings.warn(
"get_stream is deprecated and will be removed in a future release, use get_buffered_stream"
)
if converter is None:
converter = JsonConverter()
"""Returns the result of the task."""
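`get_stream()` now warns that it is deprecated in favor of `get_buffered_stream()`, and the `errors`/`result` properties build a `BufferedStream` internally. A hedged migration sketch, assuming a completed `export_task` as in the example above; the `json_str` and `json` field names follow the tests in this PR:

```python
import json

# Deprecated path: the caller parses raw JSON strings from get_stream().
for output in export_task.get_stream():  # warns: use get_buffered_stream
    row_id = json.loads(output.json_str)["data_row"]["id"]

# New path: iterate already-parsed objects directly...
for output in export_task.get_buffered_stream():
    row_id = output.json["data_row"]["id"]

# ...or collect them with a stream handler, as the result/errors properties now do.
data = []
export_task.get_buffered_stream().start(
    stream_handler=lambda output: data.append(output.json))
```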
2 changes: 0 additions & 2 deletions libs/labelbox/src/labelbox/schema/project.py
@@ -1022,8 +1022,6 @@ def create_batches_from_dataset(
consensus_settings = ConsensusSettings(**consensus_settings).dict(
by_alias=True)

print("Creating batches from dataset %s", dataset_id)

method = 'createBatchesFromDataset'
mutation_str = """mutation %sPyApi($projectId: ID!, $input: CreateBatchesFromDatasetInput!) {
project(where: {id: $projectId}) {
7 changes: 6 additions & 1 deletion libs/labelbox/tests/conftest.py
@@ -430,10 +430,14 @@ def consensus_project(client, rand_gen):

@pytest.fixture
def model_config(client, rand_gen, valid_model_id):
model_config = client.create_model_config(name=rand_gen(str), model_id=valid_model_id, inference_params = {"param": "value"})
model_config = client.create_model_config(
name=rand_gen(str),
model_id=valid_model_id,
inference_params={"param": "value"})
yield model_config
client.delete_model_config(model_config.uid)


@pytest.fixture
def consensus_project_with_batch(consensus_project, initial_dataset, rand_gen,
image_url):
@@ -1050,6 +1054,7 @@ def embedding(client: Client):
yield embedding
embedding.delete()


@pytest.fixture
def valid_model_id():
return "2c903542-d1da-48fd-9db1-8c62571bd3d2"
@@ -27,6 +27,25 @@ def test_with_data_row_object(self, client, data_row,
assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1
assert (json.loads(list(export_task.get_stream())[0].json_str)
["data_row"]["id"] == data_row.uid)

def test_with_data_row_object_buffered(self, client, data_row,
wait_for_data_row_processing):
data_row = wait_for_data_row_processing(client, data_row)
time.sleep(7) # temp fix for ES indexing delay
export_task = DataRow.export(
client=client,
data_rows=[data_row],
task_name="TestExportDataRow:test_with_data_row_object_buffered",
)
export_task.wait_till_done()
assert export_task.status == "COMPLETE"
assert isinstance(export_task, ExportTask)
assert export_task.has_result()
assert export_task.has_errors() is False
assert export_task.get_total_file_size(
stream_type=StreamType.RESULT) > 0
assert export_task.get_total_lines(stream_type=StreamType.RESULT) == 1
assert list(export_task.get_buffered_stream())[0].json["data_row"]["id"] == data_row.uid

def test_with_id(self, client, data_row, wait_for_data_row_processing):
data_row = wait_for_data_row_processing(client, data_row)
2 changes: 1 addition & 1 deletion libs/labelbox/tests/integration/conftest.py
@@ -453,4 +453,4 @@ def print_perf_summary():
num_of_entries = 10 if len(sorted_dict) >= 10 else len(sorted_dict)
slowest_fixtures = [(aaa, sorted_dict[aaa])
for aaa in islice(sorted_dict, num_of_entries)]
print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr)
print("\nTop slowest fixtures:\n", slowest_fixtures, file=sys.stderr)