Skip to content

Commit d68e4ea

Browse files
re-introduce schema_bytes for better abstraction (likely temporary)
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent a165f1c commit d68e4ea

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

src/databricks/sql/cloud_fetch_queue.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,21 +135,23 @@ def __init__(
135135
self,
136136
max_download_threads: int,
137137
ssl_options: SSLOptions,
138+
schema_bytes: bytes,
138139
lz4_compressed: bool = True,
139140
description: Optional[List[Tuple[Any, ...]]] = None,
140141
):
141142
"""
142143
Initialize the base CloudFetchQueue.
143144
144145
Args:
145-
schema_bytes: Arrow schema bytes
146146
max_download_threads: Maximum number of download threads
147147
ssl_options: SSL options for downloads
148+
schema_bytes: Arrow schema bytes
148149
lz4_compressed: Whether the data is LZ4 compressed
149150
description: Column descriptions
150151
"""
151152
self.lz4_compressed = lz4_compressed
152153
self.description = description
154+
self.schema_bytes = schema_bytes
153155
self._ssl_options = ssl_options
154156
self.max_download_threads = max_download_threads
155157

@@ -191,7 +193,6 @@ def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
191193
"""Get up to the next n rows of the cloud fetch Arrow dataframes."""
192194
if not self.table:
193195
# Return empty pyarrow table to cause retry of fetch
194-
logger.info("SeaCloudFetchQueue: No table available, returning empty table")
195196
return self._create_empty_table()
196197

197198
logger.info("SeaCloudFetchQueue: Retrieving up to {} rows".format(num_rows))
@@ -309,6 +310,7 @@ def __init__(
309310
super().__init__(
310311
max_download_threads=max_download_threads,
311312
ssl_options=ssl_options,
313+
schema_bytes=b"",
312314
lz4_compressed=lz4_compressed,
313315
description=description,
314316
)
@@ -435,11 +437,11 @@ def __init__(
435437
super().__init__(
436438
max_download_threads=max_download_threads,
437439
ssl_options=ssl_options,
440+
schema_bytes=schema_bytes,
438441
lz4_compressed=lz4_compressed,
439442
description=description,
440443
)
441444

442-
self.schema_bytes = schema_bytes
443445
self.start_row_index = start_row_offset
444446
self.result_links = result_links or []
445447

src/databricks/sql/result_set.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,18 @@ def _fill_results_buffer(self):
272272
self.results = results
273273
self.has_more_rows = has_more_rows
274274

275+
def _convert_columnar_table(self, table):
    """Materialize a columnar table as a list of Row objects.

    Builds a Row type from the column names in ``self.description`` (the
    first element of each description tuple is the column name), then walks
    the table row-by-row, pulling each cell via ``table.get_item(col, row)``.

    NOTE(review): assumes ``table`` exposes ``num_rows``, ``num_columns``
    and ``get_item(col_index, row_index)`` — confirm against ColumnTable.
    """
    names = [col[0] for col in self.description]
    ResultRow = Row(*names)
    return [
        ResultRow(*[table.get_item(c, r) for c in range(table.num_columns)])
        for r in range(table.num_rows)
    ]
286+
275287
def merge_columnar(self, result1, result2) -> "ColumnTable":
276288
"""
277289
Function to merge / combining the columnar results into a single result

0 commit comments

Comments (0)