Skip to content

Commit eb7ec80

Browse files
reduce redundant code in CloudFetchQueue
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 66d0df6 commit eb7ec80

File tree

2 files changed

+50
-133
lines changed

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 41 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -320,37 +320,26 @@ def __init__(
320320
self._statement_id = statement_id
321321
self._total_chunk_count = total_chunk_count
322322

323-
# Track the current chunk we're processing
324-
self._current_chunk_index: Optional[int] = None
325-
self._current_chunk_link: Optional["ExternalLink"] = None
326-
327323
logger.debug(
328324
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
329325
statement_id, total_chunk_count
330326
)
331327
)
332328

333-
if initial_links:
334-
initial_links = []
335-
# logger.debug("SeaCloudFetchQueue: Initial links provided:")
336-
# for link in initial_links:
337-
# logger.debug(
338-
# "- chunk: {}, row offset: {}, row count: {}, next chunk: {}".format(
339-
# link.chunk_index,
340-
# link.row_offset,
341-
# link.row_count,
342-
# link.next_chunk_index,
343-
# )
344-
# )
345-
346-
# Initialize download manager with initial links
329+
initial_link = next((l for l in initial_links if l.chunk_index == 0), None)
330+
if not initial_link:
331+
raise ValueError("No initial link found for chunk index 0")
332+
347333
self.download_manager = ResultFileDownloadManager(
348-
links=self._convert_to_thrift_links(initial_links),
334+
links=[],
349335
max_download_threads=max_download_threads,
350-
lz4_compressed=self.lz4_compressed,
351-
ssl_options=self._ssl_options,
336+
lz4_compressed=lz4_compressed,
337+
ssl_options=ssl_options,
352338
)
353339

340+
# Track the current chunk we're processing
341+
self._current_chunk_link: Optional["ExternalLink"] = initial_link
342+
354343
# Initialize table and position
355344
self.table = self._create_next_table()
356345
if self.table:
@@ -360,129 +349,60 @@ def __init__(
360349
)
361350
)
362351

363-
def _convert_to_thrift_links(
364-
self, links: List["ExternalLink"]
365-
) -> List[TSparkArrowResultLink]:
352+
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
366353
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
367-
if not links:
368-
logger.debug("SeaCloudFetchQueue: No links to convert to Thrift format")
369-
return []
370-
371-
logger.debug(
372-
"SeaCloudFetchQueue: Converting {} links to Thrift format".format(
373-
len(links)
374-
)
375-
)
376-
thrift_links = []
377-
for link in links:
378-
# Parse the ISO format expiration time
379-
expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
380-
381-
thrift_link = TSparkArrowResultLink(
382-
fileLink=link.external_link,
383-
expiryTime=expiry_time,
384-
rowCount=link.row_count,
385-
bytesNum=link.byte_count,
386-
startRowOffset=link.row_offset,
387-
httpHeaders=link.http_headers or {},
388-
)
389-
thrift_links.append(thrift_link)
390-
return thrift_links
354+
if not link:
355+
logger.debug("SeaCloudFetchQueue: No link to convert to Thrift format")
356+
return None
391357

392-
def _fetch_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
393-
"""Fetch link for the specified chunk index."""
394358
logger.debug(
395-
"SeaCloudFetchQueue: Fetching chunk {} using SEA client".format(chunk_index)
359+
"SeaCloudFetchQueue: Converting link to Thrift format".format(link)
396360
)
397361

398-
# Use the SEA client to fetch the chunk links
399-
link = self._sea_client.get_chunk_link(self._statement_id, chunk_index)
362+
# Parse the ISO format expiration time
363+
expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
400364

401-
logger.debug(
402-
"SeaCloudFetchQueue: Link details for chunk {}: row_offset={}, row_count={}, next_chunk_index={}".format(
403-
link.chunk_index,
404-
link.row_offset,
405-
link.row_count,
406-
link.next_chunk_index,
407-
)
365+
return TSparkArrowResultLink(
366+
fileLink=link.external_link,
367+
expiryTime=expiry_time,
368+
rowCount=link.row_count,
369+
bytesNum=link.byte_count,
370+
startRowOffset=link.row_offset,
371+
httpHeaders=link.http_headers or {},
408372
)
409373

410-
if self.download_manager:
411-
self.download_manager.add_links(self._convert_to_thrift_links([link]))
412-
413-
return link
414-
415374
def _create_next_table(self) -> Union["pyarrow.Table", None]:
416375
"""Create next table by retrieving the logical next downloaded file."""
417-
# if we're still processing the current table, just return it
418-
if self.table is not None and self.table_row_index < self.table.num_rows:
419-
logger.info(
420-
"SeaCloudFetchQueue: Still processing current table, rows left: {}".format(
421-
self.table.num_rows - self.table_row_index
422-
)
423-
)
424-
return self.table
376+
logger.debug(
377+
f"SeaCloudFetchQueue: Creating next table, current chunk link: {self._current_chunk_link}"
378+
)
425379

426-
# if we've reached the end of the response, return None
427-
if (
428-
self._current_chunk_link
429-
and self._current_chunk_link.next_chunk_index is None
430-
):
431-
logger.info(
432-
"SeaCloudFetchQueue: Reached end of chunks (no next chunk index)"
433-
)
380+
if not self._current_chunk_link:
381+
logger.debug("SeaCloudFetchQueue: No current chunk link, returning None")
434382
return None
435383

436-
# Determine the next chunk index
437-
next_chunk_index = (
438-
0
439-
if self._current_chunk_link is None
440-
else self._current_chunk_link.next_chunk_index
441-
)
442-
if next_chunk_index is None:
443-
logger.info(
444-
"SeaCloudFetchQueue: Reached end of chunks (next_chunk_index is None)"
384+
if self.download_manager:
385+
self.download_manager.add_link(
386+
self._convert_to_thrift_link(self._current_chunk_link)
445387
)
446-
return None
447388

448-
logger.info(
449-
"SeaCloudFetchQueue: Trying to get downloaded file for chunk {}".format(
450-
next_chunk_index
451-
)
452-
)
389+
row_offset = self._current_chunk_link.row_offset
390+
arrow_table = self._create_table_at_offset(row_offset)
453391

454-
# Update current chunk to the next one
455-
self._current_chunk_index = next_chunk_index
392+
next_chunk_index = self._current_chunk_link.next_chunk_index
393+
self._current_chunk_link = None
456394
try:
457-
self._current_chunk_link = self._fetch_chunk_link(next_chunk_index)
395+
self._current_chunk_link = self._sea_client.get_chunk_link(
396+
self._statement_id, next_chunk_index
397+
)
458398
except Exception as e:
459399
logger.error(
460400
"SeaCloudFetchQueue: Error fetching link for chunk {}: {}".format(
461-
self._current_chunk_index, e
401+
next_chunk_index, e
462402
)
463403
)
464-
return None
465-
if not self._current_chunk_link:
466-
logger.error(
467-
"SeaCloudFetchQueue: No link found for chunk {}".format(
468-
self._current_chunk_index
469-
)
470-
)
471-
return None
472404

473-
# Get the data for the current chunk
474-
row_offset = self._current_chunk_link.row_offset
475-
476-
logger.info(
477-
"SeaCloudFetchQueue: Current chunk details - index: {}, row_offset: {}, row_count: {}, next_chunk_index: {}".format(
478-
self._current_chunk_link.chunk_index,
479-
self._current_chunk_link.row_offset,
480-
self._current_chunk_link.row_count,
481-
self._current_chunk_link.next_chunk_index,
482-
)
483-
)
484-
485-
return self._create_table_at_offset(row_offset)
405+
return arrow_table
486406

487407

488408
class ThriftCloudFetchQueue(CloudFetchQueue):

src/databricks/sql/cloudfetch/download_manager.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,24 +101,21 @@ def _schedule_downloads(self):
101101
task = self._thread_pool.submit(handler.run)
102102
self._download_tasks.append(task)
103103

104-
def add_links(self, links: List[TSparkArrowResultLink]):
104+
def add_link(self, link: TSparkArrowResultLink):
105105
"""
106106
Add more links to the download manager.
107107
Args:
108108
links: List of links to add
109109
"""
110-
for link in links:
111-
if link.rowCount <= 0:
112-
continue
113-
logger.debug(
114-
"ResultFileDownloadManager: adding file link, start offset {}, row count: {}".format(
115-
link.startRowOffset, link.rowCount
116-
)
117-
)
118-
self._pending_links.append(link)
110+
if link.rowCount <= 0:
111+
return
119112

120-
# Make sure the download queue is always full
121-
self._schedule_downloads()
113+
logger.debug(
114+
"ResultFileDownloadManager: adding file link, start offset {}, row count: {}".format(
115+
link.startRowOffset, link.rowCount
116+
)
117+
)
118+
self._pending_links.append(link)
122119

123120
def _shutdown_manager(self):
124121
# Clear download handlers and shutdown the thread pool

0 commit comments

Comments (0)