Skip to content

Commit 015fb76

Browse files
remove reliance on schema_bytes in SEA
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 5b49405 commit 015fb76

File tree

4 files changed

+6
-96
lines changed

4 files changed

+6
-96
lines changed

src/databricks/sql/backend/sea/backend.py

Lines changed: 1 addition & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -331,74 +331,6 @@ def get_chunk_link(self, statement_id: str, chunk_index: int) -> "ExternalLink":
331331

332332
return link
333333

334-
def _get_schema_bytes(self, sea_response) -> Optional[bytes]:
335-
"""
336-
Extract schema bytes from the SEA response.
337-
338-
For ARROW format, we need to get the schema bytes from the first chunk.
339-
If the first chunk is not available, we need to get it from the server.
340-
341-
Args:
342-
sea_response: The response from the SEA API
343-
344-
Returns:
345-
bytes: The schema bytes or None if not available
346-
"""
347-
import requests
348-
import lz4.frame
349-
350-
# Check if we have the first chunk in the response
351-
result_data = sea_response.get("result", {})
352-
external_links = result_data.get("external_links", [])
353-
354-
if not external_links:
355-
return None
356-
357-
# Find the first chunk (chunk_index = 0)
358-
first_chunk = None
359-
for link in external_links:
360-
if link.get("chunk_index") == 0:
361-
first_chunk = link
362-
break
363-
364-
if not first_chunk:
365-
# Try to fetch the first chunk from the server
366-
statement_id = sea_response.get("statement_id")
367-
if not statement_id:
368-
return None
369-
370-
chunks_response = self.get_chunk_links(statement_id, 0)
371-
if not chunks_response.external_links:
372-
return None
373-
374-
first_chunk = chunks_response.external_links[0].__dict__
375-
376-
# Download the first chunk to get the schema bytes
377-
external_link = first_chunk.get("external_link")
378-
http_headers = first_chunk.get("http_headers", {})
379-
380-
if not external_link:
381-
return None
382-
383-
# Use requests to download the first chunk
384-
http_response = requests.get(
385-
external_link,
386-
headers=http_headers,
387-
verify=self.ssl_options.tls_verify,
388-
)
389-
390-
if http_response.status_code != 200:
391-
raise Error(f"Failed to download schema bytes: {http_response.text}")
392-
393-
# Extract schema bytes from the Arrow file
394-
# The schema is at the beginning of the file
395-
data = http_response.content
396-
if sea_response.get("manifest", {}).get("result_compression") == "LZ4_FRAME":
397-
data = lz4.frame.decompress(data)
398-
399-
# Return the schema bytes
400-
return data
401-
402334
def _results_message_to_execute_response(self, sea_response, command_id):
403335
"""
404336
Convert a SEA response to an ExecuteResponse and extract result data.
@@ -441,13 +373,6 @@ def _results_message_to_execute_response(self, sea_response, command_id):
441373
)
442374
description = columns if columns else None
443375

444-
# Extract schema bytes for Arrow format
445-
schema_bytes = None
446-
format = manifest_data.get("format")
447-
if format == "ARROW_STREAM":
448-
# For ARROW format, we need to get the schema bytes
449-
schema_bytes = self._get_schema_bytes(sea_response)
450-
451376
# Check for compression
452377
lz4_compressed = manifest_data.get("result_compression") == "LZ4_FRAME"
453378

@@ -502,7 +427,7 @@ def _results_message_to_execute_response(self, sea_response, command_id):
502427
has_been_closed_server_side=False,
503428
lz4_compressed=lz4_compressed,
504429
is_staging_operation=False,
505-
arrow_schema_bytes=schema_bytes,
430+
arrow_schema_bytes=None,
506431
result_format=manifest_data.get("format"),
507432
)
508433

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ class SeaCloudFetchQueue(CloudFetchQueue):
285285
def __init__(
286286
self,
287287
initial_links: List["ExternalLink"],
288-
schema_bytes: bytes,
289288
max_download_threads: int,
290289
ssl_options: SSLOptions,
291290
sea_client: "SeaDatabricksClient",
@@ -309,7 +308,7 @@ def __init__(
309308
description: Column descriptions
310309
"""
311310
super().__init__(
312-
schema_bytes=schema_bytes,
311+
schema_bytes=b"",
313312
max_download_threads=max_download_threads,
314313
ssl_options=ssl_options,
315314
lz4_compressed=lz4_compressed,
@@ -344,10 +343,6 @@ def __init__(
344343

345344
def _convert_to_thrift_link(self, link: "ExternalLink") -> TSparkArrowResultLink:
346345
"""Convert SEA external links to Thrift format for compatibility with existing download manager."""
347-
logger.debug(
348-
"SeaCloudFetchQueue: Converting link to Thrift format".format(link)
349-
)
350-
351346
# Parse the ISO format expiration time
352347
expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
353348
return TSparkArrowResultLink(
@@ -470,9 +465,9 @@ def _create_next_table(self) -> Union["pyarrow.Table", None]:
470465
arrow_table = self._create_table_at_offset(self.start_row_index)
471466
if arrow_table:
472467
self.start_row_index += arrow_table.num_rows
473-
logger.debug(
474-
"ThriftCloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
475-
arrow_table.num_rows, self.start_row_index
468+
logger.debug(
469+
"ThriftCloudFetchQueue: Found downloaded file, row count: {}, new start offset: {}".format(
470+
arrow_table.num_rows, self.start_row_index
471+
)
476472
)
477-
)
478473
return arrow_table

src/databricks/sql/result_set.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,9 +497,6 @@ def __init__(
497497
manifest,
498498
str(self.statement_id),
499499
description=desc,
500-
schema_bytes=execute_response.arrow_schema_bytes
501-
if execute_response.arrow_schema_bytes
502-
else None,
503500
max_download_threads=sea_client.max_download_threads,
504501
ssl_options=sea_client.ssl_options,
505502
sea_client=sea_client,

src/databricks/sql/utils.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ def build_queue(
132132
manifest: Optional[ResultManifest],
133133
statement_id: str,
134134
description: Optional[List[Tuple[Any, ...]]] = None,
135-
schema_bytes: Optional[bytes] = None,
136135
max_download_threads: Optional[int] = None,
137136
ssl_options: Optional[SSLOptions] = None,
138137
sea_client: Optional["SeaDatabricksClient"] = None,
@@ -146,7 +145,6 @@ def build_queue(
146145
manifest (ResultManifest): Manifest from SEA response
147146
statement_id (str): Statement ID for the query
148147
description (List[List[Any]]): Column descriptions
149-
schema_bytes (bytes): Arrow schema bytes
150148
max_download_threads (int): Maximum number of download threads
151149
ssl_options (SSLOptions): SSL options for downloads
152150
sea_client (SeaDatabricksClient): SEA client for fetching additional links
@@ -160,10 +158,6 @@ def build_queue(
160158
return JsonQueue(sea_result_data.data)
161159
elif sea_result_data.external_links is not None:
162160
# EXTERNAL_LINKS disposition
163-
if not schema_bytes:
164-
raise ValueError(
165-
"Schema bytes are required for EXTERNAL_LINKS disposition"
166-
)
167161
if not max_download_threads:
168162
raise ValueError(
169163
"Max download threads is required for EXTERNAL_LINKS disposition"
@@ -181,7 +175,6 @@ def build_queue(
181175

182176
return SeaCloudFetchQueue(
183177
initial_links=sea_result_data.external_links,
184-
schema_bytes=schema_bytes,
185178
max_download_threads=max_download_threads,
186179
ssl_options=ssl_options,
187180
sea_client=sea_client,

0 commit comments

Comments
 (0)