Skip to content

Commit 66d0df6

Browse files
reduce repetition in arrow tablee creation
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent b2ad5e6 commit 66d0df6

File tree

1 file changed

+30
-94
lines changed

1 file changed

+30
-94
lines changed

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 30 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,32 @@ def _create_empty_table(self) -> "pyarrow.Table":
247247
"""Create a 0-row table with just the schema bytes."""
248248
return create_arrow_table_from_arrow_file(self.schema_bytes, self.description)
249249

250+
def _create_table_at_offset(self, offset: int) -> Union["pyarrow.Table", None]:
251+
"""Create next table by retrieving the logical next downloaded file."""
252+
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
253+
if not self.download_manager:
254+
logger.debug("ThriftCloudFetchQueue: No download manager available")
255+
return None
256+
257+
downloaded_file = self.download_manager.get_next_downloaded_file(offset)
258+
if not downloaded_file:
259+
# None signals no more Arrow tables can be built from the remaining handlers if any remain
260+
return None
261+
262+
arrow_table = create_arrow_table_from_arrow_file(
263+
downloaded_file.file_bytes, self.description
264+
)
265+
266+
# The server rarely prepares the exact number of rows requested by the client in cloud fetch.
267+
# Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
268+
if arrow_table.num_rows > downloaded_file.row_count:
269+
arrow_table = arrow_table.slice(0, downloaded_file.row_count)
270+
271+
# At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
272+
assert downloaded_file.row_count == arrow_table.num_rows
273+
274+
return arrow_table
275+
250276
@abstractmethod
251277
def _create_next_table(self) -> Union["pyarrow.Table", None]:
252278
"""Create next table by retrieving the logical next downloaded file."""
@@ -365,17 +391,6 @@ def _convert_to_thrift_links(
365391

366392
def _fetch_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
367393
"""Fetch link for the specified chunk index."""
368-
# Check if we already have this chunk as our current chunk
369-
if (
370-
self._current_chunk_link
371-
and self._current_chunk_link.chunk_index == chunk_index
372-
):
373-
logger.debug(
374-
"SeaCloudFetchQueue: Already have current chunk {}".format(chunk_index)
375-
)
376-
return self._current_chunk_link
377-
378-
# We need to fetch this chunk
379394
logger.debug(
380395
"SeaCloudFetchQueue: Fetching chunk {} using SEA client".format(chunk_index)
381396
)
@@ -467,57 +482,7 @@ def _create_next_table(self) -> Union["pyarrow.Table", None]:
467482
)
468483
)
469484

470-
if not self.download_manager:
471-
logger.info("SeaCloudFetchQueue: No download manager available")
472-
return None
473-
474-
downloaded_file = self.download_manager.get_next_downloaded_file(row_offset)
475-
if not downloaded_file:
476-
logger.info(
477-
"SeaCloudFetchQueue: Cannot find downloaded file for row {}".format(
478-
row_offset
479-
)
480-
)
481-
# If we can't find the file for the requested offset, we've reached the end
482-
# This is a change from the original implementation, which would continue with the wrong file
483-
logger.info("SeaCloudFetchQueue: No more files available, ending fetch")
484-
return None
485-
486-
logger.info(
487-
"SeaCloudFetchQueue: Downloaded file details - start_row_offset: {}, row_count: {}".format(
488-
downloaded_file.start_row_offset, downloaded_file.row_count
489-
)
490-
)
491-
492-
arrow_table = create_arrow_table_from_arrow_file(
493-
downloaded_file.file_bytes, self.description
494-
)
495-
496-
logger.info(
497-
"SeaCloudFetchQueue: Created arrow table with {} rows".format(
498-
arrow_table.num_rows
499-
)
500-
)
501-
502-
# Ensure the table has the correct number of rows
503-
if arrow_table.num_rows > downloaded_file.row_count:
504-
logger.info(
505-
"SeaCloudFetchQueue: Arrow table has more rows ({}) than expected ({}), slicing...".format(
506-
arrow_table.num_rows, downloaded_file.row_count
507-
)
508-
)
509-
arrow_table = arrow_table.slice(0, downloaded_file.row_count)
510-
511-
# At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
512-
assert downloaded_file.row_count == arrow_table.num_rows
513-
514-
logger.info(
515-
"SeaCloudFetchQueue: Found downloaded file for chunk {}, row count: {}, row offset: {}".format(
516-
self._current_chunk_index, arrow_table.num_rows, row_offset
517-
)
518-
)
519-
520-
return arrow_table
485+
return self._create_table_at_offset(row_offset)
521486

522487

523488
class ThriftCloudFetchQueue(CloudFetchQueue):
@@ -581,46 +546,17 @@ def __init__(
581546
self.table = self._create_next_table()
582547

583548
def _create_next_table(self) -> Union["pyarrow.Table", None]:
584-
"""Create next table by retrieving the logical next downloaded file."""
585549
logger.debug(
586550
"ThriftCloudFetchQueue: Trying to get downloaded file for row {}".format(
587551
self.start_row_index
588552
)
589553
)
590-
# Create next table by retrieving the logical next downloaded file, or return None to signal end of queue
591-
if not self.download_manager:
592-
logger.debug("ThriftCloudFetchQueue: No download manager available")
593-
return None
594-
595-
downloaded_file = self.download_manager.get_next_downloaded_file(
596-
self.start_row_index
597-
)
598-
if not downloaded_file:
599-
logger.debug(
600-
"ThriftCloudFetchQueue: Cannot find downloaded file for row {}".format(
601-
self.start_row_index
602-
)
603-
)
604-
# None signals no more Arrow tables can be built from the remaining handlers if any remain
605-
return None
606-
607-
arrow_table = create_arrow_table_from_arrow_file(
608-
downloaded_file.file_bytes, self.description
609-
)
610-
611-
# The server rarely prepares the exact number of rows requested by the client in cloud fetch.
612-
# Subsequently, we drop the extraneous rows in the last file if more rows are retrieved than requested
613-
if arrow_table.num_rows > downloaded_file.row_count:
614-
arrow_table = arrow_table.slice(0, downloaded_file.row_count)
615-
616-
# At this point, whether the file has extraneous rows or not, the arrow table should have the correct num rows
617-
assert downloaded_file.row_count == arrow_table.num_rows
618-
self.start_row_index += arrow_table.num_rows
619-
554+
arrow_table = self._create_table_at_offset(self.start_row_index)
555+
if arrow_table:
556+
self.start_row_index += arrow_table.num_rows
620557
logger.debug(
621558
"ThriftCloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
622559
arrow_table.num_rows, self.start_row_index
623560
)
624561
)
625-
626562
return arrow_table

0 commit comments

Comments
 (0)