Skip to content

Commit b2ad5e6

Browse files
reduce responsibility of Queue
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 6ec265f commit b2ad5e6

File tree

3 files changed

+22
-27
lines changed

3 files changed

+22
-27
lines changed

src/databricks/sql/backend/sea/backend.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
ExecuteStatementResponse,
4444
GetStatementResponse,
4545
CreateSessionResponse,
46+
GetChunksResponse,
4647
)
4748

4849
logger = logging.getLogger(__name__)
@@ -305,9 +306,7 @@ def get_allowed_session_configurations() -> List[str]:
305306
"""
306307
return list(ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP.keys())
307308

308-
def get_chunk_links(
309-
self, statement_id: str, chunk_index: int
310-
) -> "GetChunksResponse":
309+
def get_chunk_link(self, statement_id: str, chunk_index: int) -> "ExternalLink":
311310
"""
312311
Get links for chunks starting from the specified index.
313312
@@ -316,16 +315,21 @@ def get_chunk_links(
316315
chunk_index: The starting chunk index
317316
318317
Returns:
319-
GetChunksResponse: Response containing external links
318+
ExternalLink: External link for the chunk
320319
"""
321-
from databricks.sql.backend.sea.models.responses import GetChunksResponse
322320

323321
response_data = self.http_client._make_request(
324322
method="GET",
325323
path=self.CHUNK_PATH_WITH_ID_AND_INDEX.format(statement_id, chunk_index),
326324
)
325+
response = GetChunksResponse.from_dict(response_data)
327326

328-
return GetChunksResponse.from_dict(response_data)
327+
links = response.external_links
328+
link = next((l for l in links if l.chunk_index == chunk_index), None)
329+
if not link:
330+
raise Error(f"No link found for chunk index {chunk_index}")
331+
332+
return link
329333

330334
def _get_schema_bytes(self, sea_response) -> Optional[bytes]:
331335
"""

src/databricks/sql/backend/sea/models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
ExecuteStatementResponse,
2828
GetStatementResponse,
2929
CreateSessionResponse,
30+
GetChunksResponse,
3031
)
3132

3233
__all__ = [
@@ -49,4 +50,5 @@
4950
"ExecuteStatementResponse",
5051
"GetStatementResponse",
5152
"CreateSessionResponse",
53+
"GetChunksResponse",
5254
]

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -381,30 +381,19 @@ def _fetch_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
381381
)
382382

383383
# Use the SEA client to fetch the chunk links
384-
chunk_info = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
385-
links = chunk_info.external_links
384+
link = self._sea_client.get_chunk_link(self._statement_id, chunk_index)
386385

387-
if not links:
388-
logger.debug(
389-
"SeaCloudFetchQueue: No links found for chunk {}".format(chunk_index)
390-
)
391-
return None
392-
393-
# Get the link for the requested chunk
394-
link = next((l for l in links if l.chunk_index == chunk_index), None)
395-
396-
if link:
397-
logger.debug(
398-
"SeaCloudFetchQueue: Link details for chunk {}: row_offset={}, row_count={}, next_chunk_index={}".format(
399-
link.chunk_index,
400-
link.row_offset,
401-
link.row_count,
402-
link.next_chunk_index,
403-
)
386+
logger.debug(
387+
"SeaCloudFetchQueue: Link details for chunk {}: row_offset={}, row_count={}, next_chunk_index={}".format(
388+
link.chunk_index,
389+
link.row_offset,
390+
link.row_count,
391+
link.next_chunk_index,
404392
)
393+
)
405394

406-
if self.download_manager:
407-
self.download_manager.add_links(self._convert_to_thrift_links([link]))
395+
if self.download_manager:
396+
self.download_manager.add_links(self._convert_to_thrift_links([link]))
408397

409398
return link
410399

0 commit comments

Comments
 (0)