Skip to content

Commit fb53dd9

Browse files
pre-fetch next chunk link on processing current
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent f90b4d4 commit fb53dd9

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

examples/experimental/test_sea_multi_chunk.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from pathlib import Path
1515
from databricks.sql.client import Connection
1616

17-
logging.basicConfig(level=logging.INFO)
17+
logging.basicConfig(level=logging.DEBUG)
1818
logger = logging.getLogger(__name__)
1919

2020

@@ -195,7 +195,7 @@ def main():
195195
sys.exit(1)
196196

197197
# Get row count from command line or use default
198-
requested_row_count = 5000
198+
requested_row_count = 10000
199199

200200
if len(sys.argv) > 1:
201201
try:

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ def __init__(
334334

335335
# Track the current chunk we're processing
336336
self._current_chunk_link: Optional["ExternalLink"] = initial_link
337+
self._download_current_link()
337338

338339
# Initialize table and position
339340
self.table = self._create_next_table()
@@ -351,8 +352,22 @@ def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink
351352
httpHeaders=link.http_headers or {},
352353
)
353354

355+
def _download_current_link(self):
356+
"""Download the current chunk link."""
357+
if not self._current_chunk_link:
358+
return None
359+
360+
if not self.download_manager:
361+
logger.debug("SeaCloudFetchQueue: No download manager, returning")
362+
return None
363+
364+
thrift_link = self._convert_to_thrift_link(self._current_chunk_link)
365+
self.download_manager.add_link(thrift_link)
366+
354367
def _progress_chunk_link(self):
355368
"""Progress to the next chunk link."""
369+
if not self._current_chunk_link:
370+
return None
356371

357372
next_chunk_index = self._current_chunk_link.next_chunk_index
358373

@@ -369,24 +384,19 @@ def _progress_chunk_link(self):
369384
next_chunk_index, e
370385
)
371386
)
387+
return None
388+
372389
logger.debug(
373390
f"SeaCloudFetchQueue: Progressed to link for chunk {next_chunk_index}: {self._current_chunk_link}"
374391
)
392+
self._download_current_link()
375393

376394
def _create_next_table(self) -> Union["pyarrow.Table", None]:
377395
"""Create next table by retrieving the logical next downloaded file."""
378396
if not self._current_chunk_link:
379-
logger.debug("SeaCloudFetchQueue: No current chunk link, returning None")
397+
logger.debug("SeaCloudFetchQueue: No current chunk link, returning")
380398
return None
381399

382-
logger.debug(
383-
f"SeaCloudFetchQueue: Trying to get downloaded file for chunk {self._current_chunk_link.chunk_index}"
384-
)
385-
386-
if self.download_manager:
387-
thrift_link = self._convert_to_thrift_link(self._current_chunk_link)
388-
self.download_manager.add_link(thrift_link)
389-
390400
row_offset = self._current_chunk_link.row_offset
391401
arrow_table = self._create_table_at_offset(row_offset)
392402

0 commit comments

Comments
 (0)